JDBC Ingress
Nstream provides a Java Database Connectivity (JDBC) Adapter library that greatly facilitates ingestion from relational databases. This guide demonstrates how to repeatedly poll databases over JDBC and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jdbc:4.15.23'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jdbc</artifactId>
  <version>4.15.23</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- JdbcIngestingAgent: An abstract Web Agent class with built-in methods relevant to a JDBC query/result cycle
- JdbcIngressSettings: A plain old Java object (POJO) that configures a single JdbcIngestingAgent
- JdbcIngestingPatch: A concrete (but extendable) JdbcIngestingAgent subclass that collects ResultSets into swim.structure.Values
Ideal JdbcIngestingPatch Conditions
If:
- Your query is static
- Response bodies are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to extend the JdbcIngestingPatch class and override at most one method.
Consider a MySQL table with the following schema:
CREATE TABLE sites (
  object_id INTEGER PRIMARY KEY,
  name VARCHAR(255),
  latitude FLOAT,
  longitude FLOAT
);
The following server.recon wholly configures a JdbcIngestingPatch that can query this table:
# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:mysql://database:5432/mydb"
      "dataSource.driver": "com.mysql.jdbc.Driver"
      "dataSource.user": public,
      "dataSource.password": password,
      "dataSource.databaseName": mydb
    }
  }
}
"jdbc-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.jdbc.JdbcIngestingPatch") {
      jdbcIngressConf: @jdbcIngressSettings {
        connectionPoolProvisionName: "hikari"
        query: "SELECT object_id, name, latitude, longitude FROM sites"
        # Transforms a ResultSet row into the structure:
        #   {"id": ..., "name": ..., "lat": ..., "lon": ...}
        # Note that the parenthesized names, NOT the ResultSet column names,
        # configure the resulting Value!
        molderSchema: @resultSetAssembler {
          @int(id), # first column
          @string(name), # second column, etc.
          @float(lat),
          @float(lon)
        },
        relaySchema: @foreach {
          @command {
            nodeUri: "/site/$id"
            laneUri: "info"
          }
        }
      }
    }
  }
}
Driver Dependencies
Make sure to include any dependencies required by your desired drivers. In this example, the driver configured under the Hikari provision requires mysql-connector-j on the classpath.
Let’s examine these components in a bit more detail.
ConnectionPoolProvision
By deferring connection management to the Hikari Connection Pool library, the JDBC adapter enables efficient and low-code ingestion capability even in environments that contain concurrent JDBC reads.
See the documentation for this provision type for further details and its configuration options.
JdbcIngressSettings#molderSchema
Querying a database over JDBC yields a java.sql.ResultSet, an interface that provides few details about the returned value’s structure and instead expects the end-user to choose how to interpret it.
To perform this interpretation without custom Java code, your JdbcIngressSettings must configure a molderSchema that defines the desired transformation from ResultSet to swim.structure.Value.
The overall syntax for the molderSchema field is as follows:
molderSchema: @resultSetAssembler {
  @$TYPE($TARGET_NAME),
  ...
}
- Each schema entry corresponds, in order, to a column of the query result (e.g. the first entry corresponds to the first column).
- $TYPE may be one of int, float, double, or (default and fallback) string.
- $TARGET_NAME identifies the desired Slot key for the column; see the comment in the above snippet for further clarification.
Note: multiple returned rows in a ResultSet are handled transparently without additional effort.
The ingress bridge will process rows in the order in which they are returned.
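To make the positional mapping concrete, here is a minimal, dependency-free sketch of what a molderSchema expresses. It is not the adapter's implementation: the class MolderSketch, its methods, and the use of plain Lists and Maps in place of ResultSet and swim.structure.Value are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MolderSketch {

  // Coerces one raw column value according to a schema type tag.
  // Unknown tags fall back to string, mirroring the documented default.
  static Object coerce(String type, Object raw) {
    if (raw == null) {
      return null;
    }
    switch (type) {
      case "int":
        return ((Number) raw).intValue();
      case "float":
        return ((Number) raw).floatValue();
      case "double":
        return ((Number) raw).doubleValue();
      default:
        return String.valueOf(raw);
    }
  }

  // schema[i] = {type, targetName} describes the i-th column of the row.
  // Keys in the result are the target names, never the database column names.
  static Map<String, Object> mold(String[][] schema, List<Object> row) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (int i = 0; i < schema.length; i++) {
      out.put(schema[i][1], coerce(schema[i][0], row.get(i)));
    }
    return out;
  }

  public static void main(String[] args) {
    String[][] schema = {
        {"int", "id"}, {"string", "name"}, {"float", "lat"}, {"float", "lon"}
    };
    // Stand-in for one row of: SELECT object_id, name, latitude, longitude FROM sites
    List<Object> row = List.of(7, "Depot", 37.5, -122.25);
    System.out.println(mold(schema, row));
    // prints {id=7, name=Depot, lat=37.5, lon=-122.25}
  }
}
```

The first column (object_id in the query) ends up under the key id only because the first schema entry names it that way.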
JdbcIngressSettings#relaySchema
See the Relay DSL docs for a general overview.
When you configure your relaySchema, ensure that any selectors correspond to $TARGET_NAMEs in the molderSchema, which are not the column names in your database unless you explicitly configured the molderSchema this way.
In the example configuration, the $id selector correctly selects the first column of a ResultSet row only because of the molderSchema configuration.
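The dependency between the two schemas can be illustrated with a small sketch of selector substitution. This is a simplified model, not the Relay DSL's actual resolution logic: the class RelaySelectorSketch, the resolve method, and the choice to throw on an unknown selector are all assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RelaySelectorSketch {

  // Matches selectors such as $id inside a URI template.
  private static final Pattern SELECTOR =
      Pattern.compile("\\$([A-Za-z_][A-Za-z0-9_]*)");

  // Replaces each $name in the template with the molded record's value for name.
  // Throwing on a missing key is this sketch's choice, made to surface the mistake early.
  static String resolve(String template, Map<String, Object> record) {
    Matcher m = SELECTOR.matcher(template);
    StringBuffer sb = new StringBuffer();
    while (m.find()) {
      String key = m.group(1);
      if (!record.containsKey(key)) {
        throw new IllegalArgumentException(
            "No slot named '" + key + "' in molded record");
      }
      m.appendReplacement(sb,
          Matcher.quoteReplacement(String.valueOf(record.get(key))));
    }
    m.appendTail(sb);
    return sb.toString();
  }

  public static void main(String[] args) {
    // Keys are molderSchema target names, not database column names.
    Map<String, Object> molded = Map.of("id", 7, "name", "Depot");
    System.out.println(resolve("/site/$id", molded)); // prints /site/7
    // resolve("/site/$object_id", molded) would throw here, because
    // object_id is a column name and not a key of the molded record.
  }
}
```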
Common Variations
Timing Strategy
The default timing strategy fires a JDBC query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process.
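For readers unfamiliar with fixed-rate scheduling, the following standalone sketch shows the same semantics using the JDK's ScheduledExecutorService. It does not use the adapter; the class FixedRateSketch and the method runFixedRate are hypothetical names, and the Runnable merely stands in for a query task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateSketch {

  // Fires a stand-in task at a fixed rate until it has run `runs` times,
  // then stops the scheduler and returns the number of completed fires.
  static int runFixedRate(int runs, long periodMillis) {
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    AtomicInteger fired = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(runs);
    try {
      // Fixed rate: start times are spaced by the period, measured from the
      // first fire, regardless of how long each individual run takes.
      scheduler.scheduleAtFixedRate(() -> {
        if (fired.get() < runs) { // single scheduler thread, so no race here
          fired.incrementAndGet();
          done.countDown();
        }
      }, 0L, periodMillis, TimeUnit.MILLISECONDS);
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      scheduler.shutdownNow();
    }
    return fired.get();
  }

  public static void main(String[] args) {
    System.out.println("fires: " + runFixedRate(3, 50L));
  }
}
```

By contrast, ScheduledExecutorService#scheduleWithFixedDelay measures the period from the end of one run to the start of the next, so slow tasks push later fires back.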
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override JdbcIngestingAgent#stageReception to not invoke NstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() to not call stageReception()
A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.
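The shape of that override can be modeled without the adapter on the classpath. In the sketch below, IngestingAgentStandIn is a hypothetical stand-in for the adapter's base class (only the method names stageReception and didStart come from the text above), and onTriggerCommand stands in for a CommandLane's onCommand() callback.

```java
public class OnDemandTriggerSketch {

  // Hypothetical stand-in for the adapter base class. The real class schedules
  // recurring queries; here stageReception() only counts invocations.
  abstract static class IngestingAgentStandIn {
    int stagedCount = 0;

    void stageReception() {
      stagedCount++;
    }

    // Default behavior described in the text: polling begins at startup.
    void didStart() {
      stageReception();
    }
  }

  static class OnDemandAgent extends IngestingAgentStandIn {
    @Override
    void didStart() {
      // Deliberately does NOT call stageReception(), unlike the superclass.
    }

    // Stand-in for the onCommand() callback of a CommandLane.
    // The "start" payload is an arbitrary choice for this sketch.
    void onTriggerCommand(String command) {
      if ("start".equals(command)) {
        stageReception();
      }
    }
  }

  public static void main(String[] args) {
    OnDemandAgent agent = new OnDemandAgent();
    agent.didStart();
    System.out.println(agent.stagedCount); // prints 0
    agent.onTriggerCommand("start");
    System.out.println(agent.stagedCount); // prints 1
  }
}
```

In a real Web Agent, the class would extend JdbcIngestingPatch and the handler would be registered on an actual command lane; the control flow stays the same.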
Variable Query
If the parameters for each request are not statically known, then the very simple JdbcIngressSettings POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your JdbcIngestingPatch extension, store the state required to dynamically build each request in one or more class-local variables, updating these variables correctly during the Web Agent's lifecycle
- Override JdbcIngestingAgent#prepareRequest within your concrete class to read that state while building each request
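The two steps above can be sketched as follows. This is a dependency-free model under stated assumptions: IngestingAgentStandIn is a hypothetical stand-in for the adapter base class, and the String-returning prepareRequest() signature is invented for illustration, so check the adapter's Javadoc for the real one. The watermark idea (only fetch rows with a larger object_id than the last one seen) is likewise just one example of per-request state.

```java
import java.util.concurrent.atomic.AtomicLong;

public class VariableQuerySketch {

  // Hypothetical stand-in; the real prepareRequest signature may differ.
  abstract static class IngestingAgentStandIn {
    abstract String prepareRequest();
  }

  static class WatermarkAgent extends IngestingAgentStandIn {
    // Step 1: class-local state that changes over the agent's lifetime.
    private final AtomicLong lastSeenId = new AtomicLong(-1L);

    // Called whenever a row has been ingested; keeps the largest id seen.
    void recordIngested(long objectId) {
      lastSeenId.accumulateAndGet(objectId, Math::max);
    }

    // Step 2: read that state while building each request.
    // Only a long is concatenated, so no untrusted text reaches the SQL.
    @Override
    String prepareRequest() {
      return "SELECT object_id, name, latitude, longitude FROM sites"
          + " WHERE object_id > " + lastSeenId.get()
          + " ORDER BY object_id";
    }
  }

  public static void main(String[] args) {
    WatermarkAgent agent = new WatermarkAgent();
    System.out.println(agent.prepareRequest());
    agent.recordIngested(10L);
    System.out.println(agent.prepareRequest());
  }
}
```

For string-valued parameters, java.sql.PreparedStatement placeholders are the safer choice over concatenation.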
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).