Legacy CDC Connector

This page describes Hazelcast’s Community Edition CDC connectors, also known as legacy CDC connectors. For more information on Enterprise Edition CDC connectors, see CDC Connector.

Change Data Capture (CDC) is the process of observing changes made to a database and extracting them in a form that other systems can use, for purposes such as replication and analysis.

Change Data Capture is especially important to Hazelcast, because it allows for the streaming of changes from databases, which can be efficiently processed by the Jet engine.

The implementation of CDC in Hazelcast Community Edition is based on Debezium. Hazelcast offers a generic Debezium source that can handle CDC events from any database supported by Debezium. In addition, we’re striving to make CDC sources first-class citizens in Hazelcast, as we have already done for MySQL and PostgreSQL.

Install the CDC connector

This connector is included in the full distribution of Hazelcast Community Edition.

CDC as a source

We have the following types of CDC sources:

  • DebeziumCdcSources: a generic source for all databases supported by Debezium

  • MySqlCdcSources: a specific, first-class CDC source for MySQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)

  • PostgresCdcSources: a specific, first-class CDC source for PostgreSQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)

To set up a streaming source of CDC data, define it using the following configuration:

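import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    MySqlCdcSources.mysql("customers")
            .setDatabaseAddress("127.0.0.1")
            .setDatabasePort(3306)
            .setDatabaseUser("debezium")
            .setDatabasePassword("dbz")
            // Logical name identifying the database server/cluster
            .setClusterName("dbserver1")
            // Capture changes only from this database and this table
            .setDatabaseWhitelist("inventory")
            .setTableWhitelist("inventory.customers")
            .build())
    // Use the timestamps carried by the change events, with an allowed lag of 0 ms
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());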

For an example of how to use CDC data, see the Capture Changes from MySQL tutorial.

Fault tolerance

CDC sources offer at-least-once processing guarantees. The source periodically saves the offset in the database write-ahead log up to which it has dispatched events. After a failure or restart, it replays all events since the last successfully saved offset, so events dispatched after that offset can be delivered more than once.
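The replay behavior described above can be illustrated with a small plain-Java model. This is only a sketch and not Hazelcast code: the class `ReplayingSourceSketch`, its fields, and the fixed save interval are hypothetical names chosen for illustration, and the "log" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Hazelcast code: a source that reads a change log,
// saves its offset only every few events, and resumes from the saved offset
// after a restart.
class ReplayingSourceSketch {
    private final List<String> log;      // stands in for the database change log
    private final int saveInterval;      // save the offset every N events
    private int savedOffset = 0;         // last offset that was saved
    private int position = 0;            // next log index to read

    ReplayingSourceSketch(List<String> log, int saveInterval) {
        this.log = log;
        this.saveInterval = saveInterval;
    }

    // Emits up to maxEvents events, saving the offset at every interval boundary.
    List<String> emit(int maxEvents) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxEvents && position < log.size()) {
            out.add(log.get(position++));
            if (position % saveInterval == 0) {
                savedOffset = position;
            }
        }
        return out;
    }

    // Simulates a failure: progress made after the last saved offset is forgotten.
    void restart() {
        position = savedOffset;
    }

    static List<String> demo() {
        ReplayingSourceSketch source =
                new ReplayingSourceSketch(List.of("e0", "e1", "e2", "e3", "e4", "e5"), 2);
        List<String> seen = new ArrayList<>(source.emit(3)); // e0, e1, e2 (offset saved at 2)
        source.restart();                                    // failure before the next save
        seen.addAll(source.emit(10));                        // resumes at e2
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [e0, e1, e2, e2, e3, e4, e5]
    }
}
```

In this model no event is lost, but `e2` is seen twice. Downstream processing should therefore tolerate duplicates, for example by writing to a map keyed by the record's primary key so that a replayed event simply overwrites the same entry.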

Unfortunately, there is no guarantee that the last saved offset is still in the database changelog. Such logs are always finite and, depending on the database configuration, can be relatively short. If the CDC source has been inactive for a long time and the events it needs to replay have already been purged from the log, data is lost. With careful management of log retention, the at-least-once guarantee can be upheld in practice.
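As a rough illustration of this limit, the following plain-Java sketch compares a saved offset with the oldest offset still retained in the log. It assumes offsets are simple increasing counters, which is a simplification (real databases use binlog positions, LSNs, and similar), and `LogRetentionSketch` with its methods is a hypothetical helper, not part of Hazelcast or Debezium.

```java
// Hypothetical helper, not part of Hazelcast or Debezium: decides whether a
// saved offset can still be replayed from a finite change log.
// Assumption: offsets are modeled as simple increasing counters.
class LogRetentionSketch {

    // oldestRetainedOffset: the first offset the database still keeps in its log.
    // savedOffset: the offset the source saved before it stopped.
    static boolean canResume(long oldestRetainedOffset, long savedOffset) {
        return savedOffset >= oldestRetainedOffset;
    }

    // Number of events that were purged before the source could replay them.
    static long lostEvents(long oldestRetainedOffset, long savedOffset) {
        return Math.max(0, oldestRetainedOffset - savedOffset);
    }

    public static void main(String[] args) {
        // The log still starts at offset 100; the source saved offset 150: safe to resume.
        System.out.println(canResume(100, 150));  // prints true
        // The log now starts at offset 100, but the source saved offset 40.
        System.out.println(canResume(100, 40));   // prints false
        System.out.println(lostEvents(100, 40));  // prints 60
    }
}
```

The practical consequence is that log retention on the database side should comfortably exceed the longest downtime you expect for the job.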

CDC as a sink

Change data capture is a source-side functionality in Jet, but Hazelcast also offers specialized sinks that simplify applying CDC events to a map, which lets you reconstruct the contents of the original database table. The sinks expect to receive ChangeRecord objects. For each record, they call the custom functions you provide to extract the key and the value that are applied to the target map.

For example, a sink that maps CDC data to a Customer class and maintains a map view of the latest known email address of each customer (identified by ID) would look like this:

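// 'source' is a CDC source, for example one built with MySqlCdcSources
Pipeline p = Pipeline.create();
p.readFrom(source)
 .withoutTimestamps()
 .writeTo(CdcSinks.map("customers",
    // Map key: the "id" field of the record's key
    r -> r.key().toMap().get("id"),
    // Map value: the email field of the record's value, parsed as a Customer
    r -> r.value().toObject(Customer.class).email));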
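To make the idea of "applying CDC events to a map" more concrete, here is a plain-Java sketch of the general technique. It is not Hazelcast's implementation: `CdcMapSinkSketch`, `Change`, and `Operation` are hypothetical stand-ins, the sketch assumes that deletes remove the key and that all other operations overwrite it, and it leaves out concerns a real sink has to handle, such as out-of-order events.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model, not Hazelcast code: applies a stream of change events
// to a map using user-supplied key and value functions.
class CdcMapSinkSketch {
    enum Operation { INSERT, UPDATE, DELETE }

    // Simplified stand-in for a change record of a "customers" table row.
    static final class Change {
        final Operation operation;
        final int id;
        final String email;

        Change(Operation operation, int id, String email) {
            this.operation = operation;
            this.id = id;
            this.email = email;
        }
    }

    // Assumption: DELETE removes the key, every other operation puts the new value.
    static <K, V> void apply(Map<K, V> target, Change change,
                             Function<Change, K> keyFn, Function<Change, V> valueFn) {
        K key = keyFn.apply(change);
        if (change.operation == Operation.DELETE) {
            target.remove(key);
        } else {
            target.put(key, valueFn.apply(change));
        }
    }

    static Map<Integer, String> demo() {
        Map<Integer, String> customers = new HashMap<>();
        Change[] changes = {
            new Change(Operation.INSERT, 1001, "sally@example.com"),
            new Change(Operation.INSERT, 1002, "george@example.com"),
            new Change(Operation.UPDATE, 1001, "sally.thomas@example.com"),
            new Change(Operation.DELETE, 1002, null),
        };
        for (Change change : changes) {
            apply(customers, change, c -> c.id, c -> c.email);
        }
        return customers;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {1001=sally.thomas@example.com}
    }
}
```

After all four events, the map holds only the latest email for customer 1001, mirroring the current state of the table rather than its history.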

The key and value functions have certain limitations. They can only produce objects that the Hazelcast member can deserialize, which unfortunately doesn’t include classes from user code submitted as part of the job. So in the above example it’s OK to store String email values in the map, but we wouldn’t be able to store Customer objects directly.

If you need to store objects of user-defined classes, you can solve the problem with the User Code Deployment feature. For example configurations, see our CDC Join tutorial.

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.