This is a prerelease version.

View latest

Join Change Data Capture Records

In this tutorial, you will learn how to make a map hold enriched data, combined (joined) from multiple database tables.

If you are using Hazelcast Community Edition, you have to change the package from com.hazelcast.enterprise.jet…​ to com.hazelcast.jet…​.

Step 1. Install Docker

This tutorial uses Docker to simplify the setup of databases, which you can freely experiment on.

  1. Follow Docker’s Get Started instructions and install it on your system.

  2. Test that it works:

    • Run docker version to check that you have the latest release installed.

    • Run docker run hello-world to verify that Docker is pulling images and running as expected.

Step 2. Start Database

Exact instructions for starting the supported databases are as follows:

Step 3. Start the Command Line Client

Exact instructions for starting the specific command line client of the supported databases are as follows:

Step 4. Create a New Java Project

We’ll assume you’re using an IDE. Create a blank Java project named cdc-tutorial and copy the provided Gradle or Maven file into it. They differ slightly depending on which database you use:

Step 5. Define a Data Pipeline

Let’s write the code for the processing we want to accomplish:

  • Enterprise Edition

  • Community Edition

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;

public class JetJob {

private static final int MAX_CONCURRENT_OPERATIONS = 1;

public static void main(String[] args) {
  StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
    .setDatabaseAddress("127.0.0.1", 3306)
    .setDatabaseCredentials("debezium", "dbz")
    .setClusterName("dbserver1")
    .setDatabaseIncludeList("inventory")
    .setTableIncludeList("inventory.customers", "inventory.orders")
    .build();

  Pipeline pipeline = Pipeline.create();
  StreamStage<ChangeRecord> allRecords = pipeline.readFrom(source)
    .withNativeTimestamps(0);

  allRecords.filter(r -> r.table().equals("customers"))
    .apply(Ordering::fix)
    .peek()
    .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
            record -> (Integer) record.key().toMap().get("id"),
            CustomerEntryProcessor::new
    ));

  allRecords.filter(r -> r.table().equals("orders"))
    .apply(Ordering::fix)
    .peek()
    .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
            record -> (Integer) record.value().toMap().get("purchaser"),
            OrderEntryProcessor::new
    ));

    JobConfig cfg = new JobConfig().setName("monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}
package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;

public class JetJob {

private static final int MAX_CONCURRENT_OPERATIONS = 1;

public static void main(String[] args) {
  StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
    .setDatabaseAddress("127.0.0.1", 3306)
    .setDatabaseCredentials("debezium", "dbz")
    .setClusterName("dbserver1")
    .setDatabaseWhitelist("inventory")
    .setTableWhitelist("inventory.customers", "inventory.orders")
    .build();

  Pipeline pipeline = Pipeline.create();
  StreamStage<ChangeRecord> allRecords = pipeline.readFrom(source)
    .withNativeTimestamps(0);

  allRecords.filter(r -> r.table().equals("customers"))
    .apply(Ordering::fix)
    .peek()
    .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
            record -> (Integer) record.key().toMap().get("id"),
            CustomerEntryProcessor::new
    ));

  allRecords.filter(r -> r.table().equals("orders"))
    .apply(Ordering::fix)
    .peek()
    .writeTo(Sinks.mapWithEntryProcessor(MAX_CONCURRENT_OPERATIONS, "cache",
            record -> (Integer) record.value().toMap().get("purchaser"),
            OrderEntryProcessor::new
    ));

    JobConfig cfg = new JobConfig().setName("monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}

If using Postgres, only the source would need to change, like this:

  • Enterprise Edition

  • Community Edition

StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
  .setDatabaseAddress("127.0.0.1", 5432)
  .setDatabaseCredentials("postgres", "postgres")
  .setDatabaseName("postgres")
  .setTableIncludeList("inventory.customers", "inventory.orders")
  .build();
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
  .setDatabaseAddress("127.0.0.1")
  .setDatabasePort(5432)
  .setDatabaseUser("postgres")
  .setDatabasePassword("postgres")
  .setDatabaseName("postgres")
  .setTableIncludeList("inventory.customers", "inventory.orders")
  .build();

As we can see from the pipeline code, our Sink is EntryProcessor based. The two EntryProcessors we use are:

package org.example;

import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
import com.hazelcast.enterprise.jet.cdc.Operation;
import com.hazelcast.enterprise.jet.cdc.ParsingException;
import com.hazelcast.map.EntryProcessor;

import java.util.Map;

import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;

public class CustomerEntryProcessor implements EntryProcessor<Integer, OrdersOfCustomer, Object> {

  private final Customer customer;

  public CustomerEntryProcessor(ChangeRecord record) {
    try {
      this.customer = Operation.DELETE.equals(record.operation()) ? null :
      record.value().toObject(Customer.class);
    } catch (ParsingException e) {
        throw rethrow(e);
    }
  }

  @Override
  public Object process(Map.Entry<Integer, OrdersOfCustomer> entry) {
    OrdersOfCustomer value = entry.getValue();
    if (customer == null) {
      if (value != null) {
          value.setCustomer(null);
      }
    } else {
      if (value == null) {
          value = new OrdersOfCustomer();
      }
      value.setCustomer(customer);
    }
    entry.setValue(value);
    return null;
  }
}
package org.example;

import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
import com.hazelcast.enterprise.jet.cdc.Operation;
import com.hazelcast.enterprise.jet.cdc.ParsingException;
import com.hazelcast.map.EntryProcessor;

import java.util.Map;

import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;

public class OrderEntryProcessor implements EntryProcessor<Integer, OrdersOfCustomer, Object> {

  private final Operation operation;
  private final Order order;

  public OrderEntryProcessor(ChangeRecord record) {
    try {
      this.order = record.value().toObject(Order.class);
      this.operation = record.operation();
    } catch (ParsingException e) {
      throw rethrow(e);
    }
  }

  @Override
  public Object process(Map.Entry<Integer, OrdersOfCustomer> entry) {
    OrdersOfCustomer value = entry.getValue();
    if (Operation.DELETE.equals(operation)) {
      if (value != null) {
        value.deleteOrder(order);
      }
    } else {
        if (value == null) {
          value = new OrdersOfCustomer();
        }
        value.addOrUpdateOrder(order);
    }
    entry.setValue(value);
    return null;
  }
}

In them we use the Customer and the Order classes to achieve convenient data parsing with the help of data to object mapping.

package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Objects;

public class Customer implements Serializable {

    @JsonProperty("id")
    public int id;

    @JsonProperty("first_name")
    public String firstName;

    @JsonProperty("last_name")
    public String lastName;

    @JsonProperty("email")
    public String email;

    Customer() {
    }

    @Override
    public int hashCode() {
        return Objects.hash(email, firstName, id, lastName);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Customer other = (Customer) obj;
        return id == other.id
                && Objects.equals(firstName, other.firstName)
                && Objects.equals(lastName, other.lastName)
                && Objects.equals(email, other.email);
    }

    @Override
    public String toString() {
        return "Customer {id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + '}';
    }
}
package org.example;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class Order implements Serializable {

    @JsonProperty("order_number")
    public int orderNumber;

    @JsonProperty("order_date")
    public Date orderDate;

    @JsonProperty("purchaser")
    public int purchaser;

    @JsonProperty("quantity")
    public int quantity;

    @JsonProperty("product_id")
    public int productId;

    Order() {
    }

    public void setOrderDate(Date orderDate) { //used by object mapping
        long days = orderDate.getTime();
        this.orderDate = new Date(TimeUnit.DAYS.toMillis(days));
    }

    public int getOrderNumber() {
        return orderNumber;
    }

    @Override
    public int hashCode() {
        return Objects.hash(orderNumber, orderDate, purchaser, quantity, productId);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Order other = (Order) obj;
        return orderNumber == other.orderNumber
                && Objects.equals(orderDate, other.orderDate)
                && Objects.equals(purchaser, other.purchaser)
                && Objects.equals(quantity, other.quantity)
                && Objects.equals(productId, other.productId);
    }

    @Override
    public String toString() {
        return "Order {orderNumber=" + orderNumber + ", orderDate=" + orderDate + ", purchaser=" + purchaser +
                ", quantity=" + quantity + ", productId=" + productId + '}';
    }

}

Watch out, in the Postgres database the order number column has a different name, id, so the first field in Order needs to be changed to:

@JsonProperty("id")
public int orderNumber;

Besides these two data classes we also need to define our enriched structure, called OrdersOfCustomers, which will be stored in the target IMap:

package org.example;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class OrdersOfCustomer implements Serializable {

  private final Map<Integer, Order> orders;
  private Customer customer;

  public OrdersOfCustomer() {
    this.customer = null;
    this.orders = new HashMap<>();
  }

  public void setCustomer(Customer customer) {
    this.customer = customer;
  }

  public void deleteOrder(Order order) {
    orders.remove(order.getOrderNumber());
  }

  public void addOrUpdateOrder(Order order) {
    orders.put(order.getOrderNumber(), order);
  }

  @Override
  public int hashCode() {
    return Objects.hash(customer, orders);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
      OrdersOfCustomer other = (OrdersOfCustomer) obj;
    return Objects.equals(customer, other.customer)
    && Objects.equals(orders, other.orders);
  }

  @Override
  public String toString() {
    return String.format("Customer: %s, Orders: %s", customer, orders);
  }
}

There is also another element in the pipeline, an extra processing stage which handles and fixes event reordering that might happen due to parallel processing. It’s based on sequence numbers specific to CDC sources and so can be used only for these kinds of pipelines. Hopefully a future version of Hazelcast will introduce a generic solution for the reordering problem.

package org.example;

import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.enterprise.jet.cdc.ChangeRecord;
import com.hazelcast.enterprise.jet.cdc.RecordPart;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.pipeline.StreamStage;

import java.util.concurrent.TimeUnit;

public class Ordering {

    static StreamStage<ChangeRecord> fix(StreamStage<ChangeRecord> input) {
        return input
                .groupingKey(ChangeRecord::key)
                .mapStateful(
                        TimeUnit.SECONDS.toMillis(10),
                        () -> new LongAccumulator(0),
                        (lastSequence, key, record) -> {
                            long sequence = record.sequenceValue();
                            if (lastSequence.get() < sequence) {
                                lastSequence.set(sequence);
                                return record;
                            }
                            return null;
                        },
                        (TriFunction<LongAccumulator, RecordPart, Long, ChangeRecord>) (sequence, recordPart, aLong) -> null);
    }

}

To make it evident that our pipeline serves the purpose of building an up-to-date cache of "orders of customers", which can be interrogated at any time, let’s add one more class. This code can be executed at will in your IDE and prints the current content of the cache.

package org.example;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.core.HazelcastInstance;

public class CacheRead {

  public static void main(String[] args) {
    HazelcastInstance instance = HazelcastClient.newHazelcastClient();

    System.out.println("Currently there are following customers in the cache:");
    instance.getMap("cache").values().forEach(c -> System.out.println("\t" + c));

    instance.shutdown();
  }
}

Step 6. Package the Pipeline into a JAR

Now that we have defined all the pieces, we need to submit the pipeline to Hazelcast for execution. Since Hazelcast runs on our machine as a standalone cluster in a standalone process we need to make it aware of all the code that we have written.

For this reason we create a JAR containing everything we need. All we need to do is to run the build command:

  • Gradle

  • Maven

gradle build

This will produce a JAR file called cdc-tutorial-1.0-SNAPSHOT.jar in the build/libs directory of our project.

mvn package

This will produce a JAR file called cdc-tutorial-1.0-SNAPSHOT.jar in the target directory or our project.

Step 7. Start Hazelcast

  1. Download Hazelcast.

  2. Make sure the CDC plugin for the database is in the lib/ directory.

    ls lib/

    You should see the following jars:

  • Enterprise Edition

  • Community Edition

  • hazelcast-enterprise-cdc-debezium-6.0.0-SNAPSHOT.jar

  • hazelcast-enterprise-cdc-mysql-6.0.0-SNAPSHOT.jar (for MySQL)

  • hazelcast-enterprise-cdc-postgres-6.0.0-SNAPSHOT.jar (for Postgres)

  • hazelcast-jet-cdc-debezium-6.0.0-SNAPSHOT.jar

  • hazelcast-jet-cdc-mysql-6.0.0-SNAPSHOT.jar (for MySQL)

  • hazelcast-jet-cdc-postgres-6.0.0-SNAPSHOT.jar (for Postgres)

  1. Enable user code deployment:

    User Code Deployment has been deprecated and will be removed in the next major version. To continue deploying your user code after this time, Community Edition users can either upgrade to Enterprise Edition, or add their resources to the Hazelcast member class paths. Hazelcast recommends that Enterprise Edition users migrate their user code to use User Code Namespaces. For further information on migrating from User Code Deployment to User Code Namespaces, see Migrate from User Code Deployment.

    Due to the type of sink we are using in our pipeline we need to make some extra changes in order for the Hazelcast cluster to be aware of the custom classes we have defined.

    Please append following config lines to the config/hazelcast.yaml file, at the end of the hazelcast block:

      user-code-deployment:
        enabled: true
        provider-mode: LOCAL_AND_CACHED_CLASSES

    Also add these config lines to the config/hazelcast-client.yaml file, at the end of the hazelcast-client block:

    • Gradle

    • Maven

      user-code-deployment:
        enabled: true
        jarPaths:
          - <path_to_project>/build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
      user-code-deployment:
        enabled: true
        jarPaths:
          - <path_to_project>/target/cdc-tutorial-1.0-SNAPSHOT.jar

    Make sure to replace <path_to_project> with the absolute path to where you created the project for this tutorial.

  2. Start Hazelcast.

    bin/hz-start
  3. When you see output like this, Hazelcast is up:

    Members {size:1, ver:1} [
        Member [192.168.1.5]:5701 - e7c26f7c-df9e-4994-a41d-203a1c63480e this
    ]

Step 8. Submit the Job for Execution

Assuming our cluster is running and the database is up, all we need to issue is following command:

  • Gradle

  • Maven

bin/hz-cli submit build/libs/cdc-tutorial-1.0-SNAPSHOT.jar
bin/hz-cli submit target/cdc-tutorial-1.0-SNAPSHOT.jar

The output in the Hazelcast member’s log should look something like this (we see these lines due to the peek() stages we’ve inserted):

........
... Output to ordinal 0: key:{{"order_number":10002}}, value:{{"order_number":10002,"order_date":16817,"purchaser":1002,"quantity":2,"product_id":105,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"order_number":10003}}, value:{{"order_number":10003,"order_date":16850,"purchaser":1002,"quantity":2,"product_id":106,"__op":"c","__db":"inventory","__table":"orders","__ts_ms":1593681751174,"__deleted":"false"}} (eventTime=12:22:31.174)
... Output to ordinal 0: key:{{"id":1003}}, value:{{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__op":"c","__db":"inventory","__table":"customers","__ts_ms":1593681751161,"__deleted":"false"}} (eventTime=12:22:31.161)
........

Step 9. Track Updates

Let’s see how our cache looks like at this time. If we execute the CacheRead code defined above, we’ll get:

Currently there are following customers in the cache:
    Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10002=Order {orderNumber=10002, orderDate=Sun Jan 17 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=105}, 10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}}
    Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}}
    Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {}
    Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}}

