Stream-To-Stream Joins with SQL

Learn how to join two streams of data and process the results, using SQL.

Context

A stream of data is an ongoing delivery of data events. These events are often of the same type. For example, a stream may contain click events on a website.

If you have two or more streams of related data, you can join them together on a related field, process them, and store the result.

Example Use Case

You have two streams. One stream contains events for new orders and the other contains events for shipped orders. You need to join these two streams to find out which orders have been successfully shipped within seven days, and from which warehouse they were shipped.

Before you Begin

Before starting this tutorial, make sure that you have Git and Docker with Docker Compose installed. The steps below use both.

Step 1. Clone the Project

To set up the project, you need to download the code from GitHub.

  1. Clone the GitHub repository.

    • HTTPS

    git clone https://github.com/hazelcast-guides/stream-to-stream-joins.git
    cd stream-to-stream-joins

    • SSH

    git clone git@github.com:hazelcast-guides/stream-to-stream-joins.git
    cd stream-to-stream-joins

Step 2. Start the Docker Containers

In this step, you’ll use Docker Compose to start all the Docker containers, including a Kafka broker, Hazelcast Platform, and Management Center.

docker compose up -d

You should see the following:

[+] Running 4/4
 ⠿ Container zookeeper          Started                                                                                                                      0.7s
 ⠿ Container management-center  Started                                                                                                                      0.6s
 ⠿ Container hazelcast          Started                                                                                                                      0.7s
 ⠿ Container broker             Started                                                                                                                      1.2s

The Docker containers are running in detached mode. To check that they are running, use the following command:

docker ps

To see the logs of your Hazelcast member, use the following command:

docker logs hazelcast

You should see that you have a single member running in the cluster.

Members {size:1, ver:1} [
	Member [172.19.0.4]:5701 - 15116025-342b-43c0-83f7-a2a90f0281ce this
]
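
If you script this step, a short polling loop can wait for that member list to appear instead of checking the logs by hand. The following is a minimal sketch and not part of the project: the helper names wait_until and member_ready are hypothetical, and the grep pattern assumes that the log line looks like the output above.

```shell
# Retry a command once per second until it succeeds or the attempts run out.
# Returns 0 on the first success, 1 if every attempt fails.
wait_until() {
  attempts="$1"
  shift
  while [ "$attempts" -gt 0 ]; do
    if "$@"; then
      return 0
    fi
    attempts=$((attempts - 1))
    sleep 1
  done
  return 1
}

# Hypothetical readiness check: succeeds once the single-member list
# shows up in the logs of the hazelcast container.
member_ready() {
  docker logs hazelcast 2>&1 | grep -q 'Members {size:1'
}

# Usage (requires the containers from this step):
#   wait_until 30 member_ready && echo "Hazelcast member is up"
```

With 30 attempts, the loop gives the member roughly 30 seconds to start before giving up.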

Step 3. Create Two Kafka Topics

To create the Kafka topics, you’ll use the kafka-console-producer script that’s built into the Kafka broker.

  1. Create the orders topic and add some records to it.

    docker exec -i broker kafka-console-producer --broker-list broker:9092 --topic orders < orders.jsonl
  2. Create the shipments topic and add some records to it.

    docker exec -i broker kafka-console-producer --broker-list broker:9092 --topic shipments < shipments.jsonl
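
The console producer sends each input line as one record, so counting the lines in a file tells you how many rows to expect when you query the topic in the next step. The following is a minimal sketch: count_records is a hypothetical helper, and it assumes that the files hold one JSON object per line and that blank lines should not be counted.

```shell
# Hypothetical helper: count the non-blank lines in a JSON Lines file.
count_records() {
  # grep -c prints the number of matching lines. It exits with status 1
  # when the count is 0, so "|| true" keeps that from looking like a failure.
  grep -c '[^[:space:]]' "$1" || true
}

# Usage (from the project directory):
#   count_records orders.jsonl
#   count_records shipments.jsonl
```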

Step 4. Create Mappings to the Kafka Topics

In this step, you’ll use the SQL shell in Hazelcast to create a mapping to the Kafka topics. With this mapping, Hazelcast will be able to receive the event streams.

  1. Open the SQL shell.

    docker exec -it hazelcast hz-cli sql
  2. Create a mapping to the orders topic.

    CREATE OR REPLACE MAPPING orders(
      id INT,
      order_ts TIMESTAMP WITH TIME ZONE,
      total_amount DOUBLE,
      customer_name VARCHAR)
    TYPE Kafka
    OPTIONS (
      'keyFormat' = 'int', (1)
      'valueFormat' = 'json-flat', (2)
      'auto.offset.reset' = 'earliest', (3)
      'bootstrap.servers' = 'broker:9092'); (4)
    1 The Kafka record key, which is the ID of the orders and shipments.
    2 Map the Kafka records to JSON, using the json-flat format. This format maps each top-level JSON field to its own column.
    3 Tell Hazelcast to read from the beginning of the topic so that you can read the values that you already added to it.
    4 The address of the Kafka broker that Hazelcast connects to.
  3. Make sure that the mapping is correct by running a streaming query on the topic.

    SELECT * FROM orders;
    +------------+-------------------------+-------------------------+--------------------+
    |          id|order_ts                 |             total_amount|customer_name       |
    +------------+-------------------------+-------------------------+--------------------+
    |           1|2022-03-29T06:01:18Z     |                133548.84|Amal                |
    |           2|2022-03-29T17:02:20Z     |                164839.31|Alex                |
    |           3|2022-03-29T13:44:10Z     |                 90427.66|Hao                 |
    |           4|2022-03-29T11:58:25Z     |                 33462.11|Cruz                |
  4. Press Ctrl+C to exit the streaming query.

  5. Create a mapping to the shipments topic.

    CREATE OR REPLACE MAPPING shipments(
      id VARCHAR,
      ship_ts TIMESTAMP WITH TIME ZONE,
      order_id INT,
      warehouse VARCHAR
    )
    TYPE Kafka
    OPTIONS (
      'keyFormat' = 'varchar',
      'valueFormat' = 'json-flat',
      'auto.offset.reset' = 'earliest',
      'bootstrap.servers' = 'broker:9092');
  6. Make sure that the mapping is correct by running a streaming query on the topic.

    SELECT * FROM shipments;
    +--------------------+-------------------------+------------+--------------------+
    |id                  |ship_ts                  |    order_id|warehouse           |
    +--------------------+-------------------------+------------+--------------------+
    |ship-ch83360        |2022-03-31T18:13:39Z     |           1|UPS                 |
    |ship-xf72808        |2022-03-31T02:04:13Z     |           2|UPS                 |
    |ship-kr47454        |2022-03-31T20:47:09Z     |           3|DHL                 |
  7. Press Ctrl+C to exit the streaming query.

Step 5. Join the Two Streams

In this step, you’ll join the two streams to get insights about shipments that are sent within 7 days of the order.

You can join streams in Hazelcast only on a table that defines an allowed lag for late events. Hazelcast drops events that are later than the defined lag and does not include them in the result set.
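
To make the idea of an allowed lag concrete, the following sketch applies the same kind of rule to a list of timestamps outside Hazelcast. It is an illustration under stated assumptions, not Hazelcast's implementation: filter_late_events is a hypothetical helper, timestamps are plain epoch seconds read one per line, and an event that is exactly the lag behind the latest timestamp is treated as late.

```shell
# Sketch only: label each timestamp on stdin as "keep" or "drop".
# An event is dropped when it is at least lag_seconds behind the
# latest timestamp seen so far (boundary handling is an assumption).
filter_late_events() {
  lag_seconds="$1"
  awk -v lag="$lag_seconds" '
    {
      ts = $1 + 0
      if (ts > max_ts) max_ts = ts        # advance the latest timestamp seen
      if (max_ts - ts >= lag) print "drop", ts
      else                    print "keep", ts
    }'
}

# Usage with a 60-second lag:
#   printf "100\n200\n130\n150\n" | filter_late_events 60
```

In this usage example, 130 arrives after 200 and is 70 seconds behind it, so it is dropped. 150 is only 50 seconds behind and is kept.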

  1. Drop late events when they are one minute or more behind the latest event seen so far.

    CREATE OR REPLACE VIEW shipments_ordered AS
      SELECT * FROM TABLE(IMPOSE_ORDER(
      TABLE shipments,
      DESCRIPTOR(ship_ts), (1)
      INTERVAL '1' MINUTE)); (2)
    CREATE OR REPLACE VIEW orders_ordered AS
      SELECT * FROM TABLE(IMPOSE_ORDER(
      TABLE orders,
      DESCRIPTOR(order_ts), (1)
      INTERVAL '1' MINUTE)); (2)
    1 The field that Hazelcast reads to compare to the lag. This field must be a timestamp.
    2 An allowed lag of one minute.
  2. Join the two streams. This query finds orders that were shipped within 7 days of being placed.

    SELECT o.id AS order_id,
      o.order_ts,
      o.total_amount,
      o.customer_name,
      s.id AS shipment_id,
      s.ship_ts,
      s.warehouse
    FROM orders_ordered o JOIN shipments_ordered s (1)
    ON o.id = s.order_id AND s.ship_ts BETWEEN o.order_ts AND o.order_ts + INTERVAL '7' DAYS; (2)
    1 The inner join makes sure that results are output only for orders that have successfully shipped. The query must find a match on both sides of the join.
    2 A window duration of seven days ignores orders whose shipments don’t occur within 7 days of purchase. Limiting the query to 7 days of data also limits the amount of memory that the query requires.
+------------+-------------------------+-------------------------+--------------------+--------------------+-------------------------+--------------------+
|    order_id|order_ts                 |             total_amount|customer_name       |shipment_id         |ship_ts                  |warehouse           |
+------------+-------------------------+-------------------------+--------------------+--------------------+-------------------------+--------------------+
|           1|2022-03-29T06:01:18Z     |                133548.84|Amal                |ship-ch83360        |2022-03-31T18:13:39Z     |UPS                 |
|           2|2022-03-29T17:02:20Z     |                164839.31|Alex                |ship-xf72808        |2022-03-31T02:04:13Z     |UPS                 |
|           3|2022-03-29T13:44:10Z     |                 90427.66|Hao                 |ship-kr47454        |2022-03-31T20:47:09Z     |DHL                 |
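
The time condition in the join can be checked by hand for any pair of timestamps. The following sketch does that arithmetic in the shell. It is an illustration only: iso_to_epoch and shipped_within_days are hypothetical helpers, and they assume UTC timestamps in exactly the YYYY-MM-DDThh:mm:ssZ form shown in the results above.

```shell
# Convert a UTC timestamp such as 2022-03-29T06:01:18Z to epoch seconds.
iso_to_epoch() {
  printf '%s\n' "$1" | awk -F'[-T:Z]' '{
    y = $1; m = $2; d = $3
    if (m <= 2) { y -= 1; m += 12 }   # count Jan/Feb as months 13/14 of the previous year
    days = 365*y + int(y/4) - int(y/100) + int(y/400) \
         + int((153*(m-3) + 2)/5) + d - 719469
    printf "%d\n", days*86400 + $4*3600 + $5*60 + $6
  }'
}

# Succeed when ship_ts lies between order_ts and order_ts plus the given
# number of days, with both ends included, as BETWEEN does in the query.
shipped_within_days() {
  order_epoch=$(iso_to_epoch "$1")
  ship_epoch=$(iso_to_epoch "$2")
  window=$(( $3 * 86400 ))
  [ "$ship_epoch" -ge "$order_epoch" ] &&
    [ "$ship_epoch" -le $(( order_epoch + window )) ]
}

# Usage with the first row of the results:
#   shipped_within_days 2022-03-29T06:01:18Z 2022-03-31T18:13:39Z 7 && echo "within 7 days"
```

Order 1 was shipped about two and a half days after it was placed, so the check succeeds, which matches its presence in the join output.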

Step 6. Create a Materialized View

In this step, you’ll define a job to run this streaming query in the background and store the results in a materialized view, using a Hazelcast map.

  1. Create a mapping to a Hazelcast map called orders_shipped_within_7_days.

    CREATE OR REPLACE MAPPING orders_shipped_within_7_days(
      __key INT, (1)
      order_ts TIMESTAMP WITH TIME ZONE, (2)
      total_amount DOUBLE,
      customer_name VARCHAR,
      shipment_id VARCHAR,
      ship_ts TIMESTAMP WITH TIME ZONE,
      warehouse VARCHAR
    )
    TYPE IMAP
      OPTIONS (
        'keyFormat' = 'int', (1)
        'valueFormat' = 'json-flat'); (2)
    1 The first column must be named __key. This column is mapped to the key of map entries.
    2 The other columns must appear in the same order as the streaming query results so that the data types are mapped correctly.
  2. Create the job.

    CREATE JOB get_orders_shipped_within_7_days AS
      SINK INTO orders_shipped_within_7_days (1)
      SELECT o.id AS __key, (2)
        o.order_ts,
        o.total_amount,
        o.customer_name,
        s.id AS shipment_id,
        s.ship_ts,
        s.warehouse
      FROM orders_ordered o JOIN shipments_ordered s
      ON o.id = s.order_id AND s.ship_ts BETWEEN o.order_ts AND o.order_ts + INTERVAL '7' DAYS;
    1 Insert the results into the orders_shipped_within_7_days map.
    2 Make sure that the selected fields are in the same order as the mapping to the orders_shipped_within_7_days map.
  3. Query the map to make sure that the job is working.

    SELECT * FROM orders_shipped_within_7_days;

You should see the following:

+------------+-------------------------+-------------------------+--------------------+--------------------+-------------------------+--------------------+
|       __key|order_ts                 |             total_amount|customer_name       |shipment_id         |ship_ts                  |warehouse           |
+------------+-------------------------+-------------------------+--------------------+--------------------+-------------------------+--------------------+
|           2|2022-03-29T17:02:20Z     |                164839.31|Alex                |ship-xf72808        |2022-03-31T02:04:13Z     |UPS                 |
|           1|2022-03-29T06:01:18Z     |                133548.84|Amal                |ship-ch83360        |2022-03-31T18:13:39Z     |UPS                 |
|           3|2022-03-29T13:44:10Z     |                 90427.66|Hao                 |ship-kr47454        |2022-03-31T20:47:09Z     |DHL                 |
+------------+-------------------------+-------------------------+--------------------+--------------------+-------------------------+--------------------+

The job continues to run in the background and adds new results to the map as more orders are shipped within 7 days. You can connect your applications to the Hazelcast cluster and query this map to get further insights.

Step 7. Clean Up

Stop and remove your Docker containers.

docker compose stop
docker compose rm

Summary

In this tutorial, you learned:

  • How to get deeper insights from two related streams by joining them together.

  • How to run the streaming query in the background and store the results in a materialized view, using a job.

Next Steps

Persist mappings and maps

By default, mappings and maps are not persisted. When you stop your cluster, all mappings and map data are deleted. To persist this data, you can enable the Persistence feature in the cluster configuration. Alternatively, you can use Hazelcast Viridian, which persists this data by default. For an introduction to querying Kafka streams in Hazelcast Viridian, see Query Streams from Confluent Cloud.

Manage memory

The materialized view continues to store more and more results as new order and shipment events are generated. To control the size of the map and the amount of memory it consumes, you can configure it with limits. See Managing Map Memory.

Manage jobs

To manage your streaming job, see Managing Jobs.

Explore Management Center

To manage and monitor your cluster, you can use Management Center. This project runs Management Center at http://localhost:8080. See the Management Center documentation for details.