Join Change Data Capture Records
In this tutorial, you will learn how to make a map hold enriched data, combined (joined) from multiple database tables.
Step 1. Install Docker
This tutorial uses Docker to simplify the setup of databases, which you can freely experiment on.
-
Follow Docker’s Get Started instructions and install it on your system.
-
Test that it works:
-
Run
docker version
to check that you have the latest release installed. -
Run
docker run hello-world
to verify that Docker is pulling images and running as expected.
-
Step 5. Define a Data Pipeline
Let’s write the code for the processing we want to accomplish:
package org.example;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
/**
 * Builds the CDC join pipeline and submits it as a job named "monitor".
 *
 * <p>Change records from the {@code inventory.customers} and {@code inventory.orders}
 * tables are routed through {@link Ordering#fix} and then applied, via entry
 * processors, to the map called "cache". Both branches key the map by customer id:
 * customer records use the "id" field of the record key, order records use the
 * "purchaser" field of the record value.
 */
public class JetJob {

    /** Passed as the first argument of {@code Sinks.mapWithEntryProcessor} for both sinks. */
    private static final int MAX_CONCURRENT_OPERATIONS = 1;

    public static void main(String[] args) {
        Pipeline joinPipeline = buildPipeline(buildSource());

        JobConfig jobConfig = new JobConfig().setName("monitor");
        HazelcastInstance member = Hazelcast.bootstrappedInstance();
        member.getJet().newJob(joinPipeline, jobConfig);
    }

    /** Creates the MySQL CDC source restricted to the two inventory tables. */
    private static StreamSource<ChangeRecord> buildSource() {
        return MySqlCdcSources.mysql("source")
                .setDatabaseAddress("127.0.0.1")
                .setDatabasePort(3306)
                .setDatabaseUser("debezium")
                .setDatabasePassword("dbz")
                .setClusterName("dbserver1")
                .setDatabaseWhitelist("inventory")
                .setTableWhitelist("inventory.customers", "inventory.orders")
                .build();
    }

    /** Wires the source into one branch per table, each ending in an entry-processor sink. */
    private static Pipeline buildPipeline(StreamSource<ChangeRecord> source) {
        Pipeline pipeline = Pipeline.create();
        StreamStage<ChangeRecord> changes = pipeline.readFrom(source)
                .withNativeTimestamps(0);

        // Customer branch: map key is the "id" field of the record key.
        StreamStage<ChangeRecord> customerChanges = changes
                .filter(change -> change.table().equals("customers"))
                .apply(Ordering::fix)
                .peek();
        customerChanges.writeTo(Sinks.mapWithEntryProcessor(
                MAX_CONCURRENT_OPERATIONS,
                "cache",
                change -> (Integer) change.key().toMap().get("id"),
                CustomerEntryProcessor::new));

        // Order branch: map key is the "purchaser" field of the record value.
        StreamStage<ChangeRecord> orderChanges = changes
                .filter(change -> change.table().equals("orders"))
                .apply(Ordering::fix)
                .peek();
        orderChanges.writeTo(Sinks.mapWithEntryProcessor(
                MAX_CONCURRENT_OPERATIONS,
                "cache",
                change -> (Integer) change.value().toMap().get("purchaser"),
                OrderEntryProcessor::new));

        return pipeline;
    }
}
If using Postgres, only the source would need to change, like this:
// Postgres variant of the CDC source; it takes the place of the MySQL source built in JetJob.main.
// NOTE(review): needs an import for PostgresCdcSources (presumably
// com.hazelcast.jet.cdc.postgres.PostgresCdcSources) — confirm against the Hazelcast version in use.
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setDatabaseAddress("127.0.0.1")
.setDatabasePort(5432)
.setDatabaseUser("postgres")
.setDatabasePassword("postgres")
.setDatabaseName("postgres")
.setTableWhitelist("inventory.customers", "inventory.orders")
.build();
As we can see from the pipeline code, our Sink
is EntryProcessor
based. The two EntryProcessors
we use are:
package org.example;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.cdc.ParsingException;
import com.hazelcast.map.EntryProcessor;
import java.util.Map;
import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;
/**
 * Entry processor that applies one change record from the customers table to the
 * {@link OrdersOfCustomer} value stored under that customer's key.
 *
 * <p>For a DELETE record the customer part of the value is cleared; for any other
 * operation it is replaced by the customer parsed from the record.
 */
public class CustomerEntryProcessor implements EntryProcessor<Integer, OrdersOfCustomer, Object> {

    /** Parsed customer, or {@code null} when the record is a DELETE. */
    private final Customer customer;

