Avro Handling
Nstream provides an Avro Adapter library that can be coupled with connectors to facilitate ingestion of Avro-serialized data.
Consuming Avro data from any source looks like the following:
- Source data gets consumed by…
- …an Ingress Bridge, which receives either:
  - org.apache.avro.generic.GenericRecords, or
  - Avro binary payloads
  and then…
- …transforms the received objects into either:
  - Custom POJOs, which may be processed and relayed to downstream “business logic” Web Agents via Java code.
  - swim.structure.Values, which may be processed and relayed to downstream “business logic” Web Agents via Java code or the Relay DSL.
The utilities in this guide demonstrate the swim.structure.Value-terminating branch of this flow.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-avro:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-avro</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
GenericRecord to Value
Note: this is typically the most convenient strategy, but it is limited in availability to the following environments:
- Kafka/Confluent topics that only contain messages published via KafkaAvroSerializer
- Pulsar topics that only contain messages published via Schema.AVRO(...)
Use an nstream.adapter.avro.GenericRecordAssembler instance to transform GenericRecord messages into Values.
In pure Java, this looks like:
final GenericRecordAssembler assembler = new GenericRecordAssembler();
final GenericRecord msg = ... ; // org.apache.avro.generic.GenericRecord
// Note: assembler is reusable, and assemble() is thread-safe
final Value result = assembler.assemble(msg); // swim.structure.Value
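For intuition, suppose msg conforms to a single-field record schema such as {"name": "id", "type": "int"} (a hypothetical example; the exact structural mapping shown is an assumption). The assembled result can then be read like any other swim.structure.Value:

// Assumed mapping: Avro record fields become slots on the resulting Value
final int id = result.get("id").intValue();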
No-code (config-only) declarations are also available, though the rules are somewhat connector-dependent.
Kafka
- Within the Provision that corresponds to the consumer configuration, use KafkaAvroDeserializer for each applicable deserializer (typically only value.deserializer).
- Within the kafkaIngressSettings configuration, set the applicable valueMolder and/or keyMolder (rare) as shown below. The toolkit may sometimes infer these correctly if omitted, but it never hurts to be explicit.
# server.recon
provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      # Using this deserializer yields a KafkaIngestingPatch<$KeyType, GenericRecord>
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        ...
      }
    }
  }
}
...
Confluent
The rules for Confluent are identical to those for Kafka, other than the two name changes required in any nstream.adapter.confluent setup:
- kafka.KafkaIngestingPatch -> confluent.ConfluentIngestingPatch
- kafkaIngressConf -> confluentIngressConf
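Applying those two renames to the earlier Kafka example yields a configuration along these lines (a sketch; the @confluentIngressSettings tag name is assumed to follow the same renaming pattern):

# server.recon
provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      confluentIngressConf: @confluentIngressSettings {
        ...
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        ...
      }
    }
  }
}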
Pulsar
nstream.adapter.pulsar operates slightly differently from the previous examples.
Users do not manually work with GenericRecordAssembler; instead, simply declare pulsarIngressSettings#schemaType as auto, and any GenericRecordAssembler utilization is taken care of under the hood.
# server.recon
"provisions": {
  ...
}
"demo-app": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
      pulsarIngressConf: @pulsarIngressSettings {
        ...
        schemaType: "auto"
        ...
      }
    }
  }
}
...
Avro Binary to Value
If your use case falls outside the aforementioned environments, you will need to transform Avro binaries instead of GenericRecords, and your code (or configuration) must statically know the desired schema.
This approach creates one nstream.adapter.avro.SwimAvroAssembler instance per schema.
Avro payloads are then assembled against these instances to yield swim.structure.Values.
Doing this in pure Java looks like:
final String schema = "{\"type\":\"record\","
+ "\"name\":\"Complex\","
+ "\"fields\":"
+ "[{\"name\":\"id\",\"value\":\"int\"}]}";
final Value inlineConstructorArgs = // swim.structure.Value
Record.create(2) // swim.structure.Record
.attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
.slot("schema", schema);
final Value fileConstructorArgs =
Record.create(2)
.attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
.slot("schemaConfig", "/schema-location.json");
final SwimAvroAssembler assembler =
new SwimAvroAssembler(inlineConstructorArgs);
// OR: new SwimAvroAssembler(fileConstructorArgs);
final String msg = ...; // UTF-8 encoded, Avro-serialized, schema-compliant String
// Note: assembler is reusable, and
// assemble/assembleBytes/assembleString are thread-safe.
assembler.assemble(msg);
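The thread-safety note above also names assembleBytes and assembleString; presumably, raw byte[] payloads go through the former (the exact signature is an assumption here):

final byte[] rawMsg = ...; // Avro-serialized, schema-compliant bytes
final Value bytesResult = assembler.assembleBytes(rawMsg); // assumed byte[] signature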
And config-only looks like:
@valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
  # Pick just one of the two strategies below
  schema: "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"
  schemaConfig: "/schema-location.json"
}
Kafka and Confluent
Assignable KafkaIngressSettings fields:
- keyMolder
- valueMolder

Compatible deserializers (specified in Provision):

- org.apache.kafka.common.serialization.ByteArrayDeserializer
- org.apache.kafka.common.serialization.BytesDeserializer
- org.apache.kafka.common.serialization.ByteBufferDeserializer
- org.apache.kafka.common.serialization.StringDeserializer
KafkaAvroSerializer-Published Incompatibility
The KafkaAvro(Des|S)erializer pair prepends internal magic bytes to every payload. Thus, a topic that was published to using KafkaAvroSerializer is not compatible with the aforementioned deserializers.
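For context, Confluent's documented wire format places a 1-byte magic value and a 4-byte schema registry ID ahead of the Avro body, so a schema-unaware deserializer surfaces that 5-byte header as if it were part of the data. A minimal illustration (payload here is a placeholder):

// Confluent wire format: [magic 0x0][4-byte schema ID][Avro binary body]
final byte[] payload = ...; // bytes as written by KafkaAvroSerializer
final java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(payload);
final byte magic = buf.get();      // always 0x0 in the current format
final int schemaId = buf.getInt(); // schema registry ID, big-endian
// Only buf.remaining() bytes are the actual Avro body; ByteArrayDeserializer
// and friends would hand the full 5-byte-prefixed payload downstream,
// which breaks schema-compliant decoding.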
Example:
# server.recon
provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler(...) {...}
        ...
      }
    }
  }
}
Pulsar
Assignable PulsarIngressSettings field:

- valueMolder

Compatible schemaType values (specified in PulsarIngressSettings):

- bytearray
- bytebuffer
- string
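No end-to-end Pulsar example accompanies this branch above, so here is a minimal sketch mirroring the earlier Pulsar snippet; the bytearray schemaType and the placement of valueMolder inside @pulsarIngressSettings are assumptions based on the fields listed above:

# server.recon
"provisions": {
  ...
}
"demo-app": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
      pulsarIngressConf: @pulsarIngressSettings {
        ...
        schemaType: "bytearray"
        valueMolder: @valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
          schemaConfig: "/schema-location.json"
        }
        ...
      }
    }
  }
}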