Get Started with SQL Over Kafka
In this tutorial, you use an interactive SQL shell on a Hazelcast member to query Kafka topics in real-time.
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources |
---|---|
A Hazelcast cluster in client/server mode and an instance of Management Center running on your local network |
|
A connection to the SQL shell |
Step 1. Set Up a Kafka Broker
-
On the same device as your Hazelcast member, start a Kafka broker.
docker run --name kafka --network hazelcast-network --rm hazelcast/hazelcast-quickstart-kafka
-
Download Kafka.
wget https://archive.apache.org/dist/kafka/2.7.0/kafka-2.7.0-src.tgz tar xvf kafka-2.7.0-src.tgz cd kafka-2.7.0-src
-
Start Zookeeper.
bin/zookeeper-server-start.sh config/zookeeper.properties
-
In another terminal, start Kafka.
bin/kafka-server-start.sh config/server.properties
-
Step 2. Create a Mapping to Kafka
In the SQL shell, create a Kafka mapping to allow Hazelcast to access messages in the trades
topic.
CREATE MAPPING trades (
id BIGINT,
ticker VARCHAR,
price DECIMAL,
amount BIGINT)
TYPE Kafka
OPTIONS (
'valueFormat' = 'json-flat',
'bootstrap.servers' = '127.0.0.1:9092'
);
CREATE MAPPING trades (
id BIGINT,
ticker VARCHAR,
price DECIMAL,
amount BIGINT)
TYPE Kafka
OPTIONS (
'valueFormat' = 'json-flat',
'bootstrap.servers' = 'kafka:9092'
);
Here, you configure the connector to read JSON values with the following fields:
{
"id"
"ticker"
"price"
"amount"
}
Step 3. Run a Streaming Query on the Kafka Topic
-
Write a streaming query that filters trade messages from Kafka.
SELECT ticker, ROUND(price * 100) AS price_cents, amount FROM trades WHERE price * amount > 100;
You should see an empty table:
+------------+----------------------+-------------------+ |ticker | price_cents| amount| +------------+----------------------+-------------------+
Streaming queries like this one continue to run until you close the shell or kill the process with Ctrl+C. -
In another terminal, open another connection to the SQL shell and publish some messages to the
trades
topic.INSERT INTO trades VALUES (1, 'ABCD', 5.5, 10), (2, 'EFGH', 14, 20);
-
Go back to the terminal where you created the streaming query.
You should see that Hazelcast has executed the query and filtered the results:
+-----------------+----------------------+-------------------+ |ticker | price_cents| amount| +-----------------+----------------------+-------------------+ |EFGH | 1400| 20|
Step 4. Enrich the Data in the Kafka Messages
Kafka messages are often small and contain minimal data to reduce network latency. For example, the trades
topic does not contain any information about the company that’s associated with a given ticker. To get deeper insights from data in Kafka topics, you can join query results with data in other mappings.
-
Create a mapping to a new map in which to store the company information that you’ll use to enrich results from the
trades
topic.CREATE MAPPING companies ( __key BIGINT, ticker VARCHAR, company VARCHAR, marketcap BIGINT) TYPE IMap OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat');
-
Add some entries to the
companies
map.INSERT INTO companies VALUES (1, 'ABCD', 'The ABCD', 100000), (2, 'EFGH', 'The EFGH', 5000000);
-
Use the
JOIN
clause to merge results from thecompanies
map andtrades
topic so you can see which companies are being traded.SELECT trades.ticker, companies.company, trades.amount FROM trades JOIN companies ON companies.ticker = trades.ticker;
+------------+-----------+----------+ |ticker |company | amount| +------------+-----------+----------+
-
In another SQL shell, publish some messages to the
trades
topic.INSERT INTO trades VALUES (1, 'ABCD', 5.5, 10), (2, 'EFGH', 14, 20);
-
Go back to the terminal where you created the streaming query that merges results from the
companies
map andtrades
topic.You should see that Hazelcast has executed the query.
+------------+-----------+----------+ |ticker |company | amount| +------------+-----------+----------+ |ABCD |The ABCD |10 | |EFGH |The EFGH |20 |
Step 5. Ingest Query Results into a Hazelcast Map
To save your query results as a view that you can later access faster, you can cache them in Hazelcast by ingesting them into a map.
-
Create a mapping to a new map in which to ingest your streaming query results.
CREATE MAPPING trade_map ( __key BIGINT, ticker VARCHAR, company VARCHAR, amount BIGINT) TYPE IMap OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat');
-
Submit a streaming job to your cluster that will monitor your
trade
topic for changes and store them in a map.CREATE JOB ingest_trades AS SINK INTO trade_map SELECT trades.id, trades.ticker, companies.company, trades.amount FROM trades JOIN companies ON companies.ticker = trades.ticker;
A streaming job will run indefinitely until it is explicitly canceled or the cluster is shut down. Even if you kill the shell connection, the job will continue running on the cluster. -
List your job to make sure that it was successfully submitted.
SHOW JOBS;
You should see a job called
ingest_trades
.+--------------------+ |name | +--------------------+ |ingest_trades | +--------------------+
-
Publish some messages to the Kafka topic.
INSERT INTO trades VALUES (1, 'ABCD', 5.5, 10), (2, 'EFGH', 14, 20);
-
Query your
trade_map
map to see that the Kafka messages have been added to it.SELECT * FROM trade_map;
You should see that the data coming from the Kafka broker is being stored in your map.
+---------+---------+----------+------------+ | id|ticker | company| amount| +---------+---------+----------+------------+ | 2|EFGH |The EFGH | 20| | 1|ABCD |The ABCD | 10| +---------+---------+----------+------------+
Step 6. Cancel the Job
A streaming job will run indefinitely until it is explicitly canceled or the cluster is shut down. Even if you kill the shell connection, the job will continue running on the cluster.
-
To stop your streaming job, use the
DROP
statement to cancel it.DROP JOB ingest_trades;
In the terminal where you started the Hazelcast member, you should see that the job is canceled as well as the time it was started and how long it ran for.
Start time: 2021-05-13T16:31:14.410
Duration: 00:02:48.318
Next Steps
To learn more about SQL, see SQL
To learn how to connect to Confluent Cloud, see https://docs.hazelcast.com/tutorials/create-materialized-view-from-kafka
To learn how to work with jobs, see the following: