Query Streams from Confluent Cloud
In this tutorial, you’ll learn how to connect your cluster to Confluent Cloud, using SQL. You’ll also learn how to build a materialized view from streaming data and contextual data in Hazelcast.
Context
Streaming data is data that is continuously generated in small sizes. Streaming data includes a wide variety of sources such as retail purchases, financial trades, or telemetry from connected vehicles. This data isn’t usually useful by itself because of its size. It must be enriched with contextual data that is often stored in a database.
Hazelcast allows you to connect to streaming data sources such as Confluent Cloud Kafka clusters as well as cache contextual data in your Hazelcast cluster. With access to both of these data sources, you can process your data in one place and store the results in a materialized view.
Materialized views are useful for speeding up queries that are repeated. Instead of performing resource-intensive queries against large datasets in different sources, your applications can query a materialized view and retrieve a precomputed result.
Before you Begin
You need the following:
-
A Hazelcast Cloud cluster. This tutorial assumes that you have a production cluster for Cloud Standard.
-
A basic understanding of Kafka.
Step 1. Set Up the Hazelcast CLC
You need to use the Hazelcast CLC in later steps to open connections to the SQL shell of your Cloud cluster.
-
Install the Hazelcast CLC client for your operating system.
-
Run the following command to check that the Hazelcast CLC client is installed.
clc version
You should see your installed version of the Hazelcast CLC client.
-
Navigate to the Cluster Details page on the Viridian UI which shows the Client Connect options. Choose
CLI
and run theclc config import
command to import the configuration of your Cloud cluster. -
Now execute the following command to connect the Hazelcast CLC to your cluster. Replace the placeholder
$CLUSTER_NAME
with your cluster name. You can find the cluster name on the dashboard of your cluster under Cluster Details.clc -c $CLUSTER_NAME
A CLC prompt is displayed:
$CLUSTER_NAME...SQL>
The Hazelcast CLC client is successfully connected to your cluster ready for later steps in this tutorial.
Step 2. Create a Free Confluent Cloud Kafka Cluster
-
Create a Confluent Cloud account if you don’t have one already.
-
Create a Basic cluster.
-
Select the same cloud provider as your Cloud cluster and the region that’s closest to your Hazelcast cluster. For example, AWS Oregon (us-west-2).
-
Skip payment and launch the cluster.
-
Click your cluster’s name in the breadcrumbs at the top of the page.
-
Click Topics > Create Topic and enter trades in the Topic name field.
-
Click Create with defaults. Confluent Cloud won’t create a topic if you try to insert data into a topic that doesn’t exist.
-
Click Clients and select Java. Hazelcast uses a Java client to connect to Kafka clusters, so you need the configuration for a Java client.
-
Click Create Kafka cluster API key and enter the name of your Cloud cluster in the description. This name helps you remember that your cluster is using that API key the next time you view them in Confluent Cloud.
-
Click Download and continue. The configuration snippet now includes your API key and secret.
-
Copy the code in your configuration snippet from the top to
session.timeout.ms=45000
. You won’t use the Schema Registry in this tutorial.
Step 3. Create a Mapping to the Confluent Cloud Cluster
To allow Hazelcast to access the trades
topic that you created in your Confluent Cloud Kafka cluster, you need to create a mapping to it.
-
Sign into the Cloud console and select your cluster.
-
Go to SQL in the left navigation to open the SQL browser.
-
Create the mapping. Paste the connection configurations that you copied from Confluent Cloud below the
valueFormat
option. Make sure to format the configuration as necessary. For example:-- Create a mapping to a Kafka topic called 'trades'. CREATE OR REPLACE MAPPING trades ( id BIGINT, ticker VARCHAR, price_usd DECIMAL, amount BIGINT) TYPE Kafka OPTIONS ( -- Serialization format 'valueFormat' = 'json-flat', -- Required connection configs for Kafka producer, consumer, and admin 'bootstrap.servers'='<YOUR BOOTSTRAP SERVER>', 'security.protocol'='SASL_SSL', 'sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR API KEY>" password="<YOUR API SECRET>";', 'sasl.mechanism'='PLAIN', --Required for correctness in Apache Kafka clients prior to 2.6 'client.dns.lookup'='use_all_dns_ips', -- Best practice for higher availability in Apache Kafka clients prior to 3.0 'session.timeout.ms'='45000' );
The sasl.jaas.config
entry must be terminated by a semi-colon.The
trades
topic accepts trades in JSON format, using the following schema:{ "id": , "ticker": , "price_usd": , "amount": , }
-
Publish some new trades to the topic.
INSERT INTO trades VALUES (1, 'SORG', 5.5, 10), (2, 'EORG', 14, 20);
-
If you haven’t started the CLC prompt on your Cloud cluster, do it now:
clc -c $CLUSTER_NAME
-
In the CLC prompt, write a streaming query that filters trade messages, where the total trade order is more than $100.
SELECT ticker, price_usd, amount FROM trades WHERE price_usd * amount > 100;
Result
The result is an empty table. You don’t see any results because, by default, Confluent Cloud consumers read messages, starting from the latest offset. The trades that you published already happened, and so they are not included.
+------------+----------------------+-------------------+ |ticker | price_usd | amount | +------------+----------------------+-------------------+
-
Stop the streaming query by pressing Ctrl+C to close the connection to the CLC prompt.
To exit the CLC shell, press Ctrl+D. -
Back in the SQL browser, create the mapping to the topic again, but this time add the
'auto.offset.reset'='earliest'
configuration. This configuration tells the Kafka consumer to read all data in the topic from the beginning, not just from the latest offset.-- Create a mapping to a Kafka topic called 'trades'. CREATE OR REPLACE MAPPING trades ( id BIGINT, ticker VARCHAR, price_usd DECIMAL, amount BIGINT) TYPE Kafka OPTIONS ( -- Serialization format 'valueFormat' = 'json-flat', -- Required connection configs for Kafka producer, consumer, and admin 'bootstrap.servers'='<YOUR BOOTSTRAP SERVER>', 'security.protocol'='SASL_SSL', 'sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="<YOUR API KEY>" password="<YOUR API SECRET>";', 'sasl.mechanism'='PLAIN', --Required for correctness in Apache Kafka clients prior to 2.6 'client.dns.lookup'='use_all_dns_ips', -- Best practice for higher availability in Apache Kafka clients prior to 3.0 'session.timeout.ms'='45000', 'auto.offset.reset'='earliest' );
You can find your previous mapping query in the History tab of the SQL browser. -
In the CLC prompt, enter the same streaming query that gave no results the last time you ran it.
SELECT ticker, price_usd, amount FROM trades WHERE price_usd * amount > 100;
Result
Hazelcast executes the query and filters the results, using your previous trades:
+-----------------+----------------------+-------------------+ |ticker | price_usd | amount | +-----------------+----------------------+-------------------+ |EORG | 14 | 20 |
-
Stop the streaming query by pressing Ctrl+C to close the connection to the CLC prompt.
To exit the CLC shell, press Ctrl+D.
Step 4. Enrich the Data in the Kafka Messages
To reduce network latency, Kafka messages are often small and contain minimal data. For example, the trades
topic does not contain any information about the company that’s associated with a given ticker. To get deeper insights from data in Kafka topics, you can join query results with contextual data.
-
Open the SQL browser.
-
Create a mapping to a new map called
companies
in Hazelcast. The new map is for storing the company information that you’ll use to enrich results from thetrades
topic.CREATE MAPPING companies ( __key BIGINT, ticker VARCHAR, company VARCHAR, marketcap BIGINT) TYPE IMap OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat');
-
Add some entries to the
companies
map.INSERT INTO companies VALUES (1, 'SORG', 'Example Startup Organization', 100000), (2, 'EORG', 'Example Enterprise Organization', 5000000);
-
Merge results from the
companies
map andtrades
topic so you can see the company name that’s associated with each ticker.SELECT trades.ticker, companies.company, trades.amount FROM trades JOIN companies ON companies.ticker = trades.ticker;
Result
Hazelcast is executing the streaming query.
+------------+-------------------------------+--------------+ |ticker |company |amount | +------------+-----------+-------------------+--------------| |SORG |Example Startup Organization |10 | |EORG |Example Enterprise Organization|20 |
-
Click Stop Query.
Step 5. Create a Materialized View
You can set up an automated job to continuously run the streaming query and cache the results in a Hazelcast map.
-
Open the SQL browser.
-
Create a mapping to a new map called
trade_map
. This map is your materialized view, which caches the enriched results of the streaming query.CREATE MAPPING trade_map ( __key BIGINT, ticker VARCHAR, company VARCHAR, amount BIGINT) TYPE IMap OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat');
-
Submit a job to your cluster that will monitor your
trade
topic for changes and store them in a map. The processing guarantee tells Hazelcast to save the current offsets so that the cluster can resume the job even if the cluster restarts.CREATE JOB ingest_trades OPTIONS ( 'processingGuarantee' = 'exactlyOnce' ) AS SINK INTO trade_map SELECT trades.id, trades.ticker, companies.company, trades.amount FROM trades JOIN companies ON companies.ticker = trades.ticker;
A job will run indefinitely until it is explicitly canceled or the cluster is shut down. Even if you exit the command prompt, the job will continue running on the cluster.
-
List your job to make sure that it was successfully submitted.
SHOW JOBS;
Result
A job called
ingest_trades
is running.+--------------------+ |name | +--------------------+ |ingest_trades | +--------------------+
-
Query your materialized view to see that results have been added to it.
SELECT * FROM trade_map;
Result
The query results are being stored in your map.
+---------+---------+---------------------------------+------------+ | id|ticker | company | amount| +---------+---------+---------------------------------+------------+ | 2|EORG |Example Enterprise Organization | 20| | 1|SORG |Example Startup Organization | 10| +---------+---------+----------+----------------------+------------+
-
Publish some more trades to the topic.
INSERT INTO trades VALUES (3, 'SORG', 5.7, 23), (4, 'EORG', 12, 54);
Your materialized view will continue to be updated for each new trade that’s added to the topic in the Kafka cluster.
-
Query your materialized view to see that results have been added to it.
SELECT * FROM trade_map;
Step 6. Clean Up
Your running job is consuming resources in your cluster. When you don’t need a job anymore, it’s important to cancel it.
-
To cancel your job, use the
DROP
statement to cancel it.DROP JOB ingest_trades;
-
Check that the job is no longer running.
SHOW JOBS;
The table is empty, which means your job is no longer running.
Summary
You’ve learned how to connect Cloud to a Confluent Cloud Kafka cluster as well as the following:
-
How to query streaming data from a Kafka topic.
-
How to enrich streaming data with contextual data and save the results to a materialized view.
Related Resources
See the docs:
Learn more about the concept of stream processing.