JDBC is a well-established Java database API that’s supported by every major relational database (and many non-relational ones), including Oracle, MySQL, PostgreSQL, and Microsoft SQL Server. Database vendors ship client libraries called JDBC drivers, and every major vendor makes its driver available either as a download or from a package repository such as Maven.
Hazelcast can use these drivers for both sources and sinks. The only step required is to add the driver to each member's classpath.
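One way to confirm that a driver is actually visible on a classpath is to ask `java.sql.DriverManager`. The following is a minimal sketch using only the JDK; the class name `DriverCheck` and the sample URL are hypothetical, and the result reflects only the JVM in which the code runs, so it would need to run on a member to say anything about that member.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DriverCheck {

    /** Returns the class names of all JDBC drivers registered with DriverManager. */
    static List<String> registeredDrivers() {
        List<String> names = new ArrayList<>();
        for (Driver driver : Collections.list(DriverManager.getDrivers())) {
            names.add(driver.getClass().getName());
        }
        return names;
    }

    /** Returns true if some registered driver accepts the given JDBC URL. */
    static boolean hasDriverFor(String jdbcUrl) {
        try {
            DriverManager.getDriver(jdbcUrl);
            return true;
        } catch (SQLException e) {
            // DriverManager throws when no registered driver understands the URL
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical URL; replace it with the one your pipeline uses
        String url = "jdbc:mysql://localhost:3306/mysql";
        System.out.println("Registered drivers: " + registeredDrivers());
        System.out.println("Driver found for " + url + ": " + hasDriverFor(url));
    }
}
```

If `hasDriverFor` returns false for the URL used in a pipeline, the driver JAR is most likely missing from that JVM's classpath.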
JDBC as a Source
The JDBC source works only in batch mode, meaning the query is executed only once. To stream changes from the database, follow the Change Data Capture tutorial.
In the simplest form, to read from a database you simply need to pass a query:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc("jdbc:mysql://localhost:3306/mysql",
    "SELECT * FROM person",
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());
Hazelcast is also able to distribute a query across multiple members by customizing the filtering criteria for each node:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc(
    () -> DriverManager.getConnection("jdbc:mysql://localhost:3306/mysql"),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement(
              "SELECT * FROM person WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());
You can also use a configured external data store as a JDBC source:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jdbc(
    externalDataStoreRef("my-database"),
    (con, parallelism, index) -> {
        PreparedStatement stmt = con.prepareStatement(
              "SELECT * FROM person WHERE MOD(id, ?) = ?");
        stmt.setInt(1, parallelism);
        stmt.setInt(2, index);
        return stmt.executeQuery();
    },
    resultSet -> new Person(resultSet.getInt(1), resultSet.getString(2))
)).writeTo(Sinks.logger());
JDBC as a Sink
Hazelcast is also able to output the results of a job to a database using the JDBC driver by using an update query.
The JDBC sink automatically tries to reconnect when database
connectivity problems occur, so it is suitable for use in streaming
jobs. If you want to avoid duplicate writes to the database, use a
suitable insert-or-update statement instead of INSERT, such as
MERGE, REPLACE, or INSERT ... ON CONFLICT.
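To see why an insert-or-update statement avoids duplicates, the following sketch simulates the two behaviours in memory, without a database. It assumes that duplicates arise when the same item reaches the sink more than once (for example after a retry), and it models the table as a plain Java collection. The class name `ReplayDemo` and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReplayDemo {

    // Models a plain INSERT into a table without a unique key:
    // every delivered item becomes a new row.
    static List<Map.Entry<Integer, String>> insertAll(List<Map.Entry<Integer, String>> items) {
        return new ArrayList<>(items);
    }

    // Models an insert-or-update keyed on id (REPLACE, MERGE, INSERT ... ON CONFLICT):
    // a repeated item overwrites the existing row instead of adding one.
    static Map<Integer, String> upsertAll(List<Map.Entry<Integer, String>> items) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> item : items) {
            table.put(item.getKey(), item.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical delivery sequence: the item with id 2 arrives twice
        List<Map.Entry<Integer, String>> delivered = List.of(
                Map.entry(1, "Ann"), Map.entry(2, "Bob"), Map.entry(2, "Bob"));

        System.out.println("INSERT rows: " + insertAll(delivered).size()); // prints 3
        System.out.println("upsert rows: " + upsertAll(delivered).size()); // prints 2
    }
}
```

The upsert variant is idempotent: writing the same item again leaves the table unchanged, which is the property the insert-or-update statements provide when the table has a suitable key.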
The supplied update query should be a parameterized query where the parameters are set for each item:
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Person>kafka(.., "people"))
 .writeTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         DB_CONNECTION_URL,
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }
));
You can also use a configured external data store as a JDBC sink:
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<Person>kafka(.., "people"))
 .writeTo(Sinks.jdbc(
         "REPLACE INTO PERSON (id, name) values(?, ?)",
         externalDataStoreRef(JDBC_DATA_STORE),
         (stmt, item) -> {
             stmt.setInt(1, item.id);
             stmt.setString(2, item.name);
         }
));
Fault Tolerance
The JDBC sink supports the exactly-once guarantee. It uses two-phase XA
transactions, so the DML statements are committed consistently with the
last state snapshot. This greatly increases latency, which is then
determined by the snapshot interval: written records become visible to
database readers only after the commit. For this to work, instead of
the JDBC URL you have to use the variant that takes a
Supplier<CommonDataSource>, and the supplier must return an instance of
javax.sql.XADataSource; otherwise the job will not start.
Here is an example for PostgreSQL:
stage.writeTo(Sinks.jdbc("INSERT INTO " + tableName + " VALUES(?, ?)",
         () -> {
                 BaseDataSource dataSource = new PGXADataSource();
                 dataSource.setUrl("jdbc:postgresql://localhost:5432/database1");
                 dataSource.setUser("user");
                 dataSource.setPassword("pwd");
                 dataSource.setDatabaseName("database1");
                 return dataSource;
         },
         (stmt, item) -> {
             stmt.setInt(1, item.getKey());
             stmt.setString(2, item.getValue());
         }
 ));
Note: XA transactions are implemented incorrectly in some databases. Specifically, a prepared transaction is sometimes rolled back when the client disconnects. The issue is tricky because the integration works during normal operation and the problem manifests only if the job crashes at a specific moment. Hazelcast will not even detect it; some records will simply be missing from the target database. To test your database we provide a tool; see XA tests for more information. This applies only to the JDBC sink; the source doesn’t use XA transactions.
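To make the failure mode concrete, here is a rough sketch of the kind of check one could run against a database: prepare an XA transaction, close the connection, reconnect, and see whether the prepared transaction is still reported by `recover()`. This is not the XA test tool mentioned above. It uses only the standard `javax.sql` and `javax.transaction.xa` interfaces; the class `PreparedTxCheck`, the `SimpleXid` helper, and the format id are hypothetical. A clean `close()` only approximates a crashed client, so treat the result as an indication rather than proof.

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class PreparedTxCheck {

    /** Minimal Xid; the format id 4242 is an arbitrary illustrative value. */
    static final class SimpleXid implements Xid {
        private final byte[] globalId;
        private final byte[] branchId;

        SimpleXid(String globalId, String branchId) {
            this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
            this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
        }

        @Override public int getFormatId() { return 4242; }
        @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
        @Override public byte[] getBranchQualifier() { return branchId.clone(); }
    }

    /** Compares two Xids by format id, global transaction id and branch qualifier. */
    static boolean sameXid(Xid a, Xid b) {
        return a.getFormatId() == b.getFormatId()
                && Arrays.equals(a.getGlobalTransactionId(), b.getGlobalTransactionId())
                && Arrays.equals(a.getBranchQualifier(), b.getBranchQualifier());
    }

    /** Returns true if a prepared transaction is still recoverable after reconnecting. */
    static boolean preparedTxSurvivesDisconnect(XADataSource ds, String insertSql) throws Exception {
        Xid xid = new SimpleXid("xa-check-" + System.nanoTime(), "branch-1");

        // Phase one: do some work, prepare it, then drop the connection without committing.
        XAConnection first = ds.getXAConnection();
        try {
            XAResource xa = first.getXAResource();
            Connection con = first.getConnection();
            xa.start(xid, XAResource.TMNOFLAGS);
            try (Statement st = con.createStatement()) {
                st.executeUpdate(insertSql);
            }
            xa.end(xid, XAResource.TMSUCCESS);
            if (xa.prepare(xid) != XAResource.XA_OK) {
                throw new IllegalStateException("Statement made no changes; nothing was prepared");
            }
        } finally {
            first.close(); // approximates the client going away
        }

        // Phase two: reconnect and look for the prepared transaction.
        XAConnection second = ds.getXAConnection();
        try {
            XAResource xa = second.getXAResource();
            for (Xid pending : xa.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN)) {
                if (sameXid(pending, xid)) {
                    xa.rollback(pending); // discard the test write
                    return true;
                }
            }
            return false;
        } finally {
            second.close();
        }
    }

    public static void main(String[] args) {
        // No database is contacted here; this only exercises the Xid comparison helper.
        System.out.println(sameXid(new SimpleXid("a", "b"), new SimpleXid("a", "b"))); // prints true
    }
}
```

To use it, pass an `XADataSource` from your driver and an INSERT statement for a scratch table to `preparedTxSurvivesDisconnect`. A result of `false` suggests the database discarded the prepared transaction on disconnect. With PostgreSQL, prepared transactions must also be enabled on the server (`max_prepared_transactions` greater than zero), otherwise the prepare step fails.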