SQL Basics on Hazelcast Cloud (Stock Ticker)
This tutorial introduces you to using SQL on Hazelcast Cloud via the SQL browser. You will run streaming queries, perform data enrichment, windowed aggregations, and create a streaming job.
Context
In this tutorial, you will use stock ticker data to practice all the functionality of SQL on Hazelcast. Specifically, you will
-
Create streaming views
-
Create streaming queries
-
Use SQL to populate an in-memory table for enrichment
-
Enrich streaming data
-
Perform data aggregations on streaming data using windowing
-
Create stream-to-stream joins
-
Create a job to sink enriched, aggregated data into an in-memory table
Before you Begin
Before starting this tutorial, make sure that you meet the following prerequisites:
-
Connect the Command Line Client to your cluster or open the SQL tab for your Hazelcast Cloud instance.
Step 1. Connect to Streaming Data
In this step, you will connect your Hazelcast Cloud cluster to a source of streaming data - in this case, a Kafka server.
For more information on the commands in this section, refer to https://docs.hazelcast.com/hazelcast/latest/sql/sql-statements#ddl-statements |
-
Open an SQL input interface
-
If you are using the CLC, open the SQL shell.
clc
-
If you are using Management Center, open the SQL tab at the top of the screen.
-
If you are using the Cloud dashboard, select your cluster and then select SQL from the side navigation bar.
-
-
Enter the following code to connect to the Kafka streaming server.
CREATE OR REPLACE DATA CONNECTION TrainingKafkaConnection TYPE Kafka NOT SHARED OPTIONS ( 'bootstrap.servers'='35.88.250.10:9092', 'security.protocol'='SASL_PLAINTEXT', 'client.dns.lookup'='use_all_dns_ips', 'sasl.mechanism'='SCRAM-SHA-512', 'sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="training_ro" password="h@zelcast!";', 'session.timeout.ms'='45000');
-
Create a mapping for the stock ticker data stream.
CREATE OR REPLACE MAPPING "trades_topic" EXTERNAL NAME "sql_basics.trades" ( id bigint, ticker varchar, price double, trade_ts timestamp with time zone, amount int EXTERNAL NAME amt ) DATA CONNECTION "TrainingKafkaConnection" OPTIONS ( 'keyFormat' = 'varchar', 'valueFormat' = 'json-flat' );
-
Verify that the data is streaming by issuing a query.
SELECT * FROM trades_topic;
If you are using the SQL browser, you will need to press "Execute Query" to send the code to Hazelcast.
-
If you’re using the CLC SQL shell, press CTRL-C to return to the prompt.
-
Stop the query by clicking the "Stop Query" button in the SQL browser or typing CTRL-C in the SQL shell.
You may see or hear the term "streaming query". With Hazelcast, you’ll use the same SQL format to query both streaming and static data. The difference is in the output; when you query a streaming source, the output will continually update until you terminate the query. |
Step 2. Streaming Queries
In this step, we’ll run several queries against the streaming data. You will see that the query runs contiuously against the data stream until you stop it with CTRL-C.
Because the data is randomly generated, you may have to wait several seconds for output for some of these queries. |
-
Add column headers, round the price to 2 decimal places, omit ID and timestamp from the output.
SELECT ticker AS Symbol, ROUND(price,2) AS Price, amount AS "Shares Sold" FROM trades_topic;
-
Limit the output to one stock symbol.
SELECT ticker AS Symbol, ROUND(price,2) AS Price, amount AS "Shares Sold" FROM trades_topic WHERE ticker = 'APPL';
-
Limit the output to one symbol and sales of over 50 shares.
SELECT ticker AS Symbol, ROUND(price,2), amount AS "Shares Sold" FROM trades_topic WHERE ticker = 'VOO' AND amount > 50;
Step 3. Enriching the Data
Streaming data usually contains timely information, but not necessarily complete information. We can merge the streaming data with data stored in memory to provide context. In this step, we’ll create an in-memory map, then populate it with enrichment data - in this case, the full name of the company and the company location.
-
Create a mapping for an in-memory data table (IMap).
CREATE or REPLACE MAPPING companies ( __key BIGINT, ticker VARCHAR, company VARCHAR, hqcity VARCHAR, hqstate VARCHAR ) TYPE IMap OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat');
-
Add company info to the table.
INSERT INTO companies VALUES (1, 'APPL', 'Apple','Cupertino','CA'), (2, 'GOOG', 'Alphabet (Google)', 'Mountain View', 'CA'), (3, 'META', 'Meta (Facebook)','Menlo Park', 'CA'), (4, 'NFLX', 'Netflix','Los Gatos', 'CA'), (5, 'AMZN', 'Amazon', 'Seattle', 'WA'), (6, 'INTC', 'Intel', 'Santa Clara', 'CA'), (7, 'CSCO', 'Cisco', 'San Jose', 'CA'), (8, 'BABA', 'Alibaba', 'Hangzhou', 'Zhejiang'), (9, 'VOO', 'Vanguard S&P 500','n/a','n/a');
-
Verify that the data is in the IMap.
SELECT * FROM companies;
-
Use a JOIN to combine the static company information with the streaming data.
SELECT trades.ticker AS Symbol, companies.company as Company, ROUND(trades.price,2) AS Price, trades.amount AS "Shares Sold" FROM trades_topic AS trades JOIN companies ON companies.ticker = trades.ticker;
Step 4. Watermarking and Windowing
Data aggregation is a common ETL function, but how do you do it on streaming data? The answer is to perform it over specific windows of time. The aggregations and computations are performed on all data included within each window. The output is then updated for each window.
For a detailed description of watermarking and window types, refer to the SQL Stream Processing topic in the Hazelcast documentation. |
-
In order to ensure that data is included in the correct window, you have to first create a new view that orders the data based on one of the data fields (the watermark). In this tutorial, we’ll use timestamp as the watermark.
CREATE OR REPLACE VIEW trades_ordered AS SELECT * FROM TABLE(IMPOSE_ORDER( TABLE trades_topic, DESCRIPTOR(trade_ts), INTERVAL '0.5' SECONDS));
-
For our first aggregation, we’ll display the minimum and maximum price for each stock over a 5 second window. (Output will not appear until 5 seconds have elapsed.)
SELECT window_start, window_end, ticker, ROUND(MAX(price),2) AS high, ROUND(MIN(price),2) AS low FROM TABLE(TUMBLE( TABLE trades_ordered, DESCRIPTOR(trade_ts), INTERVAL '5' SECONDS )) GROUP BY 1,2,3 ;
This query will display the average price over a 5 second window, updating the result every second.
SELECT window_start, window_end, ticker, ROUND(AVG(price),2) as average FROM TABLE(HOP( TABLE trades_ordered, DESCRIPTOR(trade_ts), INTERVAL '5' SECONDS, INTERVAL '1' SECOND )) GROUP BY 1,2,3;
Step 5: Stream to Stream Joins
You can join two or more related streams of data and store the results. In this example, we’re going to create two different joined queries:
-
Combine the high/low query above with the current trade data to display high, low, and current pricing.
-
Combine the average query above with the current trade data, along with a "flag" field that indicates whether the current price is higher or lower than the calculated average.
-
Create a view for the high and low price output. This creates a new data stream.
CREATE OR REPLACE VIEW high_low AS SELECT window_start, window_end, ticker, ROUND(MAX(price),2) AS high, ROUND(MIN(price),2) AS low FROM TABLE(TUMBLE( TABLE trades_ordered, DESCRIPTOR(trade_ts), INTERVAL '5' SECONDS )) GROUP BY 1,2,3;
-
Join the
stream and thetrades_ordered
stream to display ticker symbol, high, low, and current price.high_low
SELECT tro.ticker AS Symbol, tro.price AS Price, hl.high AS High, hl.low AS Low FROM trades_ordered AS tro JOIN high_low AS hl ON tro.ticker = hl.ticker AND hl.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECONDS;
-
Create a view for the average price display above. This creates another new data stream.
CREATE OR REPLACE VIEW priceavg AS SELECT window_start, window_end, ticker, ROUND(AVG(price),2) as average FROM TABLE(HOP( TABLE trades_ordered, DESCRIPTOR(trade_ts), INTERVAL '5' SECONDS, INTERVAL '1' SECOND )) GROUP BY 1,2,3;
-
Join the
stream and thetrades_ordered
stream, calculating the percent difference between the average price and the current price.priceavg
SELECT tro.ticker AS Symbol, ROUND(tro.price,2) AS Price, pr.average AS Average, ROUND(((tro.price/pr.average)-1)*100,2) AS Percent_Change FROM trades_ordered AS tro JOIN priceavg AS pr ON tro.ticker = pr.ticker AND pr.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECOND;
-
Add a column that displays whether the stock value is up or down from the previous average.
SELECT tro.ticker AS Symbol, tro.price AS Price, pr.average AS Average, ROUND(((tro.price/pr.average)-1)*100,2) AS Percent_Change, CASE WHEN (ROUND(((tro.price/pr.average)-1)*100,2) > 0) THEN 'Up' ELSE 'Down' END AS Up_Down FROM trades_ordered AS tro JOIN priceavg AS pr ON tro.ticker = pr.ticker AND pr.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECOND;
-
Step 6: Create an SQL Job
So far everything we’ve done is displaying output to our console screen. To direct output to a different destination, you’ll need to create a job. Jobs run in the background, independent of any client connection, until you stop them.
Our job will populate a table in memory. To keep memory use at a minimum, the output table will use the stock symbol as the key field, so that only the latest trade information is stored.
-
Create a view that enriches
trades_ordered
view from Step 4 with the data in thecompanies
IMap from Step 3.CREATE OR REPLACE VIEW tro_enriched AS SELECT tro.ticker AS ticker, companies.company as company, ROUND(tro.price,2) AS price, tro.amount AS shares, tro.trade_ts FROM trades_ordered AS tro JOIN companies ON companies.ticker = tro.ticker;
-
Create a view that combines
tro_enriched
and the 'high_low' stream from Step 5.CREATE OR REPLACE VIEW high_low_enriched AS SELECT troe.ticker AS ticker, troe.company AS company, troe.price AS price, troe.shares AS shares, hl.high AS high, hl.low AS low, troe.trade_ts FROM tro_enriched AS troe JOIN high_low AS hl ON troe.ticker = hl.ticker AND hl.window_end BETWEEN troe.trade_ts AND troe.trade_ts + INTERVAL '0.1' SECOND;
-
Verify the output from the
high_low_enriched
view.SELECT * FROM high_low_enriched;
SELECT * FROM high_low_enriched WHERE ticker = 'APPL';
-
Create an IMap to serve as a sink for the data generated by
high_low_enriched
.CREATE OR REPLACE MAPPING current_trade ( __key VARCHAR, company VARCHAR, price DECIMAL, shares DECIMAL, high DECIMAL, low DECIMAL, trade_ts TIMESTAMP ) TYPE IMap OPTIONS ( 'keyFormat' = 'varchar', 'valueFormat' = 'json-flat' );
-
Create a job that sinks the
high_low_enriched
data into thecurrent_trade
IMap. Use the ticker symbol as the key for thecurrent_trade
IMap.Using the ticker as the key in the sink limits the map to storing only the latest trade data for each stock symbol.
CREATE JOB current_trades AS SINK INTO current_trade SELECT troe.ticker AS __key, troe.company AS company, troe.price AS price, troe.shares AS shares, hl.high AS high, hl.low AS low, troe.trade_ts FROM tro_enriched AS troe JOIN high_low AS hl ON troe.ticker = hl.ticker AND hl.window_end BETWEEN troe.trade_ts AND troe.trade_ts + INTERVAL '0.1' SECOND;
-
Verify that entries are being added to the
current_trade
map. Run this query multiple times to verify that the data is changing.SELECT * FROM current_trade;
Summary
In this tutorial, you learned the following:
-
Create streaming views
-
Create streaming queries
-
Use SQL to populate an in-memory table for enrichment
-
Enrich streaming data
-
Perform data aggregations on streaming data using windowing
-
Create stream-to-stream joins
-
Create a job to sink enriched, aggregated data into an in-memory table
See Also
Stream Processing in SQL (Documentation)
SQL Statements (Documentation)
Stream-to-Stream Joins (Tutorial)