    /**
     * @param record change record to apply; its value is parsed into a {@link Customer}
     *               unless its operation is DELETE
     */
    public CustomerEntryProcessor(ChangeRecord record) {
        try {
            if (Operation.DELETE.equals(record.operation())) {
                this.customer = null;
            } else {
                this.customer = record.value().toObject(Customer.class);
            }
        } catch (ParsingException e) {
            throw rethrow(e);
        }
    }

    /**
     * Updates the customer part of the entry's value and writes the value back.
     *
     * @return always {@code null}
     */
    @Override
    public Object process(Map.Entry<Integer, OrdersOfCustomer> entry) {
        OrdersOfCustomer current = entry.getValue();

        // Only an upsert may create a missing value; a delete leaves it absent.
        if (current == null && customer != null) {
            current = new OrdersOfCustomer();
        }
        // For a delete this stores null, which clears the customer but keeps the orders.
        if (current != null) {
            current.setCustomer(customer);
        }

        entry.setValue(current);
        return null;
    }
}
package org.example;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.cdc.ParsingException;
import com.hazelcast.map.EntryProcessor;
import java.util.Map;
import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;
/**
 * Entry processor that applies one change record from the orders table to the
 * {@link OrdersOfCustomer} value stored under the purchasing customer's key.
 *
 * <p>A DELETE record removes the order from the value; any other operation adds
 * the order or replaces the existing one.
 */
public class OrderEntryProcessor implements EntryProcessor<Integer, OrdersOfCustomer, Object> {

    private final Operation operation;
    private final Order order;

    /**
     * @param record change record to apply; its value is parsed into an {@link Order}
     *               and its operation is remembered for {@link #process}
     */
    public OrderEntryProcessor(ChangeRecord record) {
        try {
            Order parsedOrder = record.value().toObject(Order.class);
            this.order = parsedOrder;
            this.operation = record.operation();
        } catch (ParsingException e) {
            throw rethrow(e);
        }
    }

    /**
     * Applies the order change to the entry's value and writes the value back.
     *
     * @return always {@code null}
     */
    @Override
    public Object process(Map.Entry<Integer, OrdersOfCustomer> entry) {
        OrdersOfCustomer current = entry.getValue();
        boolean deletion = Operation.DELETE.equals(operation);

        // Only an add/update may create a missing value; a delete leaves it absent.
        if (current == null && !deletion) {
            current = new OrdersOfCustomer();
        }
        if (current != null) {
            if (deletion) {
                current.deleteOrder(order);
            } else {
                current.addOrUpdateOrder(order);
            }
        }

        entry.setValue(current);
        return null;
    }
}
In them we use the Customer
and the Order
classes to achieve
convenient data parsing with the help of data to object
mapping.
package org.example;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import java.util.Objects;
/**
 * Data class for one row of the customers table. Each field is bound by
 * {@code @JsonProperty} to the column name it is parsed from.
 */
public class Customer implements Serializable {

    @JsonProperty("id")
    public int id;

    @JsonProperty("first_name")
    public String firstName;

    @JsonProperty("last_name")
    public String lastName;

    @JsonProperty("email")
    public String email;

    Customer() {
    }

    /** Combines email, firstName, id and lastName, in that order, with the usual 31 multiplier. */
    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + Objects.hashCode(email);
        result = 31 * result + Objects.hashCode(firstName);
        result = 31 * result + Integer.hashCode(id);
        result = 31 * result + Objects.hashCode(lastName);
        return result;
    }

    /** Two customers are equal when they have the same class and all four fields match. */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        Customer that = (Customer) obj;
        if (id != that.id) {
            return false;
        }
        return Objects.equals(firstName, that.firstName)
                && Objects.equals(lastName, that.lastName)
                && Objects.equals(email, that.email);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Customer {id=");
        text.append(id);
        text.append(", firstName=").append(firstName);
        text.append(", lastName=").append(lastName);
        text.append(", email=").append(email);
        text.append('}');
        return text.toString();
    }
}
package org.example;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Data class for one row of the orders table. Each field is bound by
 * {@code @JsonProperty} to the column name it is parsed from.
 */
public class Order implements Serializable {

    @JsonProperty("order_number")
    public int orderNumber;

    @JsonProperty("order_date")
    public Date orderDate;

    @JsonProperty("purchaser")
    public int purchaser;

    @JsonProperty("quantity")
    public int quantity;

    @JsonProperty("product_id")
    public int productId;

    Order() {
    }

