Relay DSL
Nstream provides a Relay domain-specific language (DSL) that can be configured within connector settings to enable no-code ingestion of source data.
This is an opt-in convenience feature that is designed to enable reasonably powerful customizability from configurations alone. If you prefer to work directly within your Web Agents’ Java code, you are perfectly welcome to do so instead.
Note: the content here is not connector-specific.
For an adapter library nstream.adapter.foo, each relayDef demonstrated in the following sections configures the relaySchema of a FooIngressSettings.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-common:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-common</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Overview
The Relay DSL expresses the following: given an incoming message of type swim.structure.Value, perform these declared actions.
This corresponds to the Ingress to Entities section from the Toolkit Overview.
For ingestor-type processes, the desired actions are nearly always command messages that other Web Agents will utilize.
Recall that a command message is uniquely identified by its target nodeUri, laneUri, and payload-type value.
This article demonstrates how to configure these properties in a variety of use cases.
Static Target nodeUris and laneUris
Single Target, Single Pass-Through Value
If each incoming message should be relayed to statically known WARP endpoints, then simply configure the approriate String values in the nodeUri (no defaults present, so this is mandatory) and laneUri (default=addEvent) properties.
# input
{bar:3,baz:4}
# relayDef
@command {
nodeUri: "/vehicle/foo"
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value={bar:3,baz:4}
Multiple Targets, Single Pass-Through Value
Your snippet may also be a Record that contains multiple @command blocks, each of which will execute in declared order.
# input
{bar:3,baz:4}
# relayDef
{
@command {
nodeUri: "/vehicle/foo qux"
},
@command {
nodeUri: "/site/foo"
laneUri: "addMessage"
}
}
# resulting commands (2)
(1)
nodeUri=/vehicle/foo%20qux # Note automatic URI encoding
laneUri=addEvent
value={bar:3,baz:4}
(2)
nodeUri=/site/foo
laneUri=addMessage
value={bar:3,baz:4}
value Subselection
Leaving out any @command#value declarations tells the DSL to relay the entire received message.
Providing swim.structure.Selectors indicates to relay component(s) of each received message instead.
# input
{bar:3,baz:4}
# relayDef
@command {
nodeUri: "/vehicle/foo",
value: $bar # READ AS: Selector.get(bar)
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=3
value Computations
In fact, the value configuration accepts general swim.structure.Exprs, not just Selectors.
# input
{nested:{id:foo,bar:3,baz:4}}
# relayDef0
@command {
nodeUri: "/vehicle/foo",
value: $nested.bar * $nested.baz
}
# relayDef1 (functionally equivalent to relayDef0,
# but offers "eager truncation")
@command($nested) {
nodeUri: "/vehicle/foo",
value: $bar * $baz
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=12
Dynamically Evaluated nodeUris and laneUris
Most use cases will require message-dependent routing logic, i.e. dynamically evaluable nodeUri and laneUri properties
(the latter is rarely non-static in practice, but the option is available).
Cleanly Delimited URIs
Most target URIs can be expressed as path components (delimited by the / character), where every component is either:
- A static String, or
- An expression that yields a String when evaluated against the input.
In such cases, continue use the nodeUri and laneUri configurations; just include Expr components within the Strings as needed.
# input
{
info:{id:"4JWH921",state:CA,year:2000}
stats:{mileage:131201,speed:45}
}
# relayDef
@command {
nodeUri: "/vehicle/$info.state/$info.id",
laneUri: "addStats",
value: $stats
}
# resulting command
nodeUri=/vehicle/CA/4JWH921
laneUri=addStats
value={mileage:131201,speed:45}
Just like with value, each URI component may be a general Expr instead of just a Selector.
# input
{
hero: {
name: "Foo/Bar", score: 22, penalty: 0
}
sidekick: {
name: "Bar/Baz", score: 21, penalty: 3
}
}
# relayDef
{
@command {
nodeUri: "/team/$hero.name"
laneUri: "addScore"
value: $hero.score - $hero.penalty
}
@command {
nodeUri: "/team/$hero.name + \"+\" + $sidekick.name"
laneUri: "addScore"
value: $hero.score - $hero.penalty + $sidekick.score - $sidekick.penalty
}
}
# resulting commands (2)
# Note that / within components are transparently encoded,
# while path component separators are left untouched
(1)
nodeUri=/team/Foo%2fBar
laneUri=addScore
value=22
(2)
nodeUri=/team/Foo%2fBar%2bBar%2fBaz
laneUri=addScore
value=40
Complex URIs
The syntax in the previous section is limited by the power of Exprs.
More complicated URIs can be constructed by using the complex...Uri identifiers, which have access to the full library of Directives within the Relay DSL.
These configurations:
- Accept singular
Records - Treat each
Itemin theRecordas a DSL snippet to be evaluated - Concatenate the evaluations in-order to form the final result.
Complex Properties Are Explicit
There are zero transparent convenience features (e.g. the implicit URI encoding demonstrated above) in this flow. Ensure that your configuration does exactly what you want, perhaps even on malformed inputs.
# input
{
"code": 200,
"payload": {
"USD/EUR": {
"rate": 0.967715,
"timestamp": 1660235884
}
}
}
# relayDef0
@command {
complexNodeUri: {
"/currency/",
@uriPathEncode($payload.*:) # READ AS Selector.get(payload).key()
}
value: $payload.:* # READ AS Selector.get(payload).value()
}
# relayDef1 (functionally equivalent to relayDef0)
@command($payload) {
complexNodeUri: {
"/currency/",
@uriPathEncode($*:) # READ AS Selector.key()
}
value: $:* # READ AS Selector.value()
}
# resulting command
nodeUri=/currency/USD%2fEUR
laneUri=addEvent
value={rate:0.967715,timestamp:1660235884}
One Message, Many Targets (@foreach)
An input may contain data for many targets whose natures themselves depend on possibly highly variable input.
A @foreach declaration that encapsulates one or more @command blocks may help to address such situations.
Everything else that has been discussed in the article remains applicable.
Basic Usage
# input
{
"code": 200,
"vehicles": {
@vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
@vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}
}
}
# relayDef
@foreach($vehicles) {
@command {
nodeUri: "/vehicle/$id",
laneUri: "addLocation"
}
}
# resulting commands (2, because the @foreach iterates)
(1)
nodeUri=/vehicle/SC021
laneUri=addLocation
value=@vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
(2)
nodeUri=/vehicle/SC010
laneUri=addLocation
value=@vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}
Note: the 200 under code is unavailable within the @foreach due to “eager truncation”, though perfectly usable outside of it.
Future nstream-adapter-common releases will introduce a means to circumvent this and similar situations.
With value Transformations
# input (JSON)
{
"code": 200,
"rates": [
"USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
"USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
]
}
# relayDef
@foreach($rates) {
@command {
nodeUri: "/currency/$*:" # READ AS Selector.key()
laneUri: "addEvent"
value: $:* # READ AS Selector.value()
}
}
# resulting commands (2)
(1)
nodeUri=/currency/USDEUR
laneUri=addEvent
value: {rate:0.967715,timestamp:1660235884}
(2)
nodeUri=/currency/USDJPY
laneUri=addEvent
value:{rate:132.716501,timestamp:1660235884}
With complexNodeUri
# input (JSON, same as before)
{
"code": 200,
"rates": [
"USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
"USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
]
}
# relayDef
@foreach($rates) {
@command {
complexNodeUri: {
"/currency/",
@substring($*:){ # READ AS Selector.key()
lower:3 # READ AS str[3:] (in Python syntax)
}
}
laneUri: "addCurrentRate"
value: $:*.rate # READ AS Selector.value().get(rate)
}
}
# resulting commands (2)
(1)
nodeUri=/currency/EUR
laneUri=addCurrentRate
value: 0.967715
(2)
nodeUri=/currency/JPY
laneUri=addCurrentRate
value: 132.716501
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).