Let’s do some updates in our database. Go to the database CLI we’ve started earlier and run following commands:

INSERT INTO inventory.customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org');
DELETE FROM inventory.orders WHERE order_number=10002;

If we check the cache with CacheRead we get:

Currently there are following customers in the cache:
    Customer: Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}, Orders: {}
    Customer: Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}, Orders: {10003=Order {orderNumber=10003, orderDate=Fri Feb 19 02:00:00 EET 2016, purchaser=1002, quantity=2, productId=106}}
    Customer: Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}, Orders: {10004=Order {orderNumber=10004, orderDate=Sun Feb 21 02:00:00 EET 2016, purchaser=1003, quantity=1, productId=107}}
    Customer: Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}, Orders: {}
    Customer: Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}, Orders: {10001=Order {orderNumber=10001, orderDate=Sat Jan 16 02:00:00 EET 2016, purchaser=1001, quantity=1, productId=102}}

Step 10. Clean up

  1. Cancel the job.

    bin/hz-cli cancel postgres-monitor

Shut down the Hazelcast cluster.

+

bin/hz-stop
  1. Use Docker to stop the running container (this will kill the command-line client too, since it’s running in the same container):

    • MySQL

    • Postgres

    You can use Docker to stop all running containers:

    docker stop mysqlterm mysql

    You can use Docker to stop the running container (this will kill the command-line client too, since it’s running in the same container):

    docker stop postgres

    Since we’ve used the --rm flag when starting the connectors, Docker should remove them right after we stop them. We can verify that all processes are stopped and removed with following command:

docker ps -a