Druid Ingress
Overview
This guide demonstrates how to set up Apache Druid as a data source for Nstream, using the Druid SQL Avatica JDBC driver to query Druid and ingest the results into Nstream. This integration is ideal for environments that need streamlined ingestion of Druid query results into Nstream for further processing and analytics.
Prerequisites
- JDK 11+
- An Apache Druid installation
- A locally running Apache Druid instance
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-druid:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-druid</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- DruidIngestingAgent: An abstract Web Agent class with built-in methods relevant to a Druid SQL query/result cycle using the Avatica JDBC driver
- DruidIngressSettings: A plain old Java object (POJO) that configures a single DruidIngestingAgent
- DruidIngestingPatch: A concrete (but extendable) DruidIngestingAgent subclass that collects ResultSets into swim.structure.Values
Ideal DruidIngestingPatch Conditions
If:
- Your query is static
- Response bodies are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to extend the DruidIngestingPatch class and override at most one method.
Consider the Druid sample dataset wikiticker-2015-09-12-sampled.json.gz, loaded into a datasource named wikipedia with the following dimensions schema (shown as a JSON ingestion spec):
{
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
}
}
}
}
The following server.recon wholly configures a DruidIngestingPatch that can query this table:
# server.recon
provisions: {
@provision("druid-provision") {
class: "nstream.adapter.druid.ConnectionProvision"
def: {
"druidUrl": "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true"
}
}
}
"druid-adapter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/bridge/foo"
@agent(class: "nstream.adapter.druid.DruidIngestingPatch") {
druidIngressConf: @druidIngressSettings {
connectionProvisionName: "druid-provision"
firstFetchDelayMillis: 20000
query: "SELECT \"__time\", \"cityName\", \"comment\" FROM \"wikipedia\" WHERE \"cityName\" IS NOT NULL LIMIT 9"
molderSchema: @resultSetAssembler {
__time,
cityName,
comment
},
relaySchema: @foreach {
@command {
nodeUri: "/dynamic/$cityName",
laneUri: "info"
value: $comment
}
}
}
}
}
@node {
uri: "/dynamic/:id"
@agent(class: "nstream.adapter.druid.DruidIngestingPatch")
}
}
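To make the two schemas concrete, the following JDK-only sketch models what the configuration above describes: molderSchema keeps only the __time, cityName, and comment columns of each row, and relaySchema sends one command per row to the info lane of /dynamic/$cityName, carrying $comment as its value. RelaySketch, Command, mold, and relay are hypothetical names, and rows are modeled as maps rather than as the ResultSet and swim.structure.Value types the adapter works with. This is an illustration of the config's semantics, not the DruidIngestingPatch implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RelaySketch {

  // Hypothetical stand-in for one command message emitted by relaySchema.
  static final class Command {
    final String nodeUri;
    final String laneUri;
    final Object value;

    Command(String nodeUri, String laneUri, Object value) {
      this.nodeUri = nodeUri;
      this.laneUri = laneUri;
      this.value = value;
    }

    @Override
    public String toString() {
      return "command(" + nodeUri + ", " + laneUri + ", " + value + ")";
    }
  }

  // Models molderSchema: keep only the listed columns, in the listed order.
  static Map<String, Object> mold(Map<String, Object> row, List<String> columns) {
    Map<String, Object> molded = new LinkedHashMap<>();
    for (String column : columns) {
      molded.put(column, row.get(column));
    }
    return molded;
  }

  // Models relaySchema: @foreach row, command /dynamic/$cityName, lane "info", value $comment.
  static List<Command> relay(List<Map<String, Object>> rows) {
    List<Command> commands = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      commands.add(new Command("/dynamic/" + row.get("cityName"), "info", row.get("comment")));
    }
    return commands;
  }

  public static void main(String[] args) {
    List<String> schema = List.of("__time", "cityName", "comment");
    // Made-up row for illustration; "added" is dropped by mold().
    Map<String, Object> raw = Map.of(
        "__time", "2015-09-12T00:00:00Z",
        "cityName", "ExampleCity",
        "comment", "example comment",
        "added", 1L);
    System.out.println(relay(List.of(mold(raw, schema))));
  }
}
```

Each distinct cityName yields a distinct node URI, which is what the /dynamic/:id node pattern in server.recon matches.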
Note that you’ll need to include any driver dependencies (in this case avatica-core) on your classpath.
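One way to confirm the driver is actually present is a fail-fast check at application startup. The sketch below assumes the Avatica remote JDBC driver class is org.apache.calcite.avatica.remote.Driver, the name Avatica's client documentation gives for it; verify it against the avatica-core version you ship. DriverCheck and isOnClasspath are hypothetical names.

```java
public class DriverCheck {

  // Assumed class name of the Avatica remote JDBC driver packaged in avatica-core.
  static final String AVATICA_DRIVER = "org.apache.calcite.avatica.remote.Driver";

  // Looks the class up without initializing it, so no driver registration happens here.
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, DriverCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    if (!isOnClasspath(AVATICA_DRIVER)) {
      throw new IllegalStateException("avatica-core is missing from the classpath: " + AVATICA_DRIVER);
    }
    System.out.println("Avatica driver found");
  }
}
```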
Common Variations
Timing Strategy
The default timing strategy fires a Druid SQL query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process.
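"Regardless of task duration" is the defining property of the JDK's ScheduledExecutorService#scheduleAtFixedRate, which the default timing resembles. The JDK-only sketch below records when each run starts under scheduleAtFixedRate and, for comparison, under scheduleWithFixedDelay, one possible alternative policy. TimingSketch and startTimes are hypothetical names, and Thread.sleep stands in for a Druid query; this does not use the adapter's own scheduling methods.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimingSketch {

  // Runs a simulated query `runs` times and returns each run's start time in ms since t0.
  // fixedRate=true uses scheduleAtFixedRate; false uses scheduleWithFixedDelay.
  static long[] startTimes(boolean fixedRate, long periodMillis, long taskMillis, int runs) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    long[] starts = new long[runs];
    AtomicInteger next = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(runs);
    long t0 = System.nanoTime();
    Runnable query = () -> {
      int i = next.getAndIncrement();
      if (i >= runs) {
        return; // ignore any extra firing that sneaks in before shutdown
      }
      starts[i] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
      try {
        Thread.sleep(taskMillis); // simulated query latency
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      done.countDown(); // also publishes starts[i] to the waiting thread
    };
    if (fixedRate) {
      scheduler.scheduleAtFixedRate(query, 0, periodMillis, TimeUnit.MILLISECONDS);
    } else {
      scheduler.scheduleWithFixedDelay(query, 0, periodMillis, TimeUnit.MILLISECONDS);
    }
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for runs", e);
    } finally {
      scheduler.shutdownNow();
    }
    return starts;
  }

  public static void main(String[] args) {
    // 100 ms period, 60 ms simulated query.
    System.out.println("fixed rate:  " + Arrays.toString(startTimes(true, 100, 60, 3)));
    System.out.println("fixed delay: " + Arrays.toString(startTimes(false, 100, 60, 3)));
  }
}
```

With these parameters, fixed-rate starts should land near 0, 100, and 200 ms, while fixed-delay starts drift to roughly 0, 160, and 320 ms because each delay begins only after the previous run finishes. Per the JDK documentation, a fixed-rate task that runs longer than its period makes later runs start late, but never concurrently.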
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override DruidIngestingAgent#stageReception so that it does not invoke NstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() so that it does not call stageReception()
A common alternative for the latter is to invoke stageReception() from the onCommand() callback of some CommandLane instead.
Variable Query
If the parameters for each request are not statically known, then the very simple DruidIngressSettings POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your DruidIngestingPatch extension, store the state required to dynamically build each request in one or more class-local variables, and keep these variables up to date over the Web Agent’s lifecycle
- Override DruidIngestingAgent#prepareRequest within your concrete class to read that state while building each request
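As one example of this pattern, an incremental query can keep a high-water mark of the newest __time it has ingested and filter on it in every request. The sketch below is plain Java: WatermarkQuerySketch, prepareQuery, and onRowIngested are hypothetical names, and prepareQuery only stands in for your DruidIngestingAgent#prepareRequest override, whose actual signature is not assumed here. TIME_PARSE is a Druid SQL function that parses an ISO 8601 string into a timestamp when no pattern is given.

```java
import java.time.Instant;

public class WatermarkQuerySketch {

  // Class-local state that survives between requests: newest __time ingested so far.
  private Instant watermark = Instant.EPOCH;

  // Stand-in for an overridden prepareRequest: reads the current state to build the query.
  String prepareQuery() {
    return "SELECT \"__time\", \"cityName\", \"comment\" FROM \"wikipedia\""
        + " WHERE \"cityName\" IS NOT NULL"
        + " AND \"__time\" > TIME_PARSE('" + watermark + "')"
        + " ORDER BY \"__time\" LIMIT 9";
  }

  // Call once per ingested row so the next request asks only for newer rows.
  void onRowIngested(Instant rowTime) {
    if (rowTime.isAfter(watermark)) {
      watermark = rowTime;
    }
  }

  Instant watermark() {
    return watermark;
  }

  public static void main(String[] args) {
    WatermarkQuerySketch sketch = new WatermarkQuerySketch();
    System.out.println(sketch.prepareQuery());
    sketch.onRowIngested(Instant.parse("2015-09-12T00:47:00.496Z")); // hypothetical row time
    System.out.println(sketch.prepareQuery());
  }
}
```

Concatenating the watermark into the SQL is acceptable here only because it comes from an already-parsed Instant, not from user input. Also note that with a strict > and a LIMIT, rows sharing the boundary __time can be skipped when a batch ends mid-group, so a production version may need a tie-breaker.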
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).