Stream Processing in SQL

Stream processing is a programming paradigm for continuously performing computations over events as they arrive. The streams are never-ending, which is why they are often described as unbounded.

You can run SQL queries on streaming data, using standard SQL as well as some custom functions for features such as streaming aggregations.

Context

This guide explains how to query streams, using the following trades data as an example:

Field         Data type
trade_amount  BIGINT
trade_ticker  VARCHAR
price         DECIMAL
trade_time    TIMESTAMP

Creating a Mapping to a Streaming Source

To allow Hazelcast to read data from a streaming source, you need to create a mapping to it. For example, the mapping for the trades Kafka topic may look like the following:

CREATE OR REPLACE MAPPING trades (
  trade_amount BIGINT,
  trade_ticker VARCHAR,
  price DECIMAL,
  trade_time TIMESTAMP)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
At the moment, the only supported streaming source for SQL is a Kafka mapping.

Basic Continuous Queries on a Stream

You can query streams like you would query any batch source. The main difference between streaming and batch queries is that when you query a streaming source, Hazelcast creates a job, which continues to run until you cancel it or the client disconnects.

For example, you can monitor the whole trades stream:

SELECT * FROM trades;

This query starts a job that monitors the trades stream for new events and executes the query on new events.

The output contains all trades since the query was first executed. If no trades have yet been made, the output will be empty. However, the query will not complete, and the client will wait for more rows:

+------------+------------+-----+----------+
|trade_amount|trade_ticker|price|trade_time|
+------------+------------+-----+----------+

You can also use other clauses such as the WHERE clause to filter the output of the stream:

SELECT * FROM trades
WHERE price > 10;

When new events are received, Hazelcast immediately returns the new rows from your active query. For example, the following statement writes some trading data to the trades topic:

INSERT INTO trades VALUES
  (100, 'ABCD', 11, '2022-01-01 00:00:00'),
  (20, 'EFGH', 1.5, '2022-01-01 00:01:00');

The result of the filter:

+------------+------------+-----+-------------------+
|trade_amount|trade_ticker|price|trade_time         |
+------------+------------+-----+-------------------+
|         100|      'ABCD'|   11|2022-01-01 00:00:00|

Managing Streaming Queries

Hazelcast converts streaming queries into jobs, which are executed by the Jet engine. Streaming queries continue to run until the client disconnects or you explicitly close the query.

If the client stops consuming results but doesn’t close the query, Hazelcast will fill up its internal buffers and then the job will be blocked using backpressure. See Backpressure.

You can manage jobs backing SQL queries. For example, you may want to show all running jobs or cancel a query. See Managing Jobs.
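For example, a minimal sketch of how this might look in SQL is shown below. SHOW JOBS lists the jobs running on the cluster, and DROP JOB cancels a job by name; the job name trade_monitor is hypothetical and applies to jobs that were created with a CREATE JOB statement:

-- List the jobs currently running on the cluster
SHOW JOBS;

-- Cancel a job by name (trade_monitor is a hypothetical name for a job created with CREATE JOB)
DROP JOB IF EXISTS trade_monitor;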

Stream Aggregation

An aggregate function performs computations over a group of rows and returns a single value. For example, the COUNT(*) function returns the number of rows in the group. However, streams are unbounded, making it impossible to count all the rows because they are continuously being appended. To use aggregate functions over a stream, you need a way to group the rows so that the engine can tell when a group is complete.

A common way to group values is in relation to time, because a stream of events happening in real time is ordered by time. For example, you might want to perform some aggregation over all trades that happened in the past minute. When the next minute starts, Hazelcast can calculate the aggregation for the previous minute.

To assign events into time-based windows, you need to use a windowing function. Hazelcast supports the following windowing functions:

  • TUMBLE(): A tumbling window assigns events into non-overlapping windows of a fixed length.

    A tumbling window

  • HOP(): A hopping window (sometimes also called a sliding window) assigns events to overlapping windows of a fixed length. The 4th parameter of this function defines the hop size, i.e. the distance between the starts of two consecutive windows (see the sketch after this list).

    A hopping window
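For example, a hopping-window aggregation might look like the following sketch. It assumes the trades_ordered view that is defined later in this guide, and it counts trades over one-minute windows whose starts are 30 seconds apart:

SELECT window_start, window_end, COUNT(*) AS total_trades
FROM TABLE(HOP(
  TABLE trades_ordered,
  DESCRIPTOR(trade_time),
  INTERVAL '1' MINUTE,
  INTERVAL '30' SECONDS))
GROUP BY 1,2;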

For a conceptual introduction to windowing and watermarks, see Stream Processing and Event Timestamps.

For example, the following query counts the trades that happened in each one-minute tumbling window:

SELECT window_start, window_end, COUNT(*) AS total_trades (1)
FROM TABLE(TUMBLE(
  TABLE trades_ordered, (2)
  DESCRIPTOR(trade_time), (3)
  INTERVAL '1' MINUTE)) (4)
GROUP BY 1,2; (5)
1 Get a count of all trades that happened in the window.
2 Use an ordered view of the trades table as input (explained later).
3 Use the timestamp in the trade_time column to determine the window the event belongs to.
4 Set the size of the tumbling window to one minute.
5 Define the grouping. The numbers 1 and 2 refer to the 1st and 2nd columns of the SELECT clause, so the rows are grouped by the window_start and window_end columns.

When the results for each one-minute window are complete, they are returned:

+-------------------+-------------------+--------------------+
|window_start       |window_end         |        total_trades|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-04T00:01   |                  45|
...

Creating Watermarks

Hazelcast can’t emit the result of a windowed aggregation until it has received all the events belonging to the window. But streams typically aren’t strictly ordered by time; events arrive with varying latency. To tell Hazelcast how long to wait, you must define a watermark.

One way of assigning watermarks is based on how much an event is allowed to be delayed relative to the newest event received so far. This delay is called the maximum event lag. Any event that is later than the maximum event lag is dropped. This is the only strategy currently supported.

Time is measured by the timestamps in the events, rather than the current time on a system clock.

The function that Hazelcast uses to add watermarks is called IMPOSE_ORDER(), because it curbs the potentially unbounded disorder of the events in the stream to a fixed value. The IMPOSE_ORDER() function is a stateful function whose state is scoped to the duration of the query. It injects watermarks that lag a fixed amount behind the maximum value of the field observed since the query started.

SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE trades, (1)
  DESCRIPTOR(trade_time), (2)
  INTERVAL '0.5' SECONDS) (3)
);
1 The table that contains the events, including the timestamp.
2 A pointer to the column that contains the timestamp for the watermark.
3 The maximum event lag. Any event that is later than this lag is dropped. For example, an event with a timestamp of yyyy-mm-dd 23:59:59.5 is added to the window. If another event is then processed with a timestamp that is more than 0.5 seconds older, such as yyyy-mm-dd 23:59:58.9, that event is dropped because it is too old.

If an event is delayed by more than the defined maximum event lag, it is dropped.

The above query doesn’t currently run in Hazelcast: the IMPOSE_ORDER() function can only be used together with the TUMBLE() or HOP() function.

For better readability, it’s useful to create a view for the watermark like so:

CREATE VIEW trades_ordered AS
SELECT *
  FROM TABLE(IMPOSE_ORDER(
  TABLE trades,
  DESCRIPTOR(trade_time),
  INTERVAL '0.5' SECONDS)
);

We already used this view above. Without the view, you would have to nest the call to IMPOSE_ORDER() as the first argument to the TUMBLE() or HOP() function, which is harder to read.
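For comparison, the nested form might look like the following sketch (illustrative only; syntax details may vary), where the ordering is imposed in a subquery that is passed directly to TUMBLE():

SELECT window_start, window_end, COUNT(*) AS total_trades
FROM TABLE(TUMBLE(
  (SELECT * FROM TABLE(IMPOSE_ORDER(
    TABLE trades,
    DESCRIPTOR(trade_time),
    INTERVAL '0.5' SECONDS))),
  DESCRIPTOR(trade_time),
  INTERVAL '1' MINUTE))
GROUP BY 1,2;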