This is a prerelease version.

CDC Connector

This page refers to Hazelcast’s Enterprise Edition CDC connectors. For more information on Community Edition CDC connectors, see Legacy CDC Connector.

Change Data Capture (CDC) is the process of observing changes made to a database and extracting them in a form that other systems can use, for purposes such as replication and analysis.

Change Data Capture is especially important to Hazelcast, because it allows for the streaming of changes from databases, which can be efficiently processed by the Jet engine.

The implementation of CDC in Hazelcast Enterprise Edition is based on Debezium 2.x. Hazelcast offers a generic Debezium source which can handle CDC events from any database supported by Debezium. However, we are also working to make CDC sources first-class citizens in Hazelcast, as we have already done for MySQL and PostgreSQL.

Install the CDC connector

This connector is included in the full distribution of Hazelcast Enterprise Edition.

Maven

To use this connector in a Maven project, add one or more of the following entries to the <dependencies> section of your pom.xml file:

Generic connector:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
    <version>{full-version}</version>
</dependency>

MySQL-specific connector:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
    <version>{full-version}</version>
</dependency>

Due to licensing, the MySQL connector does not include the MySQL driver as a dependency. You have to manually add the com.mysql:mysql-connector-j dependency to the classpath.

PostgreSQL-specific connector:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
    <version>{full-version}</version>
</dependency>

CDC as a source

The Java API supports the following types of CDC source:

  • DebeziumCdcSources: a generic source for all databases supported by Debezium

  • MySqlCdcSources: a specific, first-class CDC source for MySQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)

  • PostgresCdcSources: a specific, first-class CDC source for PostgreSQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)

To set up a CDC data streaming source, define it using the following configuration:

MySQL:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    MySqlCdcSources.mysql("customers")
            .setDatabaseAddress("127.0.0.1", 3306)
            .setDatabaseCredentials("debezium", "dbz")
            .setClusterName("dbserver1")
            .setDatabaseIncludeList("inventory")
            .setTableIncludeList("inventory.customers")
            .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());

PostgreSQL:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    PostgresCdcSources.postgres("customers")
            .setDatabaseAddress("127.0.0.1", 5432)
            .setDatabaseCredentials("debezium", "dbz")
            .setClusterName("dbserver1")
            .setDatabaseIncludeList("inventory")
            .setTableIncludeList("inventory.customers")
            .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());

MongoDB:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    DebeziumCdcSources.debezium("customers", MongoDbConnector.class)
            .setProperty("mongodb.connection.string", "mongodb://localhost:27017")
            .setDatabaseIncludeList("inventory")
            .setProperty("collection.include.list", "customers")
            .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());

The MySQL- and PostgreSQL-specific source builders provide methods for all major configuration settings and guard against misconfiguration, for example, setting mutually exclusive options together. If you use the generic source builder, refer to the Debezium documentation for information about required or mutually exclusive fields.

Follow the provided Capture Changes from MySQL tutorial to see how CDC processes change events from a MySQL database.

Make sure your database is up and running before you start a CDC job, including any additional CDC agents it needs (for example, those required by DB2).

Common source builder functions

  • changeRecord(): Sets the output type to ChangeRecord, a wrapper that exposes most fields in a strongly typed manner.

  • json(): Sets the output type to JSON. In the resulting stage, the type is set to Map<String, String>, where the map entry’s key is the key of the SourceRecord in JSON format, and the value is the whole SourceRecord value in JSON format.

  • customMapping(RecordMappingFunction<T>): Sets the output type to an arbitrary user type, T. Mapping from SourceRecord to T is done using the supplied RecordMappingFunction.

  • withDefaultEngine(): Sets the preferred engine to the default (non-async) one. This engine is single-threaded, but also more widely used and tested. Use this engine for the most stable results (for example, no async offset restore). It makes the most sense for MySQL and PostgreSQL in particular, because the MySQL and PostgreSQL Debezium connectors are single-threaded only.

  • withAsyncEngine(): Sets the preferred engine to the async one. This engine is multithreaded (if supported by the connector), but be aware of its asynchronous nature. For example, offset restore may complete asynchronously after the restart is done, which can lead to confusing results.

  • setProperty(String, String): Sets a connector property to the given value. Overloads accept a long, String or boolean value.

Fault tolerance

CDC sources offer at-least-once processing guarantees. The source periodically saves the database write-ahead log offset up to which it has dispatched events. After a failure or restart, it replays all events since the last successfully saved offset.

However, there is no guarantee that the last saved offset is still present in the database changelog. Such logs are always finite and, depending on the database configuration, can be relatively short. If the CDC source has to replay data after a long period of inactivity, some changes may already have been purged from the log, and data can be lost. With careful management of log retention, though, the at-least-once guarantee can be provided in practice.
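
The replay behaviour described above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: the Event record, the replayAfter and apply helpers, and the in-memory log are hypothetical and are not part of the Hazelcast or Debezium APIs. It assumes Java 17 or later. It shows that an event dispatched after the last saved offset is delivered again after a restart, that a keyed upsert makes such duplicates harmless, and that replay cannot succeed once the log no longer reaches back to the saved offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of at-least-once replay; not a Hazelcast or Debezium API.
class ReplayDemo {

    // One changelog entry: log position, primary key and new value.
    record Event(long offset, int key, String value) { }

    // Returns every event after lastSavedOffset, which is what a restarted
    // source would emit again. Fails if the log has been truncated past it.
    static List<Event> replayAfter(List<Event> log, long lastSavedOffset) {
        if (!log.isEmpty() && log.get(0).offset() > lastSavedOffset + 1) {
            throw new IllegalStateException(
                    "changelog no longer contains offset " + (lastSavedOffset + 1));
        }
        List<Event> replayed = new ArrayList<>();
        for (Event e : log) {
            if (e.offset() > lastSavedOffset) {
                replayed.add(e);
            }
        }
        return replayed;
    }

    // Keyed upsert: applying the same event twice leaves the same state.
    static void apply(Map<Integer, String> state, List<Event> events) {
        for (Event e : events) {
            state.put(e.key(), e.value());
        }
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
                new Event(1, 1001, "a@example.com"),
                new Event(2, 1002, "b@example.com"),
                new Event(3, 1001, "a2@example.com"),
                new Event(4, 1003, "c@example.com"));

        Map<Integer, String> state = new HashMap<>();
        // Before the failure, events 1 to 3 were dispatched,
        // but the offset was last saved at 2.
        apply(state, log.subList(0, 3));
        long lastSavedOffset = 2;

        // After the restart, event 3 is delivered a second time, then event 4.
        List<Event> replayed = replayAfter(log, lastSavedOffset);
        apply(state, replayed);

        System.out.println("replayed events: " + replayed.size());
        System.out.println("email for 1001: " + state.get(1001));
    }
}
```

Because duplicates are possible, downstream processing is easiest to reason about when it is idempotent, as the keyed upsert in this sketch is.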

CDC as a sink

Change data capture is a source-side functionality in Jet, but we also offer some specialized sinks that simplify applying CDC events to a map, which gives you the ability to reconstruct the contents of the original database table. The sinks expect to receive ChangeRecord objects and apply your custom functions to them to extract the key and the value that are written to the target map.

For example, a sink mapping CDC data to a Customer class and maintaining a map view of latest known email addresses per customer (identified by ID) would look like this:

Pipeline p = Pipeline.create();
p.readFrom(source)
 .withoutTimestamps()
 .writeTo(CdcSinks.map("customers",
    r -> r.key().toMap().get("id"),
    r -> r.value().toObject(Customer.class).email));

The key and value functions have certain limitations. They can be used to map only to objects which the Hazelcast member can deserialize, which unfortunately doesn’t include user code submitted as a part of the job. So in the above example it’s OK to have String email values, but we wouldn’t be able to use Customer directly.

If user code has to be used, then the problem can be solved with the help of the User Code Deployment feature. Example configs for that can be seen in our CDC Join tutorial.

Although User Code Deployment has been deprecated, the replacement User Code Namespaces feature does not yet support Jet jobs or pipelines. For now, continue to use the User Code Deployment solution in this scenario.
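
To make the idea of reconstructing a table from change events more concrete, the following is a minimal, self-contained sketch of what a map-based CDC sink does conceptually. It does not use the Hazelcast API: the Op enum, the Change record and the applyAll helper are hypothetical stand-ins, and the assumption that a delete event removes the map entry should be checked against the CdcSinks Javadoc. It assumes Java 17 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual model of applying CDC events to a map; not the Hazelcast API.
class MapSinkSketch {

    enum Op { INSERT, UPDATE, DELETE }

    // Hypothetical stand-in for a change record: operation, row ID and email.
    record Change(Op op, int id, String email) { }

    // Applies the changes in order and returns the resulting view of the table.
    static Map<Integer, String> applyAll(List<Change> changes) {
        Map<Integer, String> view = new TreeMap<>();
        for (Change c : changes) {
            if (c.op() == Op.DELETE) {
                view.remove(c.id());          // assumed: a delete removes the entry
            } else {
                view.put(c.id(), c.email());  // insert and update are both upserts
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<Integer, String> view = applyAll(List.of(
                new Change(Op.INSERT, 1001, "sally.thomas@example.com"),
                new Change(Op.INSERT, 1002, "gbailey@example.com"),
                new Change(Op.UPDATE, 1001, "sally@example.com"),
                new Change(Op.DELETE, 1002, null)));
        System.out.println(view); // prints {1001=sally@example.com}
    }
}
```

The values stored here are plain String objects, which mirrors the limitation described above: the member can deserialize them without any user classes.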

Data types

Hazelcast relies on Debezium, which in turn uses the Kafka Connect API, including, for example, Struct objects. Hazelcast makes conversion to Map and POJOs easier by providing abstractions such as RecordPart. Despite this, it’s worth knowing how some database types can or will be mapped to Java types.

Each database has its own mappings from database types to struct types. For the specific mappings, see the Debezium documentation for your database, for example MySQL, PostgreSQL or DB2.

Common data type mappings:

| Struct type | Semantic type | Java type |
|---|---|---|
| INT32 | - | int/Integer |
| INT32 | io.debezium.time.Date | java.time.LocalDate / java.util.Date / String yyyy-MM-dd |
| INT32 | io.debezium.time.Time | java.time.Duration / String ISO-8601 PnDTnHnMn.nS |
| INT64 | - | long/Long |
| INT64 | io.debezium.time.Timestamp | java.time.Instant / String yyyy-MM-dd HH:mm:ss.SSS |
| INT64 | io.debezium.time.MicroTimestamp | java.time.Instant / String yyyy-MM-dd HH:mm:ss.SSS |
| INT64 | io.debezium.time.MicroTime | java.time.Duration / String ISO-8601 PnDTnHnMn.nS |
| INT64 | io.debezium.time.NanoTimestamp | java.time.Instant / String yyyy-MM-dd HH:mm:ss.SSS |
| INT64 | io.debezium.time.NanoTime | java.time.Duration / String ISO-8601 PnDTnHnMn.nS |
| FLOAT32 | - | float/Float / String |
| FLOAT64 | - | double/Double / String |
| BOOLEAN | - | boolean/Boolean / String |
| STRING | - | String |

The RecordPart#value field contains Debezium’s message in JSON format. This JSON uses strings to represent dates and times, instead of the integer encodings that are standard in Debezium but harder to work with.

We strongly recommend using time.precision.mode=adaptive (the default). With time.precision.mode=connect, dates, times and timestamps are represented using java.util.Date, which is less precise.
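
For readers who work with the raw integer values directly, the sketch below shows how the integer encodings of Debezium’s semantic types correspond to the Java types in the table above. It relies on the units that Debezium documents for these types (days or milliseconds, microseconds and nanoseconds, counted either since the epoch or since midnight). The class and method names are hypothetical helpers, not Hazelcast or Debezium APIs, and RecordPart normally performs these conversions for you.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helpers that decode Debezium's integer-encoded temporal values.
class DebeziumTemporalSketch {

    // io.debezium.time.Date: INT32, days since 1970-01-01.
    static LocalDate date(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    // io.debezium.time.Time: INT32, milliseconds past midnight.
    static Duration time(int millisOfDay) {
        return Duration.ofMillis(millisOfDay);
    }

    // io.debezium.time.Timestamp: INT64, milliseconds since the epoch.
    static Instant timestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // io.debezium.time.MicroTimestamp: INT64, microseconds since the epoch.
    static Instant microTimestamp(long epochMicros) {
        return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    }

    // io.debezium.time.MicroTime: INT64, microseconds past midnight.
    static Duration microTime(long microsOfDay) {
        return Duration.of(microsOfDay, ChronoUnit.MICROS);
    }

    // io.debezium.time.NanoTimestamp: INT64, nanoseconds since the epoch.
    static Instant nanoTimestamp(long epochNanos) {
        return Instant.EPOCH.plusNanos(epochNanos);
    }

    // io.debezium.time.NanoTime: INT64, nanoseconds past midnight.
    static Duration nanoTime(long nanosOfDay) {
        return Duration.ofNanos(nanosOfDay);
    }

    public static void main(String[] args) {
        System.out.println(date(19723));               // prints 2024-01-01
        System.out.println(time(90_000));              // prints PT1M30S
        System.out.println(timestamp(1_000L));         // prints 1970-01-01T00:00:01Z
        System.out.println(microTime(3_600_000_000L)); // prints PT1H
    }
}
```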

Migration tips

Hazelcast Community Edition has a Debezium CDC connector, but it’s based on an older version of Debezium. Migration to the new connector is straightforward but be aware of the following changes:

  • You should use the com.hazelcast.enterprise.jet.cdc package instead of com.hazelcast.jet.cdc.

  • Artifact names are now hazelcast-enterprise-cdc-debezium, hazelcast-enterprise-cdc-mysql and hazelcast-enterprise-cdc-postgres (instead of hazelcast-jet-…​).

  • Debezium renamed certain terms, which we have also replicated in our code. For example, include list replaces whitelist, exclude list replaces blacklist. This means, for example, you need to use setTableIncludeList instead of setTableWhitelist. For more detail on new Debezium names, see their MySQL and PostgreSQL documentation.