    /**
     * Stores the order date, reinterpreting the incoming {@code Date}'s raw
     * {@code getTime()} value as a number of days and converting it to milliseconds.
     */
    public void setOrderDate(Date orderDate) { //used by object mapping
        long epochDays = orderDate.getTime();
        long epochMillis = TimeUnit.DAYS.toMillis(epochDays);
        this.orderDate = new Date(epochMillis);
    }

    public int getOrderNumber() {
        return orderNumber;
    }

    /**
     * Combines orderNumber, orderDate, purchaser, quantity and productId, in that
     * order, with the usual 31 multiplier.
     */
    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + Integer.hashCode(orderNumber);
        result = 31 * result + Objects.hashCode(orderDate);
        result = 31 * result + Integer.hashCode(purchaser);
        result = 31 * result + Integer.hashCode(quantity);
        result = 31 * result + Integer.hashCode(productId);
        return result;
    }

    /** Two orders are equal when they have the same class and all five fields match. */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        Order that = (Order) obj;
        return orderNumber == that.orderNumber
                && purchaser == that.purchaser
                && quantity == that.quantity
                && productId == that.productId
                && Objects.equals(orderDate, that.orderDate);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order {orderNumber=");
        text.append(orderNumber);
        text.append(", orderDate=").append(orderDate);
        text.append(", purchaser=").append(purchaser);
        text.append(", quantity=").append(quantity);
        text.append(", productId=").append(productId);
        text.append('}');
        return text.toString();
    }
}
Watch out, in the Postgres database the order number column has a
different name, id
, so the first field in Order
needs to be changed
to:
// Postgres only: the order number is read from the "id" column (MySQL uses "order_number").
@JsonProperty("id")
public int orderNumber;
Besides these two data classes we also need to define our enriched
structure, called OrdersOfCustomer
, which will be stored in the
target IMap
:
package org.example;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Enriched value stored in the target map: one customer together with that
 * customer's orders, indexed by order number.
 */
public class OrdersOfCustomer implements Serializable {

    /** Orders keyed by {@link Order#getOrderNumber()}. */
    private final Map<Integer, Order> orders = new HashMap<>();

    /** The customer, or {@code null} while none has been set (or after it was cleared). */
    private Customer customer = null;

    public OrdersOfCustomer() {
    }

    public void setCustomer(Customer customer) {
        this.customer = customer;
    }

    /** Removes the order registered under the given order's number, if any. */
    public void deleteOrder(Order order) {
        Integer orderNumber = order.getOrderNumber();
        orders.remove(orderNumber);
    }

    /** Stores the order under its order number, replacing any previous one. */
    public void addOrUpdateOrder(Order order) {
        Integer orderNumber = order.getOrderNumber();
        orders.put(orderNumber, order);
    }

    /** Combines customer and orders, in that order, with the usual 31 multiplier. */
    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + Objects.hashCode(customer);
        result = 31 * result + Objects.hashCode(orders);
        return result;
    }

    /** Equal when both objects have the same class, equal customers and equal order maps. */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        OrdersOfCustomer that = (OrdersOfCustomer) obj;
        if (!Objects.equals(customer, that.customer)) {
            return false;
        }
        return Objects.equals(orders, that.orders);
    }

    @Override
    public String toString() {
        return String.format("Customer: %s, Orders: %s", customer, orders);
    }
}
There is also another element in the pipeline, an extra processing stage which handles and fixes event reordering that might happen due to parallel processing. It’s based on sequence numbers specific to CDC sources and so can be used only for these kinds of pipelines. Hopefully a future version of Hazelcast will introduce a generic solution for the reordering problem.
package org.example;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.RecordPart;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.concurrent.TimeUnit;
/**
 * Pipeline stage that deals with reordered change records by tracking, per record
 * key, the highest sequence value seen so far.
 */
public class Ordering {

    /**
     * Groups the input by record key and forwards a record only when its sequence
     * value is strictly greater than the highest one already seen for that key;
     * for any other record the mapping function returns {@code null}.
     *
     * @param input stream of change records
     * @return the stage produced by the stateful mapping
     */
    static StreamStage<ChangeRecord> fix(StreamStage<ChangeRecord> input) {
        // First argument of mapStateful; presumably the per-key state time-to-live (confirm).
        long stateTtlMillis = TimeUnit.SECONDS.toMillis(10);

        return input
                .groupingKey(ChangeRecord::key)
                .mapStateful(
                        stateTtlMillis,
                        () -> new LongAccumulator(0),
                        (highestSeen, recordKey, change) -> {
                            long incoming = change.sequenceValue();
                            if (incoming <= highestSeen.get()) {
                                // Stale or duplicate sequence: nothing is forwarded.
                                return null;
                            }
                            highestSeen.set(incoming);
                            return change;
                        },
                        // The explicit cast picks the intended functional-interface type;
                        // this callback never produces a record.
                        (TriFunction<LongAccumulator, RecordPart, Long, ChangeRecord>)
                                (state, recordPart, aLong) -> null);
    }
}
To make it evident that our pipeline serves the purpose of building an up-to-date cache of "orders of customers", which can be interrogated at any time, let’s add one more class. This code can be executed at will in your IDE and prints the current content of the cache.
package org.example;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;
/**
 * Small client program that prints the current content of the "cache" map.
 *
 * <p>It connects with a Hazelcast client, prints every value of the map on its
 * own tab-indented line, and then shuts the client down.
 */
