CDC Connector
This page refers to Hazelcast’s Enterprise Edition CDC connectors. For more information on Community Edition CDC connectors, see Legacy CDC Connector.
Change Data Capture (CDC) refers to the process of observing changes made to a database and extracting them in a form usable by other systems, for purposes such as replication and analysis.
Change Data Capture is especially important to Hazelcast because it allows changes to be streamed from databases and processed efficiently by the Jet engine.
The implementation of CDC in Hazelcast Enterprise Edition is based on Debezium 2.x. Hazelcast offers a generic Debezium source that can handle CDC events from any database supported by Debezium. However, we’re also striving to make CDC sources first-class citizens in Hazelcast, as we have already done for MySQL and PostgreSQL.
Install the CDC connector
This connector is included in the full distribution of Hazelcast Enterprise Edition.
Maven
To use this connector in a Maven project, add the following entries to the <dependencies> section of your pom.xml file:
Generic connector:
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-debezium</artifactId>
<version>{full-version}</version>
</dependency>
MySQL-specific connector:
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-mysql</artifactId>
<version>{full-version}</version>
</dependency>
Due to licensing, the MySQL connector does not include the MySQL driver as a dependency. You have to add the com.mysql:mysql-connector-j dependency to the classpath manually.
PostgreSQL-specific connector:
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-enterprise-cdc-postgres</artifactId>
<version>{full-version}</version>
</dependency>
CDC as a source
The Java API supports the following types of CDC source:
- DebeziumCdcSources: a generic source for all databases supported by Debezium
- MySqlCdcSources: a specific, first-class Jet CDC source for MySQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)
- PostgresCdcSources: a specific, first-class CDC source for PostgreSQL databases (also based on Debezium, but with the additional benefits provided by Hazelcast)
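To set up a CDC data streaming source, define it with the builder for your database. The following examples configure a MySQL source, a PostgreSQL source, and a generic Debezium source for MongoDB: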
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
MySqlCdcSources.mysql("customers")
.setDatabaseAddress("127.0.0.1", 3306)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
PostgresCdcSources.postgres("customers")
.setDatabaseAddress("127.0.0.1", 5432)
.setDatabaseCredentials("debezium", "dbz")
.setClusterName("dbserver1")
.setDatabaseIncludeList("inventory")
.setTableIncludeList("inventory.customers")
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
DebeziumCdcSources.debezium("customers", MongoDbConnector.class)
.setProperty("mongodb.connection.string", "mongodb://localhost:27017")
.setDatabaseIncludeList("inventory")
.setProperty("collection.include.list", "inventory.customers") // Debezium matches fully qualified <database>.<collection> names
.build())
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
The MySQL- and PostgreSQL-specific source builders contain methods for all major configuration settings and protect you from mistakes such as combining mutually exclusive options. If you use the generic source builder, refer to the Debezium documentation for information about required and mutually exclusive fields.
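Debezium treats include lists, such as the values passed to setDatabaseIncludeList and setTableIncludeList, as comma-separated regular expressions, each of which must match the entire fully qualified identifier rather than a substring of it. The following stdlib-only sketch emulates that rule so you can sanity-check a list before starting a job. IncludeListCheck is a hypothetical helper, not part of Hazelcast or Debezium; it matches case-sensitively and ignores edge cases of the real parser (such as escaped commas or connector-specific case handling), so treat it as an approximation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: approximates how a comma-separated include list of
// regular expressions selects fully qualified identifiers such as
// "inventory.customers". Not Debezium's implementation.
final class IncludeListCheck {

    // Splits the list on commas and compiles each trimmed, non-empty entry.
    static List<Pattern> parse(String includeList) {
        List<Pattern> patterns = new ArrayList<>();
        for (String entry : includeList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    // True if any expression matches the whole identifier (matches(), not find()).
    // Note that an unescaped "." in an entry matches any character.
    static boolean isIncluded(String includeList, String fullyQualifiedName) {
        for (Pattern p : parse(includeList)) {
            if (p.matcher(fullyQualifiedName).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String list = "inventory.customers, inventory.orders_.*";
        System.out.println(isIncluded(list, "inventory.customers"));     // true
        System.out.println(isIncluded(list, "inventory.orders_2024"));   // true
        System.out.println(isIncluded(list, "customers"));               // false: not fully qualified
        System.out.println(isIncluded(list, "inventory.customers_old")); // false: whole-name match only
    }
}
```

Whole-identifier matching is why a bare table or collection name usually selects nothing: the expression has to cover the database (or schema) prefix as well.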
See the Capture Changes from MySQL tutorial to learn how CDC processes change events from a MySQL database.
Before you start a CDC job, make sure your database is up and running, along with any additional CDC agents it requires (DB2, for example, requires one).
Common source builder functions
Method name | Description |
---|---|
| | Sets the output type to |
| | Sets the output type to |
| | Sets the output type to an arbitrary user type, |
| | Sets the preferred engine to the default (non-async) one. This engine is single-threaded, but it is also more widely used and tested. Use this engine for the most stable results (for example, no asynchronous offset restore). It makes the most sense for MySQL and PostgreSQL in particular, because the MySQL and PostgreSQL Debezium connectors are single-threaded only. |
| | Sets the preferred engine to the async one. This engine is multithreaded (if the connector supports it), but be aware of its asynchronous nature; for example, the offset restore may occur asynchronously after the restart is done, which can lead to confusing results. |
| | Sets a connector property to the given value. There are multiple overloads, allowing you to set the value to |
Fault tolerance
CDC sources offer at-least-once processing guarantees. The source periodically saves the offset in the database write-ahead log up to which it has dispatched events, and after a failure or restart it replays all events since the last successfully saved offset.
However, there is no guarantee that the last saved offset is still in the database changelog. Such logs are always finite and, depending on the database configuration, can be relatively short, so if the CDC source has to replay data after a long period of inactivity, data loss can occur. With careful management of log retention, though, the at-least-once guarantee can be provided in practice.
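The replay behavior described above can be illustrated with a small, stdlib-only simulation. This is a toy model, not Hazelcast code: OffsetReplayModel and all of its members are hypothetical. A list stands in for the write-ahead log, purgeBefore stands in for log retention, the consumer saves the next offset to read every few dispatched events, and after a simulated crash it resumes from the last saved offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Hazelcast code) of offset-based replay after a restart.
final class OffsetReplayModel {

    private final List<String> log = new ArrayList<>(); // stands in for the write-ahead log
    private int firstRetained = 0;                      // offsets below this were purged

    void append(String event) {
        log.add(event);
    }

    // Simulates log retention discarding entries older than the given offset.
    void purgeBefore(int offset) {
        firstRetained = Math.max(firstRetained, offset);
    }

    // Dispatches events starting at 'from', saves the next offset to read after
    // every 'saveEvery' dispatched events, and stops after 'crashAfter' events
    // to simulate a failure. Returns the last saved offset.
    int runUntilCrash(int from, int saveEvery, int crashAfter, List<String> delivered) {
        int saved = from;
        int dispatched = 0;
        for (int offset = from; offset < log.size() && dispatched < crashAfter; offset++) {
            delivered.add(log.get(offset));
            dispatched++;
            if (dispatched % saveEvery == 0) {
                saved = offset + 1;
            }
        }
        return saved;
    }

    // Replays everything from the saved offset. If retention has already purged
    // that offset, the source cannot resume without a gap, so events may be lost.
    void replayFrom(int saved, List<String> delivered) {
        if (saved < firstRetained) {
            throw new IllegalStateException(
                    "offset " + saved + " is no longer in the log; events in the gap may be lost");
        }
        for (int offset = saved; offset < log.size(); offset++) {
            delivered.add(log.get(offset));
        }
    }

    public static void main(String[] args) {
        OffsetReplayModel model = new OffsetReplayModel();
        for (int i = 0; i < 5; i++) {
            model.append("e" + i);
        }
        List<String> delivered = new ArrayList<>();
        int saved = model.runUntilCrash(0, 2, 3, delivered); // saves offset 2, crashes after e2
        model.replayFrom(saved, delivered);
        System.out.println(delivered); // [e0, e1, e2, e2, e3, e4]
    }
}
```

Event e2 is dispatched twice because it was sent after the last save: that duplication is what "at least once" means. If purgeBefore(3) were called before the replay, the saved offset would no longer exist, which corresponds to the data-loss case.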
CDC as a sink
Change data capture is a source-side functionality in Jet, but we also offer some specialized sinks that simplify applying CDC events to a map, which lets you reconstruct the contents of the original database table. The sinks expect to receive ChangeRecord objects and apply your custom functions to them to extract the key and the value that will be applied to the target map.
For example, a sink that maps CDC data to a Customer class and maintains a map view of the latest known email address per customer (identified by ID) would look like this:
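Pipeline p = Pipeline.create();
// `source` is a CDC source built as in the examples in the previous section
p.readFrom(source)
.withoutTimestamps()
.writeTo(CdcSinks.map("customers",
r -> r.key().toMap().get("id"),                    // map key: the customer ID
r -> r.value().toObject(Customer.class).email));   // map value: the email address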
The key and value functions have certain limitations. They can only map to objects that the Hazelcast member can deserialize, which, unfortunately, does not include user code submitted as part of the job. So in the above example it’s OK to have If user code has to be used, you can solve the problem with the User Code Deployment feature. For example configurations, see our CDC Join tutorial.
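One way to picture what such a sink does is to replay change events against a plain map: inserts and updates put the extracted value under the extracted key, and deletes remove the key. The following stdlib-only sketch assumes exactly that behavior. MapViewSketch, its Change class and its Op enum are hypothetical stand-ins for CdcSinks.map and ChangeRecord, and the sketch leaves out everything a real distributed sink must handle, such as partitioning and out-of-order events.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a CDC map sink; Change stands in for ChangeRecord.
final class MapViewSketch {

    enum Op { INSERT, UPDATE, DELETE }

    // Simplified change event: the operation plus the row data as a map.
    static final class Change {
        final Op op;
        final Map<String, Object> row;

        Change(Op op, Map<String, Object> row) {
            this.op = op;
            this.row = row;
        }
    }

    // Builds a row with an "id" and an "email" column.
    static Map<String, Object> row(int id, String email) {
        Map<String, Object> r = new HashMap<>();
        r.put("id", id);
        r.put("email", email);
        return r;
    }

    // Applies changes in arrival order: INSERT and UPDATE put the extracted value
    // under the extracted key; DELETE removes the key (the value function is not called).
    static <K, V> Map<K, V> apply(List<Change> changes,
                                  Function<Change, K> keyFn,
                                  Function<Change, V> valueFn) {
        Map<K, V> view = new HashMap<>();
        for (Change c : changes) {
            K key = keyFn.apply(c);
            if (c.op == Op.DELETE) {
                view.remove(key);
            } else {
                view.put(key, valueFn.apply(c));
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change(Op.INSERT, row(1001, "sally@example.com")),
                new Change(Op.INSERT, row(1002, "george@example.com")),
                new Change(Op.UPDATE, row(1001, "sally.t@example.com")),
                new Change(Op.DELETE, row(1002, "george@example.com")));
        Map<Object, Object> emails = apply(changes, c -> c.row.get("id"), c -> c.row.get("email"));
        System.out.println(emails); // {1001=sally.t@example.com}
    }
}
```

The two lambdas play the same role as the key and value functions passed to CdcSinks.map in the pipeline example.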
Data types
Hazelcast relies on Debezium, which in turn uses the Kafka Connect API, including, for example, Struct objects. Hazelcast makes conversion to Map and POJOs easier by providing abstractions such as RecordPart. Despite this, it’s worth knowing how some database types can or will be mapped to Java types.
Each database has its own mappings from database types to struct types. For the specific mappings, see the Debezium documentation for the relevant connector, for example MySQL, PostgreSQL, or DB2.
Common data type mappings
Struct type | Semantic type | Java type |
---|---|---|
| | - | int/Integer |
| | | java.time.LocalDate / java.util.Date / String |
| | | java.time.Duration / String (ISO-8601) |
| | - | long/Long |
| | | java.time.Instant / String |
| | | java.time.Instant / String |
| | | java.time.Duration / String (ISO-8601) |
| | | java.time.Instant / String |
| | | java.time.Duration / String (ISO-8601) |
| | - | float/Float / String |
| | - | double/Double / String |
| | - | boolean/Boolean / String |
| | - | String |
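Several rows above map numeric struct values to java.time types. Debezium's temporal semantic types carry plain numbers: for example, io.debezium.time.Date is an INT32 count of days since the epoch, io.debezium.time.Timestamp is an INT64 count of milliseconds since the epoch, io.debezium.time.MicroTimestamp is an INT64 count of microseconds since the epoch, and io.debezium.time.MicroTime is an INT64 count of microseconds past midnight. The following stdlib-only sketch converts such raw values by hand. DebeziumTimeSketch is a hypothetical helper, not part of Hazelcast; it assumes that values without time zone information should be read as UTC, and Hazelcast's RecordPart abstractions may already perform equivalent conversions for you.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: converts raw Debezium temporal values into java.time types.
// Values without time zone information are interpreted as UTC (an assumption).
final class DebeziumTimeSketch {

    // io.debezium.time.Date: INT32 days since 1970-01-01.
    static LocalDate fromDate(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    // io.debezium.time.Timestamp: INT64 milliseconds since the epoch.
    static Instant fromTimestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // io.debezium.time.MicroTimestamp: INT64 microseconds since the epoch.
    static Instant fromMicroTimestamp(long epochMicros) {
        return Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
    }

    // io.debezium.time.MicroTime: INT64 microseconds past midnight.
    static Duration fromMicroTime(long microsPastMidnight) {
        return Duration.of(microsPastMidnight, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        System.out.println(fromDate(19_000));               // 2022-01-08
        System.out.println(fromMicroTimestamp(1_500_000L)); // 1970-01-01T00:00:01.500Z
        System.out.println(fromMicroTime(3_600_000_000L));  // PT1H
    }
}
```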
The RecordPart#value field contains Debezium’s message in JSON format. This JSON format represents dates as strings instead of the integers that are standard in Debezium but harder to work with.
We strongly recommend using
Migration tips
Hazelcast Community Edition has a Debezium CDC connector, but it’s based on an older version of Debezium. Migration to the new connector is straightforward, but be aware of the following changes:
- You should use the com.hazelcast.enterprise.jet.cdc package instead of com.hazelcast.jet.cdc.
- Artifact names are now hazelcast-enterprise-cdc-debezium, hazelcast-enterprise-cdc-mysql and hazelcast-enterprise-cdc-postgres (instead of hazelcast-jet-…).
- Debezium renamed certain terms, and we have replicated the new names in our code. For example, include list replaces whitelist, and exclude list replaces blacklist. This means, for example, that you need to use setTableIncludeList instead of setTableWhitelist. For more detail on the new Debezium names, see their MySQL and PostgreSQL documentation.
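If you pass Debezium properties through the generic builder's setProperty method, keys that use the old whitelist and blacklist names need the same renaming. The following stdlib-only sketch rewrites keys ending in ".whitelist" or ".blacklist" (for example table.whitelist to table.include.list) in a java.util.Properties object. MigrateDebeziumKeys is a hypothetical helper; it only applies this suffix convention, so check the Debezium documentation for your connector for any renamed property it does not cover.

```java
import java.util.Properties;

// Hypothetical migration helper: renames Debezium property keys that use the old
// "whitelist"/"blacklist" suffixes to the "include.list"/"exclude.list" names.
final class MigrateDebeziumKeys {

    private static final String WHITELIST = ".whitelist";
    private static final String BLACKLIST = ".blacklist";

    // Returns the new key name, or the key unchanged if it has no old suffix.
    static String renameKey(String key) {
        if (key.endsWith(WHITELIST)) {
            return key.substring(0, key.length() - WHITELIST.length()) + ".include.list";
        }
        if (key.endsWith(BLACKLIST)) {
            return key.substring(0, key.length() - BLACKLIST.length()) + ".exclude.list";
        }
        return key;
    }

    // Returns a copy with old keys renamed and values unchanged. If both the old
    // and the new name of a key are present, which value survives is unspecified.
    static Properties migrate(Properties old) {
        Properties migrated = new Properties();
        for (String key : old.stringPropertyNames()) {
            migrated.setProperty(renameKey(key), old.getProperty(key));
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        old.setProperty("table.whitelist", "inventory.customers");
        old.setProperty("column.blacklist", "inventory.customers.password");
        Properties migrated = migrate(old);
        System.out.println(migrated.getProperty("table.include.list"));  // inventory.customers
        System.out.println(migrated.getProperty("column.exclude.list")); // inventory.customers.password
    }
}
```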