Generating Streaming Data Using SQL

Use SQL on Hazelcast to generate randomized streaming data for demo/POC purposes.

Context

In this tutorial, you will learn how to use SQL to generate streaming data locally within Hazelcast.

Using the CREATE VIEW statement and the generate_stream function in SQL on Hazelcast, you can create a data stream locally within Hazelcast.

As of Hazelcast 5.3, you can:

  • Access this data directly via SQL functionality in any programming language.

  • Direct the output to an external Kafka server, then use the connection to the Kafka server to access the data via the pipeline API or via SQL using CREATE MAPPING.

Hazelcast 5.4 exposes the Kafka Connect connector through SQL, eliminating the need for an external Kafka server to feed data into the Pipeline API. For further information on using the Kafka Connect connector, refer to the Platform documentation.

Before you Begin

Before starting this tutorial, make sure that you meet the following prerequisites:

  • A running Hazelcast cluster, either Cloud or Platform

  • A connection to the SQL command line, either through the CLC or through Management Center

  • For Step 2, a Kafka instance accessible by your Hazelcast cluster

Step 1. Generating Data

  1. Look through the following SQL code from the SQL Basics on Cloud (Stock Ticker Demo) tutorial. The comments explain what the code is doing.

    CREATE OR REPLACE VIEW trades AS
      SELECT id,
    
           CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN 'APPL'
                WHEN tickRand BETWEEN 0.1 AND 0.2 THEN 'GOOGL'
                WHEN tickRand BETWEEN 0.2 AND 0.3 THEN 'META'
                WHEN tickRand BETWEEN 0.3 AND 0.4 THEN 'NFLX'
                WHEN tickRand BETWEEN 0.4 AND 0.5 THEN 'AMZN'
                WHEN tickRand BETWEEN 0.5 AND 0.6 THEN 'INTC'
                WHEN tickRand BETWEEN 0.6 AND 0.7 THEN 'CSCO'
                WHEN tickRand BETWEEN 0.7 AND 0.8 THEN 'BABA'
                ELSE 'VOO'
           END as ticker, (1)
    
           CASE WHEN tickRand BETWEEN 0 and 0.1 then tickRand*50+1
                WHEN tickRand BETWEEN 0.1 AND 0.2 THEN tickRand*75+.6
                WHEN tickRand BETWEEN 0.2 AND 0.3 THEN tickRand*60+.2
                WHEN tickRand BETWEEN 0.3 AND 0.4 THEN tickRand*30+.3
                WHEN tickRand BETWEEN 0.4 AND 0.5 THEN tickRand*43+.7
                WHEN tickRand BETWEEN 0.5 AND 0.6 THEN tickRand*100+.4
                WHEN tickRand BETWEEN 0.6 AND 0.7 THEN tickRand*25+.8
                WHEN tickRand BETWEEN 0.7 AND 0.8 THEN tickRand*10+.1
                ELSE tickRand*100+4
           END as price,(2)
    
           trade_ts,
           amount
    FROM
        (SELECT v as id,
               RAND(v*v) as tickRand,(3)
               TO_TIMESTAMP_TZ(v*10 + 1645484400000) as trade_ts, (4)
               ROUND(RAND()*100, 0) as amount
         FROM TABLE(generate_stream(100))); (5)
    1 We’re using the random number to generate different stock ticker symbols.
    2 To keep each ticker’s price within a reasonable range of variation, we use the same BETWEEN ranges, and give each one a different base multiplier.
    3 The random number generator creates the tickRand value.
    4 We seed the timestamp with a base value that equates to 21 Feb 2022. You can change this to any reasonable Unix timestamp.
    5 The generate_stream function is what makes this all work. In this example, we’re generating 100 events per second.
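
Before pasting the full view, you can run generate_stream on its own to see the raw stream it produces. The function emits a single BIGINT column named v, incremented at the requested rate (here, 10 events per second):

    SELECT v FROM TABLE(generate_stream(10));

Press CTRL-C to stop the query when you have seen enough.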
  2. Paste the above code into your SQL interface.

  3. Verify that the data is being generated by running SQL queries. Because you’re looking at streaming data, you’ll need to use CTRL-C to stop each query.

    SELECT * from trades;

    SELECT ticker AS Symbol, ROUND(price,2) AS Price, amount AS "Shares Sold"
    FROM trades;
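
Beyond simple projections, you can also aggregate the stream in windows. The following is a sketch, assuming the IMPOSE_ORDER and TUMBLE windowing functions described in the streaming SQL documentation; it computes a per-ticker average price over five-second tumbling windows on trade_ts:

    SELECT window_start, ticker, AVG(price) AS avg_price
    FROM TABLE(TUMBLE(
        (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE trades, DESCRIPTOR(trade_ts), INTERVAL '1' SECOND))),
        DESCRIPTOR(trade_ts),
        INTERVAL '5' SECOND))
    GROUP BY window_start, ticker;

IMPOSE_ORDER attaches watermarks to the stream (allowing events up to one second out of order here), which the TUMBLE function needs in order to close each window.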

Step 2. Inserting into Kafka

You can send generated data to Kafka. Kafka stores and replays it as it would data from any other streaming source. Instead of creating a view local to Hazelcast, you’ll create a SQL mapping for the Kafka topic, then use an INSERT INTO statement to send generated data to that topic.

  1. First, create a mapping for the data. Include all the fields that you’ll generate with SQL. The mapping itself is only metadata; the trades topic is created in Kafka on the first write, provided your broker allows automatic topic creation.

    CREATE or REPLACE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        trade_ts TIMESTAMP WITH TIME ZONE,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json-flat',
        'bootstrap.servers' = 'broker:9092'
    );
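
The Kafka connector passes any options it doesn’t recognize straight through to the Kafka clients it creates. For example, if you want later SELECT queries on this mapping to replay the topic from the beginning rather than reading only new records, you could add the standard Kafka consumer property auto.offset.reset (shown here as an assumed variant of the mapping above):

    CREATE OR REPLACE MAPPING trades (
        id BIGINT,
        ticker VARCHAR,
        price DECIMAL,
        trade_ts TIMESTAMP WITH TIME ZONE,
        amount BIGINT)
    TYPE Kafka
    OPTIONS (
        'valueFormat' = 'json-flat',
        'bootstrap.servers' = 'broker:9092',
        'auto.offset.reset' = 'earliest'
    );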
  2. Next, use an INSERT INTO statement to send the data to the trades topic you just created. The code that generates the data is exactly the same; the only difference is that you’re sending it to Kafka instead of creating a local view.

    INSERT INTO trades
      SELECT id,
           CASE WHEN tickRand BETWEEN 0 AND 0.1 THEN 'APPL'
                WHEN tickRand BETWEEN 0.1 AND 0.2 THEN 'GOOGL'
                WHEN tickRand BETWEEN 0.2 AND 0.3 THEN 'META'
                WHEN tickRand BETWEEN 0.3 AND 0.4 THEN 'NFLX'
                WHEN tickRand BETWEEN 0.4 AND 0.5 THEN 'AMZN'
                WHEN tickRand BETWEEN 0.5 AND 0.6 THEN 'INTC'
                WHEN tickRand BETWEEN 0.6 AND 0.7 THEN 'CSCO'
                WHEN tickRand BETWEEN 0.7 AND 0.8 THEN 'BABA'
                ELSE 'VOO'
           END as ticker,
           CASE WHEN tickRand BETWEEN 0 and 0.1 then tickRand*50+1
                WHEN tickRand BETWEEN 0.1 AND 0.2 THEN tickRand*75+.6
                WHEN tickRand BETWEEN 0.2 AND 0.3 THEN tickRand*60+.2
                WHEN tickRand BETWEEN 0.3 AND 0.4 THEN tickRand*30+.3
                WHEN tickRand BETWEEN 0.4 AND 0.5 THEN tickRand*43+.7
                WHEN tickRand BETWEEN 0.5 AND 0.6 THEN tickRand*100+.4
                WHEN tickRand BETWEEN 0.6 AND 0.7 THEN tickRand*25+.8
                WHEN tickRand BETWEEN 0.7 AND 0.8 THEN tickRand*10+.1
                ELSE tickRand*100+4
           END as price,
           trade_ts,
           amount
    FROM
        (SELECT v as id,
               RAND(v*v) as tickRand,
               TO_TIMESTAMP_TZ(v*10 + 1645484400000) as trade_ts,
               ROUND(RAND()*100, 0) as amount
         FROM TABLE(generate_stream(100)));
  3. You can now query this data as above using SQL.

    You can also access this streaming data with the Pipeline API using the following call. (For details on setting up the Kafka source properties, see the Apache Kafka Connector section of the documentation.)

    Pipeline p = Pipeline.create();
    p.readFrom(KafkaSources.kafka(properties, "trades"))
     .withNativeTimestamps(0)
     .writeTo(Sinks.logger());

Summary

You can now use SQL on Hazelcast to generate streaming data for testing/demo purposes.