JDBC Connector

JDBC is a well-established Java database API that’s supported by every major relational (and many non-relational) database implementation, including Oracle, MySQL, PostgreSQL, Microsoft SQL Server. They provide libraries called JDBC drivers and every major database vendor will have this driver available for either download or in a package repository such as maven.

Hazelcast is able to utilize these drivers both for sources and sinks and the only step required is to add the driver to the members' classpaths.

JDBC as a Source

The JDBC source only works in batching mode, meaning the query is only executed once, for streaming changes from the database you can follow the Change Data Capture tutorial.

In the simplest form, to read from a database you simply need to pass a query:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc("jdbc:mysql://localhost:3306/mysql",
    "SELECT * FROM person",
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());

Hazelcast is also able to distribute a query across multiple members by customizing the filtering criteria for each node:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc(
    () -> DriverManager.getConnection("jdbc:mysql://localhost:3306/mysql"),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement(
              "SELECT * FROM person WHERE MOD(id, ?) = ?)");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());

You can also use a configured external data store as a JDBC Source:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc(
    externalDataStoreRef("my-database"),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement(
              "SELECT * FROM person WHERE MOD(id, ?) = ?)");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());

JDBC as a Sink

Hazelcast is also able to output the results of a job to a database using the JDBC driver by using an update query.

JDBC sink will automatically try to reconnect during database connectivity issues and is suitable for use in streaming jobs. If you want to avoid duplicate writes to the database, then a suitable insert-or-update statement should be used instead of INSERT, such as MERGE or REPLACE or INSERT .. ON CONFLICT ...

The supplied update query should be a parameterized query where the parameters are set for each item:

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Person>kafka(.., "people"))
 .writeTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         DB_CONNECTION_URL,
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }
));

You can also use a configured external data store as a JDBC sink:

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Person>kafka(.., "people"))
 .writeTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         externalDataStoreRef(JDBC_DATA_STORE),
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }
));

Fault Tolerance

The JDBC sink supports the exactly-once guarantee. It uses two-phase XA transactions, the DML statements are committed consistently with the last state snapshot. This greatly increases the latency, it is determined by the snapshot interval: messages are visible to consumers only after the commit. In order to make it work, instead of the JDBC URL you have to use the variant with Supplier<CommonDataSource> and it must return an instance of javax.sql.XADataSource, otherwise the job will not start.

Here is an example for PostgreSQL:

stage.writeTo(Sinks.jdbc("INSERT INTO " + tableName + " VALUES(?, ?)",
         () -> {
                 BaseDataSource dataSource = new PGXADataSource();
                 dataSource.setUrl("localhost:5432");
                 dataSource.setUser("user");
                 dataSource.setPassword("pwd");
                 dataSource.setDatabaseName("database1");
                 return dataSource;
         },
         (stmt, item) -> {
             stmt.setInt(1, item.getKey());
             stmt.setString(2, item.getValue());
         }
 ));
XA transactions are implemented incorrectly in some databases. Specifically a prepared transaction is sometimes rolled back when the client disconnects. The issue is tricky because the integration will work during normal operation and the problem will only manifest if the job crashes in a specific moment. Hazelcast will even not detect it, only some records will be missing from the target database. To test your broker we provide a tool, please go to XA tests to get more information. This only applies to the JDBC sink, the source doesn’t use XA transactions.