Build pipeline service using Data Connection

This tutorial builds a pipeline service that transforms a stream of items. The service uses a data connection to obtain a JDBC connection to a relational database, and uses a table in that database to enrich a stream of numbers with the textual representation of each number's last digit.

Before you begin

To complete this tutorial, you need the following:

  • Java 17 or newer

  • Maven or Gradle

  • Docker

Step 1. Create and populate database

This tutorial uses Docker to run the Postgres database.

Run the following command to start Postgres:

docker run --name postgres --rm -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres

The container runs in the foreground, so open a second terminal and start the psql client:

docker exec -it postgres psql -U postgres

Create a table my_table and populate it with data:

CREATE TABLE my_table(id INTEGER PRIMARY KEY, value VARCHAR(128));

INSERT INTO my_table VALUES (0, 'zero');
INSERT INTO my_table VALUES (1, 'one');
INSERT INTO my_table VALUES (2, 'two');
INSERT INTO my_table VALUES (3, 'three');
INSERT INTO my_table VALUES (4, 'four');
INSERT INTO my_table VALUES (5, 'five');
INSERT INTO my_table VALUES (6, 'six');
INSERT INTO my_table VALUES (7, 'seven');
INSERT INTO my_table VALUES (8, 'eight');
INSERT INTO my_table VALUES (9, 'nine');

Step 2. Create new Java project

Create a blank Java project named pipeline-service-data-connection-example. If you use Maven, copy the following pom.xml file into it; if you use Gradle, declare the same dependencies in your build file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>pipeline-service-data-connection-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>pipeline-service-data-connection-example</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.release>17</maven.compiler.release>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>6.0.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j2-impl</artifactId>
            <version>2.24.1</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.7.4</version>
        </dependency>
    </dependencies>
</project>

Step 3. Create pipeline and job

Create a class named EnrichUsingDataConnection with a main method. This is the main job class. The main method performs the following steps:

  • configure data connection

  • create a service and mapping function using the data connection

  • create a pipeline that

    • takes a sequence of numbers

    • maps each number using the service

    • prints the result to the logger sink

  • submit the job

To configure the data connection, copy the following code snippet:

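import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastBootstrap;

public class EnrichUsingDataConnection {

    public static void main(String[] args) {
        HazelcastInstance hz = HazelcastBootstrap.getInstance();

        // Register a JDBC data connection that points to the Postgres container from Step 1
        DataConnectionConfig dcc = new DataConnectionConfig("my_data_connection");
        dcc.setType("JDBC");
        dcc.setProperty("jdbcUrl", "jdbc:postgresql://localhost:5432/postgres");
        dcc.setProperty("user", "postgres");
        dcc.setProperty("password", "postgres");
        hz.getConfig().addDataConnectionConfig(dcc);

        // ...
    }
}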
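The jdbcUrl value ties back to the docker command in Step 1: the host and port match the published port (-p 5432:5432), and the last path segment is the database name. The following is a minimal sketch in plain Java, not part of the Hazelcast API, that splits the URL into those parts; the class name JdbcUrlPartsSketch and the parse helper are hypothetical and exist only for illustration.

```java
import java.net.URI;

// Hypothetical helper, for illustration only: shows which part of the
// tutorial's jdbcUrl corresponds to the driver, host, port, and database.
public class JdbcUrlPartsSketch {

    // Drops the "jdbc:" prefix so that java.net.URI can parse the remainder.
    public static URI parse(String jdbcUrl) {
        if (!jdbcUrl.startsWith("jdbc:")) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        return URI.create(jdbcUrl.substring("jdbc:".length()));
    }

    public static void main(String[] args) {
        URI uri = parse("jdbc:postgresql://localhost:5432/postgres");
        System.out.println("driver   = " + uri.getScheme());
        System.out.println("host     = " + uri.getHost());
        System.out.println("port     = " + uri.getPort());
        // The path is "/postgres"; strip the leading slash to get the database name.
        System.out.println("database = " + uri.getPath().substring(1));
    }
}
```

If you publish Postgres on a different host port, change the port in the jdbcUrl property to match.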

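Use the following to create the service factory. The factory retains the data connection when the service is created and releases it when the service is destroyed:

import com.hazelcast.dataconnection.DataConnectionBase;
import com.hazelcast.dataconnection.DataConnectionService;
import com.hazelcast.dataconnection.impl.JdbcDataConnection;
import com.hazelcast.jet.core.ProcessorSupplier.Context;
import com.hazelcast.jet.pipeline.ServiceFactory;

public class EnrichUsingDataConnection {

    public static void main(String[] args) {
        // ...
        ServiceFactory<DataConnectionService, JdbcDataConnection> sf =
                ServiceFactory.withCreateContextFn(Context::dataConnectionService)
                        .withCreateServiceFn(
                                // Look up the data connection by the name configured above
                                (context, dcs) -> dcs.getAndRetainDataConnection("my_data_connection", JdbcDataConnection.class)
                        )
                        // Release the retained data connection when the service is destroyed
                        .withDestroyServiceFn(DataConnectionBase::release);
        // ...
    }
}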

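Use the following to create the mapping function. It requires imports of java.sql.Connection, java.sql.PreparedStatement, java.sql.ResultSet, java.sql.SQLException, com.hazelcast.function.BiFunctionEx, and com.hazelcast.jet.datamodel.Tuple2, plus a static import of com.hazelcast.jet.datamodel.Tuple2.tuple2:

        BiFunctionEx<JdbcDataConnection, Long, Tuple2<Long, String>> mapFunction = (dc, key) -> {
            // try-with-resources closes the statement and the connection after each lookup
            try (Connection connection = dc.getConnection();
                 PreparedStatement statement = connection.prepareStatement(
                         "SELECT value FROM my_table WHERE id = ?")) {

                // The last digit of the key selects the row
                statement.setLong(1, key % 10);
                try (ResultSet resultSet = statement.executeQuery()) {
                    // value stays null when no row matches
                    String value = resultSet.next() ? resultSet.getString("value") : null;
                    return tuple2(key, value);
                }
            } catch (SQLException e) {
                throw new RuntimeException("Failed to load value for key=" + key, e);
            }
        };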
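To see what the mapping function returns without starting a database, the following minimal sketch mirrors its lookup logic in plain Java. It assumes an in-memory map as a stand-in for my_table, with the same rows as in Step 1; the class name LastDigitLookupSketch and its lookup method are hypothetical and are not part of the tutorial's job class.

```java
import java.util.Map;

// Hypothetical stand-in for the tutorial's mapping function: an in-memory map
// replaces my_table so that the lookup logic can run without a database.
public class LastDigitLookupSketch {

    // Same rows as the INSERT statements in Step 1: id -> value.
    static final Map<Long, String> MY_TABLE = Map.of(
            0L, "zero", 1L, "one", 2L, "two", 3L, "three", 4L, "four",
            5L, "five", 6L, "six", 7L, "seven", 8L, "eight", 9L, "nine");

    // Mirrors "SELECT value FROM my_table WHERE id = ?" with id = key % 10.
    // Returns null when no row matches, like the tutorial's function.
    public static String lookup(long key) {
        return MY_TABLE.get(key % 10);
    }

    public static void main(String[] args) {
        for (long key : new long[] {0, 7, 13, 120}) {
            System.out.println("(" + key + ", " + lookup(key) + ")");
        }
    }
}
```

Running the sketch prints (0, zero), (7, seven), (13, three), and (120, zero): keys above 9 wrap around to the row that matches their last digit.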

Now, you can create the pipeline and submit it:

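import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.SimpleEvent;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EnrichUsingDataConnection {

    public static void main(String[] args) {
        // ...

        Pipeline p = Pipeline.create();
        // Emit one event per second and keep only its sequence number
        p.readFrom(TestSources.itemStream(1))
                .withoutTimestamps()
                .map(SimpleEvent::sequence)
                // Enrich each number using the data connection service
                .mapUsingService(sf, mapFunction)
                .writeTo(Sinks.logger());

        // The source is unbounded, so join() blocks until the job is cancelled
        hz.getJet().newJob(p).join();
    }
}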

Running the main method should produce log output similar to the following:

13:21:41.479 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (0, zero)
13:21:42.250 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (1, one)
13:21:43.253 [ INFO] [c.h.j.i.c.WriteLoggerP] [127.0.0.1]:5701 [dev] [6.0.0-SNAPSHOT] [0c92-06c7-1a00-0001/loggerSink#0] (2, two)
...

Next steps

You can learn how to submit the job to a running cluster by reading the Submitting Jobs page.