Kafka Vehicle Tutorial
This tutorial walks through how to build the backend for a web application that conveys real-time city transit information using the Nstream platform.
More specifically, we demonstrate how to:
- Efficiently consume messages from a Kafka topic (though the principles can easily be generalized beyond Kafka)
- Transform those messages into useful insights
- Serve those insights using Nstream Web Agents as real-time, webpage-subscribable streams addressable by granular, sensible URIs
If this seems daunting, don’t worry! The Nstream platform makes it very approachable to turn real-time data into insightful, first-class citizens of the World Wide Web.
A standalone project containing all the discussed code can be found and cloned from here.
Nstream Library Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-common:4.15.23'
implementation 'io.nstream:nstream-adapter-kafka:4.15.23'
implementation 'io.nstream:nstream-adapter-runtime:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-common</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-kafka</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-runtime</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Source Data
Let’s envision a situation where vehicles continuously report their state to a Kafka topic. Messages in the topic take the following structure:
- key: a unique integer identifying this vehicle
- value: a JSON string that looks like:
{
"id": (number, same as key),
"timestamp": (number, Unix timestamp in milliseconds),
"latitude": (number),
"longitude": (number),
"speed": (number),
"bearing": (number),
... (possibly more fields)
}
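The consumer configuration later in this tutorial deserializes keys with Kafka's IntegerDeserializer, so on the wire each key is a 4-byte big-endian integer rather than text. The following minimal sketch mirrors that wire format using only the JDK. KeyCodec is a hypothetical helper, not an Nstream or Kafka class, and it throws IllegalArgumentException where Kafka would throw its own SerializationException.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not an Nstream or Kafka class) mirroring the 4-byte,
// big-endian key format of Kafka's IntegerSerializer/IntegerDeserializer.
final class KeyCodec {
  private KeyCodec() {}

  // Encodes a vehicle id as 4 big-endian bytes (ByteBuffer's default byte order).
  static byte[] encode(int vehicleId) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(vehicleId).array();
  }

  // Decodes a 4-byte key back into the vehicle id; any other length is rejected.
  static int decode(byte[] key) {
    if (key == null || key.length != Integer.BYTES) {
      throw new IllegalArgumentException("expected a 4-byte integer key");
    }
    return ByteBuffer.wrap(key).getInt();
  }
}
```

One consequence worth knowing: only the length is checked, so a producer that wrote the key as the 4-character string "9542" would have it decoded as an unrelated integer rather than 9542.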
Note: If you run the broker and populator using these instructions, your machine will have an equivalent topic available to any local Kafka consumer via bootstrap server localhost:29092 for experimentation purposes.
Nstream Entities
The Swim server we build will:
- Consume data from the Kafka topic
- Process and collect the data into endpoints that are addressable per vehicle
- Dynamically group these per-vehicle endpoints into addressable categories
Each of these operations will simply run inside a specialized Web Agent.
“VehiclesIngestingAgent”
Per the Kafka Ingress guide, we can use a KafkaIngestingPatch instance to consume Kafka-hosted messages.
Simple-to-process source data and zero need for tricky consumer lifecycle management mean that we can implement this in the server.recon configuration alone, without requiring a concrete VehiclesIngestingAgent class:
# server.recon
provisions: {
@provision("consumer-properties") {
class: "nstream.adapter.common.provision.PropertiesProvision",
def: {
"bootstrap.servers": "localhost:29092",
"group.id": "bespoke-group",
"key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"auto.offset.reset": "latest"
}
}
}
"starter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/kafka" # singleton instance, note lack of dynamic components
@agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
kafkaIngressConf: @kafkaIngressSettings {
consumerPropertiesProvisionName: "consumer-properties"
topics: {"vehicles-integer-json"}
valueContentTypeOverride: "json"
# Let's discuss VehicleAgent before implementing this
relaySchema: ???
}
}
}
}
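For readers more familiar with the plain Kafka client, the consumer-properties provision above carries the same key/value pairs that would otherwise go into a java.util.Properties object for a KafkaConsumer. The following sketch builds that object with only the JDK. ConsumerPropertiesSketch is a hypothetical name, and it is an assumption (not something the tutorial depends on) that PropertiesProvision hands the consumer something equivalent.

```java
import java.util.Properties;

// Hypothetical helper: the "consumer-properties" provision expressed as plain JDK Properties.
final class ConsumerPropertiesSketch {
  private ConsumerPropertiesSketch() {}

  static Properties consumerProperties() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:29092");
    props.setProperty("group.id", "bespoke-group");
    // Keys arrive as 4-byte integers; values are JSON text decoded as UTF-8 strings.
    props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // With no committed offset for this group, start at the end of each partition.
    props.setProperty("auto.offset.reset", "latest");
    return props;
  }
}
```

With kafka-clients on the classpath, this object could be passed to new KafkaConsumer<Integer, String>(props). In this tutorial, however, the KafkaIngestingPatch owns the consumer, so no such code needs to be written.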
VehicleAgent
Upon consuming data, the Swim server must process and collect it into addressable endpoints.
Because there is so much flexibility in how this may play out (just like with designing any API), a good way to focus the scope is to start by asking:
- “What questions do we want real-time answers to?”
To this, you may answer:
- The latest state of specified vehicles
- A historical time-series of specified vehicles
It then becomes clear that the entities that we wish to track are vehicles, and that the desired vehicle-specific properties are both current and historical states.
This maps nicely to a design involving one VehicleAgent per vehicle that contains a latest ValueLane and a history MapLane:
// VehicleAgent.java
package nstream.starter;
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.MapLane;
import swim.api.lane.ValueLane;
import swim.structure.Value;
public class VehicleAgent extends AbstractAgent {
@SwimLane("latest")
ValueLane<Value> latest = this.<Value>valueLane();
@SwimLane("history")
MapLane<Long, Value> history = this.<Long, Value>mapLane();
// Required so each VehicleAgent has a way to receive data
@SwimLane("addEvent")
CommandLane<Value> addEvent = this.<Value>commandLane()
.onCommand(v -> {
this.latest.set(v);
this.history.put(v.get("timestamp").longValue(), v);
});
}
Tying this into server.recon yields:
# server.recon
provisions: { ... } # from before
"starter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node { uri: "/kafka", ... } # from before
@node {
# Multiple instantiations possible (note dynamic component)
pattern: "/vehicle/:id"
@agent(class: "nstream.starter.VehicleAgent")
}
}
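To make the “one VehicleAgent per vehicle” idea concrete, the following sketch models the /vehicle/:id node entry in plain Java. A node URI with exactly one segment after /vehicle/ matches, each distinct id gets its own state object on first use, and that object updates latest and history the way addEvent does. VehicleNodes, VehicleState, matchId, and agentFor are hypothetical names. The Swim runtime's actual URI matching and lane storage are not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, in-memory model of the "/vehicle/:id" node entry: one state object per
// distinct id. Illustrative only; not how the Swim runtime matches URIs or stores lanes.
final class VehicleNodes {
  static final String PREFIX = "/vehicle/";

  // Plays the role of one VehicleAgent instance.
  static final class VehicleState {
    String latest;                                              // like the "latest" ValueLane
    final NavigableMap<Long, String> history = new TreeMap<>(); // like the "history" MapLane

    // Same logic as the addEvent callback: overwrite latest, record in history.
    void addEvent(long timestamp, String event) {
      latest = event;
      history.put(timestamp, event);
    }
  }

  private final Map<String, VehicleState> agents = new HashMap<>();

  // Extracts :id when nodeUri is "/vehicle/" plus exactly one non-empty path segment.
  static Optional<String> matchId(String nodeUri) {
    if (!nodeUri.startsWith(PREFIX)) {
      return Optional.empty();
    }
    String id = nodeUri.substring(PREFIX.length());
    if (id.isEmpty() || id.indexOf('/') >= 0) {
      return Optional.empty();
    }
    return Optional.of(id);
  }

  // Returns the state for a matching URI, creating it on first use (one per distinct id).
  Optional<VehicleState> agentFor(String nodeUri) {
    return matchId(nodeUri).map(id -> agents.computeIfAbsent(id, k -> new VehicleState()));
  }

  int agentCount() {
    return agents.size();
  }
}
```

Two properties of the original addEvent logic carry over. First, latest is overwritten unconditionally, so a message that arrives late with an older timestamp replaces a newer one (history is unaffected because it is keyed by timestamp). Second, history grows without bound. Whether either matters depends on the data source; comparing timestamps before setting latest, or trimming old history entries, are possible refinements.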
Revisiting “VehiclesIngestingAgent”
Since we’ve established the existence of distributed VehicleAgents, we should now update our KafkaIngestingPatch to relay messages to them:
# server.recon
...
relaySchema: @command {
nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
laneUri: "addEvent"
value: $value
}
...
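Conceptually, the relay turns every consumed record into a command sent to the addEvent lane of the node named by the nodeUri template. The following sketch imitates only the /vehicle/$key substitution and groups a batch of records by target node URI. RelaySketch, Message, nodeUriFor, and route are hypothetical names, and the real KafkaIngestingPatch's command scheduling is not modeled. With integer keys, the substitution yields URIs such as /vehicle/9542, the same shape as the node URIs in the swim-cli output at the end of this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the relaySchema routing step (not the KafkaIngestingPatch internals).
final class RelaySketch {
  private RelaySketch() {}

  // Stand-in for one consumed Kafka record: integer key, JSON string value.
  record Message(int key, String value) {}

  // Substitutes the record key into the "/vehicle/$key" template.
  static String nodeUriFor(Message m) {
    return "/vehicle/" + m.key();
  }

  // Groups a batch by the node each record would be commanded to, keeping arrival order.
  static Map<String, List<String>> route(List<Message> batch) {
    Map<String, List<String>> byNode = new LinkedHashMap<>();
    for (Message m : batch) {
      byNode.computeIfAbsent(nodeUriFor(m), k -> new ArrayList<>()).add(m.value());
    }
    return byNode;
  }
}
```

Because the target URI depends only on the key, every record for a given vehicle reaches the same node, which is what allows one VehicleAgent to accumulate that vehicle’s full history.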
Progress Check
If you’ve made it this far, take a moment to congratulate yourself! We’ve built a server that serves granular, real-time streams of the current and historical states of every vehicle present in a Kafka topic. This is already an impressive application on its own; the remainder of this article demonstrates additional functionality.
To verify our progress, you may either:
- Open localhost:9001 in your browser for a general-purpose (i.e. not repository-specific) meta view of your Web Agents in action
- Run the first two swim-cli commands in this article’s final section to exercise code-specific streaming APIs (the last command will only work once we implement everything in the “PolarityAgent” section)
“PolarityAgent” and PolarityMemberAgent
Nstream also offers valuable real-time interactions across Web Agents on top of the benefits that we’ve already seen.
One such interaction is dynamically assigning categories to certain Web Agents based on some of their properties.
Because each vehicle’s status comes with GPS coordinates, let’s utilize agent grouping to categorize every VehicleAgent as “north” or “south” of lat=34.0 based on their latest update.
Per the linked documentation, this requires two additional Web Agent types:
- Two GroupPatch instances (one for “north”, one for “south”)
- A MemberPatch instance for every VehicleAgent
The first can be trivially configured in server.recon alone:
# server.recon
...
"starter": @fabric {
...
@node {
uri: "/lat34/north"
@agent(class: "nstream.adapter.common.patches.GroupPatch")
}
@node {
uri: "/lat34/south"
@agent(class: "nstream.adapter.common.patches.GroupPatch")
}
}
The second requires a bit of custom Java logic to transform a coordinate into a polarity, but this isn’t very difficult either:
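// PolarityMemberAgent.java
package nstream.starter;
import nstream.adapter.common.patches.MemberPatch;
import swim.structure.Value;
public class PolarityMemberAgent extends MemberPatch {
// Returns the name of the group this vehicle's latest event places it in
@Override
protected String extractGroupFromEvent(Value event) {
final float indicator = event.get("latitude").floatValue();
// Latitudes below 34.0 are "south"; 34.0 and above are "north"
return indicator < 34.f ? "south" : "north";
}
}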
And neither is its remaining server.recon configuration:
# server.recon
...
"starter": @fabric {
...
@node {
pattern: "/vehicle/:id"
@agent(class: "nstream.starter.VehicleAgent")
# Note the "mixin"-style, additive Web Agent functionality
@agent(class: "nstream.starter.PolarityMemberAgent") {
"groupUriPattern": "/lat34/:group"
}
}
...
}
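The configuration above leaves the membership bookkeeping to GroupPatch and MemberPatch. To build intuition for what that bookkeeping involves, the following sketch models it in memory: each vehicle belongs to exactly one group at a time, chosen from its latest latitude with the same 34.0 threshold as PolarityMemberAgent, and it moves between groups when that latitude crosses the threshold. The one-group-at-a-time behavior is an assumption implied by the north/south split, and PolarityGroups, polarity, update, and members are hypothetical names rather than the patches' implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of north/south membership keyed by node URI
// (not how GroupPatch or MemberPatch are implemented).
final class PolarityGroups {
  private final Map<String, Set<String>> byGroup = new HashMap<>();
  private final Map<String, String> groupOf = new HashMap<>();

  // Same threshold as PolarityMemberAgent: below 34.0 is "south", otherwise "north".
  static String polarity(double latitude) {
    return latitude < 34.0 ? "south" : "north";
  }

  // Applies a vehicle's latest update, moving it between groups if its polarity changed.
  void update(String nodeUri, double latitude) {
    String next = polarity(latitude);
    String prev = groupOf.put(nodeUri, next);
    if (prev != null && !prev.equals(next)) {
      byGroup.get(prev).remove(nodeUri);
    }
    byGroup.computeIfAbsent(next, g -> new TreeSet<>()).add(nodeUri);
  }

  // Read-only view of a group's current members (empty if the group was never used).
  Set<String> members(String group) {
    return Collections.unmodifiableSet(byGroup.getOrDefault(group, Set.of()));
  }
}
```

In the running application, the set returned by members("north") roughly corresponds to the keys streamed by the agents lane of /lat34/north, as the final swim-cli example shows.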
Final Notes
After running at least the main server component from the standalone project instructions, you may open localhost:9001 in a browser window for a meta view of the Web Agents.
Congratulations on building the backend for an end-to-end streaming application!
To experiment with code-specific streaming APIs, you may run the following swim-cli commands while the process runs (any number in the first column of src/main/resources/locations.csv is a valid $VID; $POLARITY is either north or south):
- Subscribing to a VehicleAgent instance’s latest lane:
% swim-cli sync -h warp://localhost:9001 -n /vehicle/$VID -l latest
{"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084501247}
- Subscribing to a VehicleAgent instance’s history lane:
% swim-cli sync -h warp://localhost:9001 -n /vehicle/$VID -l history
@update(key:1695084501247){"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084501247}
@update(key:1695084628525){"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084628525}
...
- Subscribing to a “PolarityAgent” (really a GroupPatch) instance’s agents lane:
% swim-cli sync -h warp://localhost:9001 -n /lat34/$POLARITY -l agents
@update(key:"/vehicle/1564"){id:1564,routeId:90,dir:outbound,latitude:34.056484,longitude:-118.25013,speed:9,bearing:NW,routeName:"90 Dtla - Noho Sta Via Vineland-Foothil",timestamp:1695085068135}
@update(key:"/vehicle/1570"){id:1570,routeId:78,dir:outbound,latitude:34.03334,longitude:-118.26485,speed:0,bearing:NE,routeName:"78 Downtown LA - South Arcadia Via Main",timestamp:1695085082336}
...
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).