Query Streams from Confluent Cloud

In this tutorial, you’ll learn how to connect your cluster to Confluent Cloud, using SQL. You’ll also learn how to build a materialized view from streaming data and contextual data in Hazelcast.

Context

Streaming data is data that is continuously generated as a series of small records. It comes from a wide variety of sources, such as retail purchases, financial trades, or telemetry from connected vehicles. Individual records usually aren’t useful by themselves because they carry so little information. They must be enriched with contextual data that is often stored in a database.

Hazelcast allows you to connect to streaming data sources such as Confluent Cloud Kafka clusters as well as cache contextual data in your Hazelcast cluster. With access to both of these data sources, you can process your data in one place and store the results in a materialized view.

Materialized views are useful for speeding up queries that are repeated. Instead of performing resource-intensive queries against large datasets in different sources, your applications can query a materialized view and retrieve a precomputed result.

Before You Begin

You need the following:

  • A Hazelcast Viridian cluster. This tutorial assumes that you have a production cluster for Viridian Serverless.

  • Hazelcast CLI for opening connections to the SQL shell of your Viridian cluster. You can follow the instructions for SQL in Hazelcast Viridian Hello World until you see the SQL prompt.

  • A basic understanding of Kafka.

Step 1. Create a Free Confluent Cloud Kafka Cluster

  1. Create a Confluent Cloud account if you don’t have one already.

  2. Create a Basic cluster.

  3. Select the same cloud provider as your Viridian cluster and the region that’s closest to your Hazelcast cluster. For example, AWS Oregon (us-west-2).

  4. Skip payment and launch the cluster.

  5. Click your cluster’s name in the breadcrumbs at the top of the page.

    Breadcrumbs that display the cluster name

  6. Click Topics > Create Topic and enter trades in the Topic name field. Confluent Cloud won’t create a topic if you try to insert data into a topic that doesn’t exist.

  7. Click Data integration and select Java. Hazelcast uses a Java client to connect to Kafka clusters, so you need the configuration for a Java client.

  8. Click Create Kafka cluster API key and enter the name of your Viridian cluster in the description. This name helps you remember that your cluster is using that API key the next time you view your API keys in Confluent Cloud.

  9. Click Download and continue. The configuration snippet now includes your API key and secret.

  10. Copy the code in your configuration snippet from the top to session.timeout.ms=45000. You won’t use the Schema Registry in this tutorial.

Step 2. Create a Mapping to the Confluent Cloud Cluster

To allow Hazelcast to access the trades topic that you created in your Confluent Cloud Kafka cluster, you need to create a mapping to it.

  1. Sign in to the Hazelcast Viridian console and select your cluster.

  2. Go to Management Center > SQL Browser.

  3. Create the mapping. Paste the connection configurations that you copied from Confluent Cloud below the valueFormat option. Make sure to format the configuration as necessary. For example:

    -- Create a mapping to a Kafka topic called 'trades'.
    CREATE OR REPLACE MAPPING trades (
      id BIGINT,
      ticker VARCHAR,
      price_usd DECIMAL,
      amount BIGINT)
    TYPE Kafka
    OPTIONS (
      -- Serialization format
      'valueFormat' = 'json-flat',
      -- Required connection configs for Kafka producer, consumer, and admin
      'bootstrap.servers'='<YOUR BOOTSTRAP SERVER>',
      'security.protocol'='SASL_SSL',
      'sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
      required username="<YOUR API KEY>"
      password="<YOUR API SECRET>";',
      'sasl.mechanism'='PLAIN',
      -- Required for correctness in Apache Kafka clients prior to 2.6
      'client.dns.lookup'='use_all_dns_ips',
      -- Best practice for higher availability in Apache Kafka clients prior to 3.0
      'session.timeout.ms'='45000'
    );

    The trades topic accepts trades in JSON format, using the following schema:

    {
      "id": <number>,
      "ticker": "<string>",
      "price_usd": <number>,
      "amount": <number>
    }
  4. Publish some new trades to the topic.

    INSERT INTO trades VALUES
      (1, 'SORG', 5.5, 10),
      (2, 'EORG', 14, 20);
  5. If you haven’t started the SQL prompt on your Viridian cluster, do it now:

    hz-cli -f hazelcast-client-with-ssl.yml sql
  6. In the SQL prompt, write a streaming query that returns only the trades whose total order value (price_usd * amount) is more than $100.

    SELECT ticker, price_usd, amount
      FROM trades
      WHERE price_usd * amount > 100;
    Result

    The result is an empty table. You don’t see any results because, by default, Kafka consumers start reading from the latest offset. The trades that you published were written to the topic before the query started, so they are not included.

    +------------+----------------------+-------------------+
    |ticker      |           price_usd  |          amount   |
    +------------+----------------------+-------------------+
  7. Stop the streaming query by pressing Ctrl+C to close the connection to the SQL prompt.

  8. Back in the SQL browser in Management Center, create the mapping to the topic again, but this time add the 'auto.offset.reset'='earliest' configuration. This configuration tells the Kafka consumer to read all data in the topic from the beginning, not just from the latest offset.

    -- Create a mapping to a Kafka topic called 'trades'.
    CREATE OR REPLACE MAPPING trades (
      id BIGINT,
      ticker VARCHAR,
      price_usd DECIMAL,
      amount BIGINT)
    TYPE Kafka
    OPTIONS (
      -- Serialization format
      'valueFormat' = 'json-flat',
      -- Required connection configs for Kafka producer, consumer, and admin
      'bootstrap.servers'='<YOUR BOOTSTRAP SERVER>',
      'security.protocol'='SASL_SSL',
      'sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule
      required username="<YOUR API KEY>"
      password="<YOUR API SECRET>";',
      'sasl.mechanism'='PLAIN',
      -- Required for correctness in Apache Kafka clients prior to 2.6
      'client.dns.lookup'='use_all_dns_ips',
      -- Best practice for higher availability in Apache Kafka clients prior to 3.0
      'session.timeout.ms'='45000',
      'auto.offset.reset'='earliest'
    );
    You can find your previous mapping query in the History tab of the SQL browser.
  9. In the SQL prompt, enter the same streaming query that gave no results the last time you ran it.

    SELECT ticker, price_usd, amount
      FROM trades
      WHERE price_usd * amount > 100;
    Result

    Hazelcast executes the query and filters the results, using your previous trades:

    +-----------------+----------------------+-------------------+
    |ticker           |       price_usd      |       amount      |
    +-----------------+----------------------+-------------------+
    |EORG             |                  14  |               20  |
  10. Stop the streaming query by pressing Ctrl+C to close the connection to the SQL prompt.
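
The filter used in this step is easier to follow when the computed order value is returned alongside each trade. The following is a minimal sketch, assuming the trades mapping from this step is in place. The total_usd alias is illustrative and is not part of the topic schema.

```sql
-- List the mappings that are registered in the cluster.
-- The trades mapping created in this step should be listed.
SHOW MAPPINGS;

-- Same filter as before, with the order value as an extra column.
SELECT ticker, price_usd, amount, price_usd * amount AS total_usd
FROM trades
WHERE price_usd * amount > 100;
```

With the two trades published earlier, the SORG order is worth 5.5 × 10 = 55 and the EORG order is worth 14 × 20 = 280, which is why only EORG passes the filter. Like the earlier queries, this is a streaming query and it runs until you stop it.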

Step 3. Enrich the Data in the Kafka Messages

To reduce network latency, Kafka messages are often small and contain minimal data. For example, the trades topic does not contain any information about the company that’s associated with a given ticker. To get deeper insights from data in Kafka topics, you can join query results with contextual data.

  1. Open the SQL browser in Management Center.

  2. Create a mapping to a new map called companies in Hazelcast. The new map is for storing the company information that you’ll use to enrich results from the trades topic.

    CREATE MAPPING companies (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    marketcap BIGINT)
    TYPE IMap
    OPTIONS (
    'keyFormat'='bigint',
    'valueFormat'='json-flat');
  3. Add some entries to the companies map.

    INSERT INTO companies VALUES
    (1, 'SORG', 'Example Startup Organization', 100000),
    (2, 'EORG', 'Example Enterprise Organization', 5000000);
  4. Merge results from the companies map and trades topic so you can see the company name that’s associated with each ticker.

    SELECT trades.ticker, companies.company, trades.amount
    FROM trades
    JOIN companies
    ON companies.ticker = trades.ticker;
    Result

    Hazelcast is executing the streaming query.

    +------------+-------------------------------+--------------+
    |ticker      |company                        |amount        |
    +------------+-------------------------------+--------------+
    |SORG        |Example Startup Organization   |10            |
    |EORG        |Example Enterprise Organization|20            |
  5. Click Stop Query.
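
If you only want enriched rows for larger orders, the join can be combined with the order-value filter from Step 2 in a single streaming query. This is a sketch that assumes the trades and companies mappings exist as defined in this tutorial. The total_usd alias is illustrative.

```sql
-- Enrich each trade with company details and keep only orders over $100.
SELECT trades.ticker,
       companies.company,
       companies.marketcap,
       trades.price_usd * trades.amount AS total_usd
FROM trades
JOIN companies
ON companies.ticker = trades.ticker
WHERE trades.price_usd * trades.amount > 100;
```

Because trades is a Kafka topic, this query also keeps running until you click Stop Query.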

Step 4. Create a Materialized View

You can set up an automated job to continuously run the streaming query and cache the results in a Hazelcast map.

  1. Open the SQL browser in Management Center.

  2. Create a mapping to a new map called trade_map. This map is your materialized view, which caches the enriched results of the streaming query.

    CREATE MAPPING trade_map (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    amount BIGINT)
    TYPE IMap
    OPTIONS (
    'keyFormat'='bigint',
    'valueFormat'='json-flat');
  3. Submit a job to your cluster that monitors your trades topic for new messages and stores the enriched results in the trade_map map. The processing guarantee tells Hazelcast to save the current offsets so that the cluster can resume the job even if the cluster restarts.

    CREATE JOB ingest_trades
    OPTIONS (
      'processingGuarantee' = 'exactlyOnce'
    ) AS
    SINK INTO trade_map
    SELECT trades.id, trades.ticker, companies.company, trades.amount
    FROM trades
    JOIN companies
    ON companies.ticker = trades.ticker;

    A job will run indefinitely until it is explicitly canceled or the cluster is shut down. Even if you exit the command prompt, the job will continue running on the cluster.

  4. List your job to make sure that it was successfully submitted.

    SHOW JOBS;
    Result

    A job called ingest_trades is running.

    +--------------------+
    |name                |
    +--------------------+
    |ingest_trades       |
    +--------------------+
  5. Query your materialized view to see that results have been added to it.

    SELECT * FROM trade_map;
    Result

    The query results are being stored in your map.

    +---------+---------+---------------------------------+------------+
    |       id|ticker   |   company                       |      amount|
    +---------+---------+---------------------------------+------------+
    |        2|EORG     |Example Enterprise Organization  |          20|
    |        1|SORG     |Example Startup Organization     |          10|
    +---------+---------+---------------------------------+------------+
  6. Publish some more trades to the topic.

    INSERT INTO trades VALUES
      (3, 'SORG', 5.7, 23),
      (4, 'EORG', 12, 54);

    Your materialized view will continue to be updated for each new trade that’s added to the topic in the Kafka cluster.

  7. Query your materialized view again to see that the new trades have been added to it.

    SELECT * FROM trade_map;
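
Because trade_map is a regular Hazelcast map, applications can read it with ordinary batch queries instead of re-running the streaming join. The following is a sketch, assuming the ingest_trades job has already processed the trades published in this step. The total_amount alias is illustrative.

```sql
-- Look up a single enriched trade by its key (the trade id).
SELECT ticker, company, amount
FROM trade_map
WHERE __key = 3;

-- Summarize the cached results per ticker.
SELECT ticker, SUM(amount) AS total_amount
FROM trade_map
GROUP BY ticker;
```

Unlike the queries against the trades topic, these queries return a finite result and then finish.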

Step 5. Clean Up

Your running job is consuming resources in your cluster. When you don’t need a job anymore, it’s important to cancel it.

  1. To cancel your job, use the DROP JOB statement.

    DROP JOB ingest_trades;
  2. Check that the job is no longer running.

    SHOW JOBS;

The table is empty, which means your job is no longer running.
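
Canceling the job does not remove the mappings or the data that the job wrote. If you want to return the cluster to its earlier state, something like the following may help. It is a sketch that assumes you no longer need the entries in trade_map and companies. It does not touch the trades topic in Confluent Cloud.

```sql
-- Remove the cached entries from the maps.
DELETE FROM trade_map;
DELETE FROM companies;

-- Remove the mappings. Dropping a mapping removes only the SQL definition,
-- not the underlying map or Kafka topic.
DROP MAPPING IF EXISTS trade_map;
DROP MAPPING IF EXISTS companies;
DROP MAPPING IF EXISTS trades;
```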

Summary

You’ve learned how to connect Hazelcast Viridian to a Confluent Cloud Kafka cluster as well as the following:

  • How to query streaming data from a Kafka topic.

  • How to enrich streaming data with contextual data and save the results to a materialized view.

See the docs:

Learn more about the concept of stream processing.