public class CacheRead {

    public static void main(String[] args) {
        HazelcastInstance instance = HazelcastClient.newHazelcastClient();
        try {
            System.out.println("Currently there are following customers in the cache:");
            instance.getMap("cache").values().forEach(c -> System.out.println("\t" + c));
        } finally {
            // Shut the client down even if reading or printing the map fails,
            // so a failure does not leave the client running.
            instance.shutdown();
        }
    }
}
Step 6. Package the Pipeline into a JAR
Now that we have defined all the pieces, we need to submit the pipeline to Hazelcast for execution. Since Hazelcast runs on our machine as a standalone cluster in a standalone process we need to make it aware of all the code that we have written.
For this reason we create a JAR containing everything we need. All we need to do is to run the build command:
Step 7. Start Hazelcast
-
Make sure the CDC plugin for the database is in the
lib/
directory:
ls lib/
You should see the following jars:
-
hazelcast-jet-cdc-debezium-5.1.7.jar
-
hazelcast-jet-cdc-mysql-5.1.7.jar (for MySQL)
-
hazelcast-jet-cdc-postgres-5.1.7.jar (for Postgres)
-
-
Enable user code deployment:
Due to the type of sink we are using in our pipeline we need to make some extra changes in order for the Hazelcast cluster to be aware of the custom classes we have defined.
Please append following config lines to the
config/hazelcast.yaml
file, at the end of the hazelcast
block:
user-code-deployment:
  enabled: true
  provider-mode: LOCAL_AND_CACHED_CLASSES
Also add these config lines to the
config/hazelcast-client.yaml
file, at the end of the hazelcast-client
block. If you built the project with Gradle:
user-code-deployment:
  enabled: true
  jarPaths:
    - <path_to_project>/build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
If you built the project with Maven:
user-code-deployment:
  enabled: true
  jarPaths:
    - <path_to_project>/target/cdc-tutorial-1.0-SNAPSHOT.jar
Make sure to replace
<path_to_project>
with the absolute path to where you created the project for this tutorial. -
Start Hazelcast.
bin/hz-start
-
When you see output like this, Hazelcast is up:
Members {size:1, ver:1} [ Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this ]
Step 8. Submit the Job for Execution
bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar
The output in the Hazelcast member’s log should look something like this (we
see these lines due to the peek()
stages we’ve inserted):
........
... Output to ordinal 0: key:{{"order_number":10002}}, value:{{"order_number":10002,"order_date":16817,"purchaser":1002,"quantity":2,"product_id":105,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"order_number":10003}}, value:{{"order_number":10003,"order_date":16850,"purchaser":1002,"quantity":2,"product_id":106,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__op":"c","__db":"inventory","__table":"customers","__ts_ms":1593681751161,"__deleted":"false"}} (eventTime=12:22:31.161)
........
Step 9. Track Updates
Let’s see what our cache looks like at this time. If we execute the
CacheRead
code defined above, we’ll get:
Currently there are following customers in the cache:
Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10002=Order {orderNumber=10002, orderDate=Sun Jan 17 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=105}, 10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}}
Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}}
Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {}
Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}}
Let’s do some updates in our database. Go to the database CLI we started earlier and run the following commands:
INSERT INTO inventory.customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org');
DELETE FROM inventory.orders WHERE order_number=10002;
If we check the cache with CacheRead
we get:
Currently there are following customers in the cache:
Customer: Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}, Orders: {}
Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}}
Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}}
Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {}
Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}}
Step 10. Clean up
-
Cancel the job.
bin/hz-cli cancel monitor
Shut down the Hazelcast cluster.
bin/hz-stop
-
Use Docker to stop the running container (this will kill the command-line client too, since it’s running in the same container):
You can use Docker to stop all running containers:
docker stop mysqlterm mysql
You can use Docker to stop the running container (this will kill the command-line client too, since it’s running in the same container):
docker stop postgres
Since we’ve used the
--rm
flag when starting the containers, Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with the following command:
docker ps -a