Generating Streaming Data Using SQL
Use SQL on Hazelcast to generate randomized streaming data for demo/POC purposes.
Context
In this tutorial, you will learn how to use SQL to generate streaming data locally within Hazelcast. Using the VIEW and generate_stream functions of SQL on Hazelcast, you can create a data stream local to the cluster.
As of Hazelcast 5.3, you can:
- Access this data directly via SQL functionality in any programming language.
- Direct the output to an external Kafka server, then use the connection to the Kafka server to access the data via the Pipeline API or via SQL using CREATE MAPPING.
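Before building a full view, it can help to see generate_stream on its own. As in the view later in this tutorial, the function emits a single BIGINT column named v that counts up from 0 at the requested rate. A minimal sketch, assuming a connected SQL shell:

```sql
-- Emits one row per event at 5 events per second; v counts up from 0.
-- This is a streaming query: press CTRL-C to stop it.
SELECT v FROM TABLE(generate_stream(5));
```

Everything else in this tutorial is built by transforming that v column with ordinary SQL expressions.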
Hazelcast 5.4 exposes SQL through the Kafka Connect connector, eliminating the need for an external Kafka server to feed data into the Pipeline API. For further information on using the Kafka Connect connector, refer to the Platform documentation.
Before you Begin
Before starting this tutorial, make sure that you meet the following prerequisites:
- A running Hazelcast cluster, either Cloud or Platform
- A connection to the SQL command line, either through the CLC or through Management Center
- For Step 2, a Kafka instance accessible by your Hazelcast cluster
Step 1. Generating Data
- Look through the following SQL code from the SQL Basics on Cloud (Stock Ticker Demo) tutorial. The comments explain what the code is doing.

```sql
CREATE OR REPLACE VIEW trades AS
SELECT id,
       CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN 'APPL'
            WHEN tickRand BETWEEN 0.1 AND 0.2 THEN 'GOOGL'
            WHEN tickRand BETWEEN 0.2 AND 0.3 THEN 'META'
            WHEN tickRand BETWEEN 0.3 AND 0.4 THEN 'NFLX'
            WHEN tickRand BETWEEN 0.4 AND 0.5 THEN 'AMZN'
            WHEN tickRand BETWEEN 0.5 AND 0.6 THEN 'INTC'
            WHEN tickRand BETWEEN 0.6 AND 0.7 THEN 'CSCO'
            WHEN tickRand BETWEEN 0.7 AND 0.8 THEN 'BABA'
            ELSE 'VOO'
       END AS ticker, -- (1)
       CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN tickRand*50+1
            WHEN tickRand BETWEEN 0.1 AND 0.2 THEN tickRand*75+.6
            WHEN tickRand BETWEEN 0.2 AND 0.3 THEN tickRand*60+.2
            WHEN tickRand BETWEEN 0.3 AND 0.4 THEN tickRand*30+.3
            WHEN tickRand BETWEEN 0.4 AND 0.5 THEN tickRand*43+.7
            WHEN tickRand BETWEEN 0.5 AND 0.6 THEN tickRand*100+.4
            WHEN tickRand BETWEEN 0.6 AND 0.7 THEN tickRand*25+.8
            WHEN tickRand BETWEEN 0.7 AND 0.8 THEN tickRand*10+.1
            ELSE tickRand*100+4
       END AS price, -- (2)
       trade_ts,
       amount
FROM (SELECT v AS id,
             RAND(v*v) AS tickRand, -- (3)
             TO_TIMESTAMP_TZ(v*10 + 1645484400000) AS trade_ts, -- (4)
             ROUND(RAND()*100, 0) AS amount
      FROM TABLE(generate_stream(100))); -- (5)
```
1. We’re using the random number to generate different stock ticker symbols.
2. To keep each ticker’s price within a reasonable range of variation, we use the same BETWEEN ranges, and give each one a different base multiplier.
3. The random number generator creates the tickRand value.
4. We seed the timestamp with a base value that equates to 21 Feb 2022. You can change this to any reasonable Unix timestamp.
5. The generate_stream function is what makes this all work. In this example, we’re generating 100 events per second.
- Paste the above code into your SQL interface.
- Verify that the data is being generated using SQL queries. Because you’re looking at streaming data, you’ll need to use CTRL-C to stop each query.
```sql
SELECT * FROM trades;

SELECT ticker AS Symbol, ROUND(price,2) AS Price, amount AS "Shares Sold"
FROM trades;
```
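The stream can also feed windowed aggregations. The following is a sketch, assuming your Hazelcast version supports the IMPOSE_ORDER and TUMBLE table functions; it averages each ticker’s price over five-second tumbling windows keyed on trade_ts:

```sql
-- Assign watermarks on trade_ts (allowing 1 second of lag), then
-- average prices per ticker in 5-second tumbling windows.
SELECT window_start, ticker, ROUND(AVG(price), 2) AS avg_price
FROM TABLE(TUMBLE(
  (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE trades, DESCRIPTOR(trade_ts), INTERVAL '1' SECOND))),
  DESCRIPTOR(trade_ts),
  INTERVAL '5' SECOND
))
GROUP BY window_start, ticker;
```

Like the raw SELECTs above, this is a streaming query; use CTRL-C to stop it.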
Step 2. Inserting into Kafka
You can send generated data to Kafka. Kafka will store and replay it as it would data from any other streaming source. Instead of creating a view local to Hazelcast, you’ll create a mapping within SQL for the Kafka topic, then use an INSERT INTO statement to send generated data to that topic.
- First, create a mapping for the data. Include all the fields that you’ll generate with SQL. This statement creates the trades topic in Kafka.

```sql
CREATE OR REPLACE MAPPING trades (
    id BIGINT,
    ticker VARCHAR,
    price DECIMAL,
    trade_ts TIMESTAMP WITH TIME ZONE,
    amount BIGINT)
TYPE Kafka
OPTIONS (
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = 'broker:9092'
);
```
- Next, use an INSERT INTO statement to send the data to the trades topic you just created. The code to generate the data is exactly the same; the only difference is that we’re sending it to Kafka instead of creating a local view.

```sql
INSERT INTO trades
SELECT id,
       CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN 'APPL'
            WHEN tickRand BETWEEN 0.1 AND 0.2 THEN 'GOOGL'
            WHEN tickRand BETWEEN 0.2 AND 0.3 THEN 'META'
            WHEN tickRand BETWEEN 0.3 AND 0.4 THEN 'NFLX'
            WHEN tickRand BETWEEN 0.4 AND 0.5 THEN 'AMZN'
            WHEN tickRand BETWEEN 0.5 AND 0.6 THEN 'INTC'
            WHEN tickRand BETWEEN 0.6 AND 0.7 THEN 'CSCO'
            WHEN tickRand BETWEEN 0.7 AND 0.8 THEN 'BABA'
            ELSE 'VOO'
       END AS ticker,
       CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN tickRand*50+1
            WHEN tickRand BETWEEN 0.1 AND 0.2 THEN tickRand*75+.6
            WHEN tickRand BETWEEN 0.2 AND 0.3 THEN tickRand*60+.2
            WHEN tickRand BETWEEN 0.3 AND 0.4 THEN tickRand*30+.3
            WHEN tickRand BETWEEN 0.4 AND 0.5 THEN tickRand*43+.7
            WHEN tickRand BETWEEN 0.5 AND 0.6 THEN tickRand*100+.4
            WHEN tickRand BETWEEN 0.6 AND 0.7 THEN tickRand*25+.8
            WHEN tickRand BETWEEN 0.7 AND 0.8 THEN tickRand*10+.1
            ELSE tickRand*100+4
       END AS price,
       trade_ts,
       amount
FROM (SELECT v AS id,
             RAND(v*v) AS tickRand,
             TO_TIMESTAMP_TZ(v*10 + 1645484400000) AS trade_ts,
             ROUND(RAND()*100, 0) AS amount
      FROM TABLE(generate_stream(100)));
```
- You can now query this data as above using SQL.
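For instance, a filtered query against the Kafka-backed mapping looks the same as one against the local view from Step 1. This sketch keeps only the larger trades:

```sql
-- Streaming query over the Kafka-backed mapping; use CTRL-C to stop.
SELECT ticker, ROUND(price, 2) AS price, amount
FROM trades
WHERE amount > 50;
```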
You can also access this streaming data with the Pipeline API using the following call. (For details on setting up the Kafka source properties, see the Apache Kafka Connector section of the documentation.)

```java
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(properties, "trades"))
 .withoutTimestamps()
 .writeTo(Sinks.logger());
```