SQL Basics on Hazelcast Cloud (Stock Ticker)

This tutorial introduces SQL on Hazelcast Cloud through the SQL browser. You will run streaming queries, enrich data, perform windowed aggregations, and create a streaming job.

Context

In this tutorial, you will use stock ticker data to practice the core features of SQL on Hazelcast. Specifically, you will:

  • Create streaming views

  • Create streaming queries

  • Use SQL to populate an in-memory table for enrichment

  • Enrich streaming data

  • Perform data aggregations on streaming data using windowing

  • Create stream-to-stream joins

  • Create a job to sink enriched, aggregated data into an in-memory table

Before you Begin

Before starting this tutorial, make sure that you meet the following prerequisites:

Step 1. Connect to Streaming Data

In this step, you will connect your Hazelcast Cloud cluster to a source of streaming data - in this case, a Kafka server.

For more information on the commands in this section, refer to https://docs.hazelcast.com/hazelcast/latest/sql/sql-statements#ddl-statements

  1. Open an SQL input interface

    1. If you are using the CLC, open the SQL shell.

      clc
    2. If you are using Management Center, open the SQL tab at the top of the screen.

    3. If you are using the Cloud dashboard, select your cluster and then select SQL from the side navigation bar.

  2. Enter the following code to connect to the Kafka streaming server.

    CREATE OR REPLACE DATA CONNECTION TrainingKafkaConnection
         TYPE Kafka
         NOT SHARED
         OPTIONS (
              'bootstrap.servers'='35.88.250.10:9092',
              'security.protocol'='SASL_PLAINTEXT',
              'client.dns.lookup'='use_all_dns_ips',
              'sasl.mechanism'='SCRAM-SHA-512',
              'sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="training_ro" password="h@zelcast!";',
              'session.timeout.ms'='45000');
  3. Create a mapping for the stock ticker data stream.

    CREATE OR REPLACE MAPPING "trades_topic"
    EXTERNAL NAME "sql_basics.trades" (
        id bigint,
        ticker varchar,
        price double,
        trade_ts timestamp with time zone,
        amt int
    )
    DATA CONNECTION "TrainingKafkaConnection"
    OPTIONS (
        'keyFormat' = 'varchar',
        'valueFormat' = 'json-flat'
    );
  4. Verify that the data is streaming by issuing a query.

    SELECT * FROM trades_topic;

    If you are using the SQL browser, you will need to press "Execute Query" to send the code to Hazelcast.

  5. Stop the query by clicking the "Stop Query" button in the SQL browser, or by pressing CTRL-C in the CLC SQL shell to return to the prompt.

You may see or hear the term "streaming query". With Hazelcast, you’ll use the same SQL format to query both streaming and static data. The difference is in the output; when you query a streaming source, the output will continually update until you terminate the query.
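
To see the difference without depending on the Kafka topic, the sketch below uses two of Hazelcast's built-in table functions: generate_series, a bounded (static) source, and generate_stream, an unbounded source. Neither is part of this tutorial's data set; they appear here only for illustration. Run each statement separately.

    -- Static source: returns five rows (column v) and then completes.
    SELECT v FROM TABLE(generate_series(1, 5));

    -- Streaming source: emits about two rows per second and keeps
    -- running until you stop the query.
    SELECT v FROM TABLE(generate_stream(2));

Both statements have the same SELECT form. Only the second one needs to be stopped manually.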

Step 2. Streaming Queries

In this step, you will run several queries against the streaming data. Each query runs continuously against the data stream until you stop it with CTRL-C (or the "Stop Query" button in the SQL browser).

Because the data is randomly generated, you may have to wait several seconds for output for some of these queries.

  1. Add column headers, round the price to 2 decimal places, and omit the ID and timestamp from the output.

    SELECT ticker AS Symbol, ROUND(price,2) AS Price, amt AS "Shares Sold"
    FROM trades_topic;
  2. Limit the output to one stock symbol.

    SELECT ticker AS Symbol, ROUND(price,2) AS Price, amt AS "Shares Sold"
    FROM trades_topic
    WHERE ticker = 'APPL';
  3. Limit the output to one symbol and sales of over 50 shares.

    SELECT ticker AS Symbol, ROUND(price,2) AS Price, amt AS "Shares Sold"
    FROM trades_topic
    WHERE ticker = 'VOO' AND amt > 50;
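
The same pattern extends to computed columns. As a sketch, the query below adds the total value of each trade (price multiplied by shares sold). The "Trade Value" column name is an illustrative choice and not part of the source data.

    SELECT ticker AS Symbol,
           ROUND(price,2) AS Price,
           amt AS "Shares Sold",
           ROUND(price * amt, 2) AS "Trade Value"
    FROM trades_topic
    WHERE amt > 50;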

Step 3. Enriching the Data

Streaming data usually contains timely information, but not necessarily complete information. We can merge the streaming data with data stored in memory to provide context. In this step, we’ll create an in-memory map, then populate it with enrichment data - in this case, the full name of the company and the company location.

  1. Create a mapping for an in-memory data table (IMap).

    CREATE OR REPLACE MAPPING companies (
    __key BIGINT,
    ticker VARCHAR,
    company VARCHAR,
    hqcity VARCHAR,
    hqstate VARCHAR )
    TYPE IMap
    OPTIONS (
    'keyFormat'='bigint',
    'valueFormat'='json-flat');
  2. Add company info to the table.

    INSERT INTO companies VALUES
    (1, 'APPL', 'Apple','Cupertino','CA'),
    (2, 'GOOG', 'Alphabet (Google)', 'Mountain View', 'CA'),
    (3, 'META', 'Meta (Facebook)','Menlo Park', 'CA'),
    (4, 'NFLX', 'Netflix','Los Gatos', 'CA'),
    (5, 'AMZN', 'Amazon', 'Seattle', 'WA'),
    (6, 'INTC', 'Intel', 'Santa Clara', 'CA'),
    (7, 'CSCO', 'Cisco', 'San Jose', 'CA'),
    (8, 'BABA', 'Alibaba', 'Hangzhou', 'Zhejiang'),
    (9, 'VOO', 'Vanguard S&P 500','n/a','n/a');
  3. Verify that the data is in the IMap.

    SELECT * FROM companies;
  4. Use a JOIN to combine the static company information with the streaming data.

    SELECT
        trades.ticker AS Symbol,
        companies.company AS Company,
        ROUND(trades.price,2) AS Price,
        trades.amt AS "Shares Sold"
    FROM trades_topic AS trades
    JOIN companies
    ON companies.ticker = trades.ticker;
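
The companies map also stores the headquarters location, which the join above does not display. A possible variation, using the hqcity and hqstate columns defined in the mapping (the quoted column headers are illustrative):

    SELECT
        trades.ticker AS Symbol,
        companies.company AS Company,
        companies.hqcity AS "HQ City",
        companies.hqstate AS "HQ State",
        ROUND(trades.price,2) AS Price
    FROM trades_topic AS trades
    JOIN companies
    ON companies.ticker = trades.ticker;

Because this is an inner join, trades whose ticker has no matching row in companies do not appear in the output.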

Step 4. Watermarking and Windowing

Data aggregation is a common ETL function, but how do you do it on streaming data? The answer is to perform it over specific windows of time. The aggregations and computations are performed on all data included within each window. The output is then updated for each window.

For a detailed description of watermarking and window types, refer to the SQL Stream Processing topic in the Hazelcast documentation.

  1. To ensure that each event lands in the correct window, first create a view that imposes an order on the stream based on one of its fields. Hazelcast uses this field to generate watermarks. In this tutorial, we’ll use the trade_ts timestamp and allow events to arrive up to 0.5 seconds late.

    CREATE OR REPLACE VIEW trades_ordered AS
    SELECT *
      FROM TABLE(IMPOSE_ORDER(
      TABLE trades_topic,
      DESCRIPTOR(trade_ts),
      INTERVAL '0.5' SECONDS));
  2. For our first aggregation, we’ll display the minimum and maximum price for each stock over a 5-second tumbling window. (Output will not appear until the first window closes, after about 5 seconds.)

    SELECT
         window_start,
         window_end,
         ticker,
         ROUND(MAX(price),2) AS high,
         ROUND(MIN(price),2) AS low
    FROM TABLE(TUMBLE(
         TABLE trades_ordered,
         DESCRIPTOR(trade_ts),
         INTERVAL '5' SECONDS
    ))
    GROUP BY 1,2,3;

  3. For the next aggregation, we’ll display the average price over a 5-second hopping window, updating the result every second.

    SELECT
         window_start,
         window_end,
         ticker,
         ROUND(AVG(price),2) as average
    FROM TABLE(HOP(
      TABLE trades_ordered,
      DESCRIPTOR(trade_ts),
      INTERVAL '5' SECONDS, INTERVAL '1' SECOND
    ))
    GROUP BY 1,2,3;
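
Other aggregate functions fit the same window pattern. As a sketch, the query below counts the trades and sums the shares sold for each stock in every 5-second tumbling window. The trade_count and total_shares aliases are illustrative.

    SELECT
         window_start,
         window_end,
         ticker,
         COUNT(*) AS trade_count,
         SUM(amt) AS total_shares
    FROM TABLE(TUMBLE(
         TABLE trades_ordered,
         DESCRIPTOR(trade_ts),
         INTERVAL '5' SECONDS
    ))
    GROUP BY 1,2,3;

With TUMBLE, each trade is counted in exactly one window. With HOP at a 5-second size and a 1-second slide, each trade falls into 5 / 1 = 5 overlapping windows, so the same trade contributes to five consecutive results.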

Step 5. Stream-to-Stream Joins

You can join two or more related streams of data and store the results. In this example, we’re going to create two different joined queries:

  • Combine the high/low query above with the current trade data to display high, low, and current pricing.

  • Combine the average query above with the current trade data, along with a "flag" field that indicates whether the current price is higher or lower than the calculated average.

    1. Create a view for the high and low price output. This creates a new data stream.

      CREATE OR REPLACE VIEW high_low AS
           SELECT
                window_start,
                window_end,
                ticker,
                ROUND(MAX(price),2) AS high,
                ROUND(MIN(price),2) AS low
           FROM TABLE(TUMBLE(
                TABLE trades_ordered,
                DESCRIPTOR(trade_ts),
                INTERVAL '5' SECONDS
           ))
           GROUP BY 1,2,3;
    2. Join the trades_ordered stream and the high_low stream to display ticker symbol, high, low, and current price.

      SELECT
           tro.ticker AS Symbol,
           tro.price AS Price,
           hl.high AS High,
           hl.low AS Low
      FROM trades_ordered AS tro
      JOIN high_low AS hl
      ON tro.ticker = hl.ticker
      AND hl.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECONDS;
    3. Create a view for the average price display above. This creates another new data stream.

      CREATE OR REPLACE VIEW priceavg AS
           SELECT
                window_start,
                window_end,
                ticker,
                ROUND(AVG(price),2) as average
           FROM TABLE(HOP(
                TABLE trades_ordered,
                DESCRIPTOR(trade_ts),
                INTERVAL '5' SECONDS, INTERVAL '1' SECOND
           ))
           GROUP BY 1,2,3;
    4. Join the trades_ordered stream and the priceavg stream, calculating the percent difference between the average price and the current price.

      SELECT
           tro.ticker AS Symbol,
           ROUND(tro.price,2) AS Price,
           pr.average AS Average,
           ROUND(((tro.price/pr.average)-1)*100,2) AS Percent_Change
      FROM trades_ordered AS tro
      JOIN priceavg AS pr
      ON tro.ticker = pr.ticker
      AND pr.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECOND;
    5. Add a column that displays whether the stock value is up or down from the previous average.

      SELECT
           tro.ticker AS Symbol,
           tro.price AS Price,
           pr.average AS Average,
           ROUND(((tro.price/pr.average)-1)*100,2) AS Percent_Change,
           CASE
                WHEN (ROUND(((tro.price/pr.average)-1)*100,2) > 0) THEN 'Up'
                ELSE 'Down'
           END AS Up_Down
      FROM trades_ordered AS tro
      JOIN priceavg AS pr
      ON tro.ticker = pr.ticker
      AND pr.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECOND;
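
In each join above, the BETWEEN condition relates the timestamps of the two streams: a trade is matched only with a window that ends within 0.1 seconds after the trade. The sketch below reuses that condition with the high_low view and adds the spread between the window's high and low. The Spread column is an illustrative addition.

    SELECT
         tro.ticker AS Symbol,
         ROUND(tro.price,2) AS Price,
         hl.high AS High,
         hl.low AS Low,
         ROUND(hl.high - hl.low,2) AS Spread
    FROM trades_ordered AS tro
    JOIN high_low AS hl
    ON tro.ticker = hl.ticker
    AND hl.window_end BETWEEN tro.trade_ts AND tro.trade_ts + INTERVAL '0.1' SECONDS;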

Step 6. Create an SQL Job

So far, every query has sent its output to the console. To direct output to a different destination, you’ll need to create a job. Jobs run in the background, independent of any client connection, until you stop them.

Our job will populate a table in memory. To keep memory use at a minimum, the output table will use the stock symbol as the key field, so that only the latest trade information is stored.

  1. Create a view that enriches the trades_ordered view from Step 4 with the data in the companies IMap from Step 3.

    CREATE OR REPLACE VIEW tro_enriched AS
         SELECT
              tro.ticker AS ticker,
              companies.company as company,
              ROUND(tro.price,2) AS price,
              tro.amt AS shares,
              tro.trade_ts
         FROM trades_ordered AS tro
         JOIN companies
         ON companies.ticker = tro.ticker;
  2. Create a view that combines the tro_enriched view and the high_low stream from Step 5.

    CREATE OR REPLACE VIEW high_low_enriched AS
         SELECT
              troe.ticker AS ticker,
              troe.company AS company,
              troe.price AS price,
              troe.shares AS shares,
              hl.high AS high,
              hl.low AS low,
              troe.trade_ts
         FROM tro_enriched AS troe
         JOIN high_low AS hl
         ON troe.ticker = hl.ticker
         AND hl.window_end BETWEEN troe.trade_ts AND troe.trade_ts + INTERVAL '0.1' SECOND;
  3. Verify the output from the high_low_enriched view.

    SELECT * FROM high_low_enriched;
    SELECT * FROM high_low_enriched
         WHERE ticker = 'APPL';
  4. Create an IMap to serve as a sink for the data generated by high_low_enriched.

    CREATE OR REPLACE MAPPING current_trade (
       __key VARCHAR,
       company VARCHAR,
       price DECIMAL,
       shares DECIMAL,
       high DECIMAL,
       low DECIMAL,
       trade_ts TIMESTAMP
     ) TYPE IMap
    OPTIONS (
       'keyFormat' = 'varchar',
       'valueFormat' = 'json-flat'
    );
  5. Create a job that sinks the high_low_enriched data into the current_trade IMap. Use the ticker symbol as the key for the current_trade IMap.

    Using the ticker as the key in the sink limits the map to storing only the latest trade data for each stock symbol.

    CREATE JOB current_trades
    AS SINK INTO current_trade
    SELECT
         troe.ticker AS __key,
         troe.company AS company,
         troe.price AS price,
         troe.shares AS shares,
         hl.high AS high,
         hl.low AS low,
         troe.trade_ts
    FROM tro_enriched AS troe
    JOIN high_low AS hl
    ON troe.ticker = hl.ticker
    AND hl.window_end BETWEEN troe.trade_ts AND troe.trade_ts + INTERVAL '0.1' SECOND;
  6. Verify that entries are being added to the current_trade map. Run this query multiple times to verify that the data is changing.

    SELECT * FROM current_trade;
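
Because the job keeps running after you close the SQL browser or shell, it helps to know how to inspect and stop it. The statements below are a minimal sketch that uses the current_trades job name from this step. Run each statement separately.

    -- List the jobs known to the cluster.
    SHOW JOBS;

    -- Pause the job, then resume it later.
    ALTER JOB current_trades SUSPEND;
    ALTER JOB current_trades RESUME;

    -- Cancel the job permanently.
    DROP JOB IF EXISTS current_trades;

Dropping the job stops new updates. The entries already written to the current_trade map stay in place.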

Summary

In this tutorial, you learned how to:

  • Create streaming views

  • Create streaming queries

  • Use SQL to populate an in-memory table for enrichment

  • Enrich streaming data

  • Perform data aggregations on streaming data using windowing

  • Create stream-to-stream joins

  • Create a job to sink enriched, aggregated data into an in-memory table

See Also

Stream Processing in SQL (Documentation)

SQL Statements (Documentation)