Get Started with SQL Over Kafka

In this tutorial, you use an interactive SQL shell on a Hazelcast member to query Kafka topics in real time.

Before You Begin

To complete this tutorial, you need the following:

  • A Hazelcast cluster in client/server mode and an instance of Management Center running on your local network

  • A connection to the SQL shell

Step 1. Set Up a Kafka Broker

  1. On the same device as your Hazelcast member, start a Kafka broker.

    • Binary

      1. Download Kafka.

        wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
        tar xvf kafka_2.13-2.7.0.tgz
        cd kafka_2.13-2.7.0

      2. Start Zookeeper.

        bin/zookeeper-server-start.sh config/zookeeper.properties

      3. In another terminal, start Kafka.

        bin/kafka-server-start.sh config/server.properties

    • Docker

        docker run --name kafka --network hazelcast-network --rm hazelcast/hazelcast-quickstart-kafka

Step 2. Create a Mapping to Kafka

In the SQL shell, create a Kafka mapping to allow Hazelcast to access messages in the trades topic.

  • Binary

    CREATE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json-flat',
        'bootstrap.servers' = '127.0.0.1:9092'
    );

  • Docker

    CREATE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json-flat',
        'bootstrap.servers' = 'kafka:9092'
    );

Here, you configure the connector to read JSON values with the following fields:

{
  "id"
  "ticker"
  "price"
  "amount"
}
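To make the field list concrete, the following Python sketch builds one message value of that shape and serializes it. The sample values are borrowed from the INSERT statements later in this tutorial, the variable names are illustrative, and nothing here calls Hazelcast or Kafka. It assumes that the json-flat format maps each top-level JSON field to the column of the same name.

```python
import json

# One trade record with the four top-level fields declared in the mapping.
# Assumption: json-flat matches top-level JSON fields to columns by name.
trade = {"id": 1, "ticker": "ABCD", "price": 5.5, "amount": 10}

# The serialized string is what a Kafka message value for this mapping could look like.
message_value = json.dumps(trade)
print(message_value)
```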

Step 3. Run a Streaming Query on the Kafka Topic

  1. Write a streaming query that filters trade messages from Kafka.

    SELECT ticker, ROUND(price * 100) AS price_cents, amount
      FROM trades
      WHERE price * amount > 100;

    You should see an empty table:

    +------------+----------------------+-------------------+
    |ticker      |           price_cents|             amount|
    +------------+----------------------+-------------------+

    Streaming queries like this one continue to run until you close the shell or kill the process with Ctrl + C.
  2. In another terminal, open another connection to the SQL shell and publish some messages to the trades topic.

    INSERT INTO trades VALUES
      (1, 'ABCD', 5.5, 10),
      (2, 'EFGH', 14, 20);
  3. Go back to the terminal where you created the streaming query.

    You should see that Hazelcast has executed the query and filtered the results:

    +-----------------+----------------------+-------------------+
    |ticker           |           price_cents|             amount|
    +-----------------+----------------------+-------------------+
    |EFGH             |                  1400|                 20|
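Only the EFGH trade appears because the ABCD trade does not satisfy the WHERE clause: 5.5 × 10 = 55, which is not greater than 100, whereas 14 × 20 = 280. The following Python sketch reproduces that arithmetic on the two published rows. It is a plain stand-in for the query logic, not Hazelcast code, and the function name is made up for illustration.

```python
from decimal import Decimal

# The two rows published to the trades topic: (id, ticker, price, amount).
trades = [
    (1, "ABCD", Decimal("5.5"), 10),
    (2, "EFGH", Decimal("14"), 20),
]

def matching_rows(rows):
    """Mimic: SELECT ticker, ROUND(price * 100), amount ... WHERE price * amount > 100."""
    result = []
    for _id, ticker, price, amount in rows:
        if price * amount > 100:
            # The sample prices scale to whole numbers, so the rounding mode does not matter here.
            result.append((ticker, int(round(price * 100)), amount))
    return result

print(matching_rows(trades))  # [('EFGH', 1400, 20)]
```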

Step 4. Enrich the Data in the Kafka Messages

Kafka messages are often small and contain minimal data to reduce network latency. For example, the trades topic does not contain any information about the company that’s associated with a given ticker. To get deeper insights from data in Kafka topics, you can join query results with data in other mappings.

  1. Create a mapping to a new map in which to store the company information that you’ll use to enrich results from the trades topic.

    CREATE MAPPING companies (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    marketcap BIGINT)
    TYPE IMap
    OPTIONS (
    'keyFormat'='bigint',
    'valueFormat'='json-flat');
  2. Add some entries to the companies map.

    INSERT INTO companies VALUES
    (1, 'ABCD', 'The ABCD', 100000),
    (2, 'EFGH', 'The EFGH', 5000000);
  3. Use the JOIN clause to merge results from the companies map and trades topic so you can see which companies are being traded.

    SELECT trades.ticker, companies.company, trades.amount
    FROM trades
    JOIN companies
    ON companies.ticker = trades.ticker;

    You should see an empty table:

    +------------+-----------+----------+
    |ticker      |company    |    amount|
    +------------+-----------+----------+
  4. In another SQL shell, publish some messages to the trades topic.

    INSERT INTO trades VALUES
      (1, 'ABCD', 5.5, 10),
      (2, 'EFGH', 14, 20);
  5. Go back to the terminal where you created the streaming query that merges results from the companies map and trades topic.

    You should see that Hazelcast has executed the query.

    +------------+-----------+----------+
    |ticker      |company    |    amount|
    +------------+-----------+----------+
    |ABCD        |The ABCD   |        10|
    |EFGH        |The EFGH   |        20|
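The enrichment in this step amounts to looking up each trade's ticker in the companies data. The following Python sketch shows that idea with ordinary dictionaries. It is an illustration of inner-join behavior under standard SQL semantics, not a description of how Hazelcast executes the query; the function name and the ZZZZ ticker are hypothetical.

```python
# Stand-in for the companies map, indexed by ticker: ticker -> company name.
companies = {"ABCD": "The ABCD", "EFGH": "The EFGH"}

# Rows from the trades topic: (id, ticker, price, amount).
trades = [(1, "ABCD", 5.5, 10), (2, "EFGH", 14, 20)]

def enrich(trade_rows, company_by_ticker):
    """Inner join on ticker: keep only trades that have a matching company."""
    enriched = []
    for _id, ticker, _price, amount in trade_rows:
        company = company_by_ticker.get(ticker)
        if company is not None:
            enriched.append((ticker, company, amount))
    return enriched

print(enrich(trades, companies))
# A trade for a ticker with no company entry (hypothetical ZZZZ) is dropped.
print(enrich([(3, "ZZZZ", 1.0, 1)], companies))
```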

Step 5. Ingest Query Results into a Hazelcast Map

To keep your query results available for fast access later, you can cache them in Hazelcast by ingesting them into a map.

  1. Create a mapping to a new map in which to ingest your streaming query results.

    CREATE MAPPING trade_map (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    amount BIGINT)
    TYPE IMap
    OPTIONS (
    'keyFormat'='bigint',
    'valueFormat'='json-flat');
  2. Submit a streaming job to your cluster that monitors the trades topic for changes and stores them in the trade_map map.

    CREATE JOB ingest_trades AS
    SINK INTO trade_map
    SELECT trades.id, trades.ticker, companies.company, trades.amount
    FROM trades
    JOIN companies
    ON companies.ticker = trades.ticker;

    A streaming job runs until it is explicitly canceled or the cluster is shut down. Even if you close the shell connection, the job continues running on the cluster.
  3. List your jobs to make sure that the job was successfully submitted.

    SHOW JOBS;

    You should see a job called ingest_trades.

    +--------------------+
    |name                |
    +--------------------+
    |ingest_trades       |
    +--------------------+
  4. Publish some messages to the Kafka topic.

    INSERT INTO trades VALUES
    (1, 'ABCD', 5.5, 10),
    (2, 'EFGH', 14, 20);
  5. Query your trade_map map to see that the Kafka messages have been added to it.

    SELECT * FROM trade_map;

    You should see that the data coming from the Kafka broker is being stored in your map.

    +---------+---------+----------+------------+
    |    __key|ticker   |   company|      amount|
    +---------+---------+----------+------------+
    |        2|EFGH     |The EFGH  |          20|
    |        1|ABCD     |The ABCD  |          10|
    +---------+---------+----------+------------+
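The job writes each joined row into trade_map with trades.id as the entry key. The following Python sketch models the map as a dictionary, assuming that SINK INTO replaces an existing entry that has the same key. Under that assumption, publishing the same two trades more than once still leaves two entries. The sink function is a hypothetical stand-in, not a Hazelcast API.

```python
# Stand-in for trade_map: key -> value fields.
trade_map = {}

def sink(target, rows):
    """Write each row under its key; an existing entry with that key is replaced."""
    for key, ticker, company, amount in rows:
        target[key] = {"ticker": ticker, "company": company, "amount": amount}

# Rows shaped like the job's SELECT: (id, ticker, company, amount).
rows = [(1, "ABCD", "The ABCD", 10), (2, "EFGH", "The EFGH", 20)]

sink(trade_map, rows)
sink(trade_map, rows)  # publishing the same trades again overwrites, it does not duplicate

print(len(trade_map))  # 2
```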

Step 6. Cancel the Job

Because the job keeps running on the cluster after you close the shell connection, cancel it explicitly when you no longer need it.

  1. To stop your streaming job, use the DROP statement to cancel it.

    DROP JOB ingest_trades;

In the terminal where you started the Hazelcast member, you should see that the job was canceled, along with its start time and how long it ran.

Start time: 2021-05-13T16:31:14.410
Duration: 00:02:48.318

Next Steps

To learn more about SQL, see SQL Overview.

To learn how to work with jobs, see the following: