
Get Started with SQL Pipelines

Use an interactive SQL shell on your Hazelcast member to query and analyze data from multiple sources and store the results.

Before You Begin

To complete this tutorial, you need the following:

• A Hazelcast cluster and an instance of Management Center running on your local network

Step 1. Query Streaming Data from Apache Kafka

Hazelcast can ingest and query real-time streaming data as it is being generated. This feature is ideal for use cases such as fraud detection, which relies on complex queries with low latency.
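
To give a flavor of what such queries look like, the sketch below computes per-ticker trading volume over five-second windows. It is illustrative only: it assumes a hypothetical trade_ts timestamp column (the tutorial's messages don't include one) and a Hazelcast version that supports the IMPOSE_ORDER and TUMBLE windowing functions, which may not be available in every release.

-- Sketch only: assumes a hypothetical trade_ts TIMESTAMP column
-- and a Hazelcast version with streaming window support.
CREATE OR REPLACE VIEW trades_ordered AS
  SELECT * FROM TABLE(IMPOSE_ORDER(
    TABLE trades,
    DESCRIPTOR(trade_ts),
    INTERVAL '1' SECOND));

-- Per-ticker trading volume in five-second tumbling windows.
SELECT window_start, ticker, SUM(price * amount) AS volume
FROM TABLE(TUMBLE(
  TABLE trades_ordered,
  DESCRIPTOR(trade_ts),
  INTERVAL '5' SECOND))
GROUP BY window_start, ticker;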

In this step, you use SQL to get data from a Kafka topic and display the results of a query.

The data that you push to Kafka consists of JSON messages containing trading information, such as the following:

{
    "id": 1,
    "ticker": "ABCD",
    "price": 5.5,
    "amount": 10
}
  1. Connect to the SQL shell on your cluster member.

    • Binary

    • Docker

    Mac and Linux
    bin/hz-cli sql
    Windows
    bin/hz-cli.bat sql

    Replace the $LOCAL_IP placeholder with your member’s local IP address.

    docker run --network hazelcast-network -it --rm hazelcast/hazelcast:5.0-BETA-1 hazelcast --targets hello-world@$LOCAL_IP sql

    The --targets parameter tells the SQL shell to connect to the member at the given IP address in a cluster called hello-world.

  2. On the same device as your Hazelcast member, start a Kafka server.

    • Binary

    • Docker

    1. Download Kafka.

      wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
      tar xvf kafka_2.13-2.7.0.tgz
      cd kafka_2.13-2.7.0
    2. Start Zookeeper.

      bin/zookeeper-server-start.sh config/zookeeper.properties
    3. In another terminal, start Kafka.

      bin/kafka-server-start.sh config/server.properties
    docker run --name kafka --network hazelcast-network --rm hazelcast/hazelcast-quickstart-kafka
  3. In the SQL shell, use the CREATE MAPPING statement to allow Hazelcast to access data that is pushed to the Kafka server.

    • Binary

    • Docker

    CREATE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json',
        'bootstrap.servers' = '127.0.0.1:9092'
    );
    CREATE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json',
        'bootstrap.servers' = 'kafka:9092'
    );
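
    To confirm that the mapping was created, you can list all mappings on the cluster. This is an optional check:

    SHOW MAPPINGS;

    You should see trades in the output.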
  4. Write a streaming query that filters trade events from Kafka and adds them to a table.

    SELECT ticker, ROUND(price * 100) AS price_cents, amount
      FROM trades
      WHERE price * amount > 100;

    You should see an empty table:

    +------------+----------------------+-------------------+
    |ticker      |           price_cents|             amount|
    +------------+----------------------+-------------------+
    Streaming queries like this one continue to run until you close the shell or cancel them with Ctrl + C.
  5. In another terminal, open another connection to the SQL shell and push some messages to Kafka.

    INSERT INTO trades VALUES
      (1, 'ABCD', 5.5, 10),
      (2, 'EFGH', 14, 20);
  6. Go back to the terminal where you created the streaming query.

    You should see that Hazelcast has executed the query and filtered the results:

    +-----------------+----------------------+-------------------+
    |ticker           |           price_cents|             amount|
    +-----------------+----------------------+-------------------+
    |EFGH             |                  1400|                 20|

Step 2. Store Pipeline Results in Hazelcast

As well as querying and transforming data, you can store your query results in one or more systems. This feature is useful for sending results to other systems or caching results in Hazelcast to avoid running redundant queries.

In this step, you create a mapping to a map named tradeMap and use JSON to store some trade data in it.

  1. Use the CREATE MAPPING statement to create a map.

    CREATE MAPPING tradeMap (
        __key BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        amount BIGINT)
    TYPE IMap
    OPTIONS (
        'keyFormat'='bigint',
        'valueFormat'='json');
  2. Use the SINK INTO statement to store the results of a query in the map.

    SINK INTO tradeMap VALUES (1, 'hazl', 10, 1);
  3. Query the map to make sure that the data is there.

    SELECT * FROM tradeMap;

You should see the following:

+-----+----------+--------+--------+
|__key|ticker    |   price|  amount|
+-----+----------+--------+--------+
|    1|hazl      |10.0000…|       1|
+-----+----------+--------+--------+
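
Because tradeMap is backed by a map, SINK INTO behaves like an upsert: writing another row with the same __key replaces the existing entry rather than adding a new one. As a quick illustration (not part of the original steps), re-running the statement with a new price updates the entry in place:

-- Overwrites the entry for key 1 with the new price.
SINK INTO tradeMap VALUES (1, 'hazl', 11, 1);
SELECT * FROM tradeMap;

The query still returns a single row for key 1, now with the updated price.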

Step 3. Create a Standalone Streaming Query

In this step, you use SQL to get data from a Kafka topic and store the results of the query in a map.

  1. Use the CREATE JOB statement to submit a streaming job to your cluster that will monitor your Kafka topic for changes and store them in a map.

    CREATE JOB ingest_trades AS
      SINK INTO tradeMap
      SELECT id, ticker, price, amount
      FROM trades;
    A streaming job will run indefinitely until it is explicitly canceled or the cluster is shut down. Even if you kill the shell connection, the job will continue running on the cluster. To learn more about jobs, see About Data Pipelines.
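
    If you want to pause a job without canceling it, Hazelcast SQL also provides the ALTER JOB statement. This is a brief aside and not required for this tutorial:

    ALTER JOB ingest_trades SUSPEND;
    ALTER JOB ingest_trades RESUME;

    See the list of supported SQL statements for the full syntax.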
  2. Use the SHOW JOBS statement to make sure that your job was successfully submitted.

    SHOW JOBS;

    You should see a job called ingest_trades.

    +--------------------+
    |name                |
    +--------------------+
    |ingest_trades       |
    +--------------------+
  3. Publish some events to the Kafka topic.

    INSERT INTO trades VALUES
      (1, 'ABCD', 5.5, 10),
      (2, 'EFGH', 14, 20);
  4. Query your tradeMap map to see that the Kafka data has been added to it.

    SELECT * FROM tradeMap;

    You should see that the data coming from Kafka is being stored in your map.

    +---------------+--------------------+----------+--------------------+
    |          __key|ticker              |     price|              amount|
    +---------------+--------------------+----------+--------------------+
    |              2|EFGH                |14.000000…|                  20|
    |              1|ABCD                |5.5000000…|                  10|
    +---------------+--------------------+----------+--------------------+
  5. To stop your streaming job, use the DROP statement to cancel it.

    DROP JOB ingest_trades;

    In the console of the Hazelcast member, you should see a message confirming that the job was canceled, along with its start time and how long it ran.

    Execution of job '062d-d578-9240-0001', execution 062d-d578-df80-0001 got terminated, reason=java.util.concurrent.CancellationException
    	Start time: 2021-05-13T16:31:14.410
    	Duration: 00:02:48.318

Next Steps

See a complete list of supported SQL statements.

For more details about integrating Hazelcast with files, see File Connector.

For more details about integrating Hazelcast with Kafka, see Kafka Connector.