Build a pipeline service using Data Connection
This tutorial builds a service that transforms a stream of items. The service uses a data connection to obtain connections to a relational database, and uses a lookup table in that database to enrich a stream of numbers with a textual representation of each number's last digit.
Before you begin
To complete this tutorial, you need the following:
| Prerequisites | Useful resources |
|---|---|
| Java 17 or newer | |
| Maven or Gradle | |
| Docker | |
Step 1. Create and populate database
This tutorial uses Docker to run the Postgres database.
Run the following command to start Postgres:
```bash
docker run --name postgres --rm -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
```
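Because the container is started with `--rm`, it is removed, together with its data, as soon as it stops; if you restart Postgres, repeat the setup steps below.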
Start the `psql` client:
```bash
docker exec -it postgres psql -U postgres
```
Create a table `my_table` and populate it with data:
```sql
CREATE TABLE my_table(id INTEGER PRIMARY KEY, value VARCHAR(128));
INSERT INTO my_table VALUES (0, 'zero');
INSERT INTO my_table VALUES (1, 'one');
INSERT INTO my_table VALUES (2, 'two');
INSERT INTO my_table VALUES (3, 'three');
INSERT INTO my_table VALUES (4, 'four');
INSERT INTO my_table VALUES (5, 'five');
INSERT INTO my_table VALUES (6, 'six');
INSERT INTO my_table VALUES (7, 'seven');
INSERT INTO my_table VALUES (8, 'eight');
INSERT INTO my_table VALUES (9, 'nine');
```
Step 2. Create a new Java project
Create a blank Java project named `pipeline-service-data-connection-example` and copy the following Maven `pom.xml` into it:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pipeline-service-data-connection-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>pipeline-service-data-connection-example</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.release>17</maven.compiler.release>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>6.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j2-impl</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.7.4</version>
        </dependency>
    </dependencies>
</project>
```
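If you use Gradle instead, declare the same four dependencies (`hazelcast`, `log4j-core`, `log4j-slf4j2-impl`, and `postgresql`) in your build script.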
Step 3. Create the pipeline and job
Create a class `EnrichUsingDataConnection` with a `main` method that will be the main job class. The `main` method performs the following steps:
- configure the data connection
- create a service factory and mapping function using the data connection
- create a pipeline that:
  - takes a sequence of numbers
  - maps each number using the service
  - prints the result to the logger sink
- submit the job
To configure the data connection, copy the following code snippet:
```java
public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        HazelcastInstance hz = HazelcastBootstrap.getInstance();

        // Register a JDBC data connection pointing at the Postgres
        // instance started in step 1
        DataConnectionConfig dcc = new DataConnectionConfig("my_data_connection");
        dcc.setType("JDBC");
        dcc.setProperty("jdbcUrl", "jdbc:postgresql://localhost:5432/postgres");
        dcc.setProperty("user", "postgres");
        dcc.setProperty("password", "postgres");
        hz.getConfig().addDataConnectionConfig(dcc);
        // ...
    }
}
```
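If the job later fails to connect, it can help to verify first that the database is reachable outside Hazelcast. Here is an optional, minimal sketch using only plain JDBC (`java.sql`); the class name is illustrative and this is not part of the tutorial's pipeline code:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PostgresSanityCheck {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as the data connection above
        try (Connection c = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement s = c.createStatement();
             ResultSet rs = s.executeQuery("SELECT count(*) FROM my_table")) {
            rs.next();
            System.out.println("my_table rows: " + rs.getLong(1)); // expect 10
        }
    }
}
```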
Use the following to create the service factory:
```java
public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        // ...
        // The service factory looks up the data connection by name;
        // getAndRetainDataConnection pins it so it is not closed while
        // in use, and the destroy function releases it again
        ServiceFactory<DataConnectionService, JdbcDataConnection> sf =
                ServiceFactory.withCreateContextFn(Context::dataConnectionService)
                        .withCreateServiceFn(
                                (context, dcs) -> dcs.getAndRetainDataConnection("my_data_connection", JdbcDataConnection.class)
                        )
                        .withDestroyServiceFn(DataConnectionBase::release);
        // ...
    }
}
```
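A shared JDBC data connection is typically backed by a connection pool in recent Hazelcast versions, so borrowing a connection per item in the mapping function below should be much cheaper than opening a raw database connection each time; check the data connection documentation for your Hazelcast version to confirm the pooling behavior.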
Use the following to create the mapping function:
```java
// Look up the textual name of the last digit of the key and return a
// (key, name) tuple; tuple2 is statically imported from
// com.hazelcast.jet.datamodel.Tuple2
BiFunctionEx<JdbcDataConnection, Long, Tuple2<Long, String>> mapFunction = (dc, key) -> {
    try (Connection connection = dc.getConnection()) {
        PreparedStatement statement = connection.prepareStatement(
                "SELECT value FROM my_table WHERE id = ?");
        statement.setLong(1, key % 10);
        ResultSet resultSet = statement.executeQuery();
        String value = null;
        if (resultSet.next()) {
            value = resultSet.getString("value");
        }
        return tuple2(key, value);
    } catch (SQLException e) {
        throw new RuntimeException("Failed to load value for key=" + key, e);
    }
};
```
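If the per-item database round trip becomes a bottleneck, Jet's `mapUsingServiceAsync` can overlap lookups instead of blocking on each item. A minimal sketch, reusing the `mapFunction` defined above (the executor choice is illustrative; a dedicated executor is usually a better fit than the common pool):

```java
// Async variant (sketch): wrap the blocking lookup in a CompletableFuture
// so several database lookups can be in flight at once
BiFunctionEx<JdbcDataConnection, Long, CompletableFuture<Tuple2<Long, String>>> asyncMapFn =
        (dc, key) -> CompletableFuture.supplyAsync(() -> mapFunction.apply(dc, key));

// then, in the pipeline:
//   .mapUsingServiceAsync(sf, asyncMapFn)
```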
Now, you can create the pipeline and submit it:
```java
public class EnrichUsingDataConnection {
    public static void main(String[] args) {
        // ...
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.itemStream(1))   // one test item per second
         .withoutTimestamps()
         .map(SimpleEvent::sequence)            // extract the sequence number
         .mapUsingService(sf, mapFunction)      // enrich via the database
         .writeTo(Sinks.logger());

        hz.getJet().newJob(p).join();
    }
}
```
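Optionally, you can give the job a name so it is easier to identify in logs and in Management Center. A small sketch (the name is illustrative):

```java
JobConfig jobConfig = new JobConfig().setName("enrich-using-data-connection");
hz.getJet().newJob(p, jobConfig).join();
```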
Running the main method should produce a log containing lines like the following:
```
13:21:41.479 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (0, zero)
13:21:42.250 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (1, one)
13:21:43.253 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (2, two)
...
```
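Because `itemStream` is an unbounded source, `join()` will not return on its own; stop the example with Ctrl+C, or keep a reference to the submitted `Job` and cancel it programmatically, as in this sketch:

```java
// Instead of join(), keep a handle to the job and cancel it when done
Job job = hz.getJet().newJob(p);
// ... let it run for a while, then:
job.cancel();
```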
Next steps
To learn how to submit the job to a running cluster, see the Submitting Jobs page.