Stream Processing in SQL
Stream processing is a programming paradigm for continuously performing computations over events as they arrive. The streams are never-ending, which is why they are often described as unbounded.
You can run SQL queries on streaming data, using standard SQL as well as some custom functions for features such as streaming aggregations.
This guide explains how to query streams, using a stream of trades as an example.
To allow Hazelcast to read data from a streaming source, you need to create a mapping to it. For example, the mapping for the trades Kafka topic may look like the following:
CREATE OR REPLACE MAPPING trades (
    trade_amount BIGINT,
    trade_ticker VARCHAR,
    price DECIMAL,
    trade_time TIMESTAMP)
TYPE Kafka
OPTIONS (
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = 'kafka:9092'
);
Note: At the moment, the only supported streaming source for SQL is a Kafka mapping.
You can query streams like you would query any batch source. The main difference between streaming and batch queries is that when you query a streaming source, Hazelcast creates a job, which continues to run until you cancel it or the client disconnects.
For example, you can monitor the whole trades stream:
SELECT * FROM trades;
This query starts a job that monitors the trades stream for new events and executes the query on new events.
The output contains all trades since the query was first executed. If no trades have been made yet, the output is empty; however, the query does not complete, and the client waits for more rows:
+------------+------------+-----+----------+
|trade_amount|trade_ticker|price|trade_time|
+------------+------------+-----+----------+
You can also use other clauses, such as the WHERE clause, to filter the output of the stream:
SELECT * FROM trades WHERE price > 10;
When new events are received, Hazelcast immediately returns the new rows from your active query. For example, the following query writes some trading data to the trades topic:
INSERT INTO trades VALUES (100, 'ABCD', 11, '2022-01-01 00:00:00'), (20, 'EFGH', 1.5, '2022-01-01 00:01:00');
The result of the filter:
+------------+------------+-----+-------------------+
|trade_amount|trade_ticker|price|trade_time         |
+------------+------------+-----+-------------------+
|         100|      'ABCD'|   11|2022-01-01 00:00:00|
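Filters can also be combined with a projection in the usual way. The following is a minimal sketch that assumes the trades mapping defined earlier; the ticker value and the price threshold are arbitrary illustrative choices.

```sql
-- Sketch: stream only the ticker, price, and time of 'ABCD' trades priced above 10.
-- Like any streaming query, this keeps running until it is cancelled
-- or the client disconnects.
SELECT trade_ticker, price, trade_time
FROM trades
WHERE trade_ticker = 'ABCD'
  AND price > 10;
```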
Hazelcast converts streaming queries into jobs, which are executed by the Jet engine. Streaming queries continue to run until the client disconnects or you explicitly close the query.
If the client stops consuming results but doesn’t close the result, Hazelcast’s internal buffers fill up and the job is then blocked by backpressure. See Backpressure.
You can manage jobs backing SQL queries. For example, you may want to show all running jobs or cancel a query. See Managing Jobs.
An aggregate function performs computations over a group of rows and returns a single value. For example, the COUNT(*) function returns the number of rows in the group. However, streams are unbounded, so it is impossible to count all the rows: new rows are continuously being appended. To use aggregate functions over a stream, you need a way to divide the rows into groups for which the engine can tell when each group is complete.
A common way to group values is in relation to time, because a stream of events happening in real time is ordered by time. For example, you might want to perform some aggregation over all trades that happened in the past minute. When the next minute starts, Hazelcast can calculate the aggregation for the previous minute.
To assign events into time-based windows, you need to use a windowing function. Hazelcast supports the following windowing functions:
TUMBLE(): A tumbling window assigns events into non-overlapping windows of a fixed length.
HOP(): A hopping window (sometimes also called a sliding window) assigns events into overlapping windows of a fixed length. The fourth parameter of this function defines the hop size, that is, the distance between the starts of two consecutive windows.
Note: For a conceptual introduction to windowing and watermarks, see Stream Processing and Event Timestamps.
SELECT window_start, window_end, COUNT(*) AS total_trades  -- (1)
FROM TABLE(TUMBLE(
    TABLE trades_ordered,       -- (2)
    DESCRIPTOR(trade_time),     -- (3)
    INTERVAL '1' MINUTE))       -- (4)
GROUP BY 1,2;                   -- (5)
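(1) Get a count of all trades that happened in the window.
(2) Use an ordered view of the trades stream (the trades_ordered view).
(3) Use the timestamp in the trade_time column.
(4) Set the size of the tumbling window to one minute.
(5) Define the grouping. The numbers 1 and 2 refer to the first and second columns of the SELECT list (window_start and window_end).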
When new results are complete for each one-minute window, they are returned:
+-------------------+-------------------+--------------------+
|window_start       |window_end         |        total_trades|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-04T00:01   |                  45|
...
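For comparison, a hopping-window variant of the tumbling query above could be sketched as follows. It assumes the same trades_ordered view, and the parameter order (window size third, hop size fourth) follows the description of HOP given earlier. The one-minute size and 15-second hop are arbitrary illustrative values.

```sql
-- Sketch: count trades in one-minute windows that start every 15 seconds.
-- With these values, each trade falls into four overlapping windows.
SELECT window_start, window_end, COUNT(*) AS total_trades
FROM TABLE(HOP(
    TABLE trades_ordered,       -- ordered view of the trades stream
    DESCRIPTOR(trade_time),     -- column that carries the event timestamp
    INTERVAL '1' MINUTE,        -- window size (illustrative value)
    INTERVAL '15' SECONDS))     -- hop size: distance between window starts (illustrative value)
GROUP BY 1, 2;
```

Because the windows overlap, the same trade contributes to several result rows, so totals across rows should not be summed.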
Hazelcast can’t emit the result of a windowed aggregation until it has received all the events belonging to the window. However, streams typically aren’t strictly ordered by time, because each event arrives with a different latency. To tell Hazelcast how long to wait, you must define a watermark.
One way of assigning watermarks is defined by how much time an event is allowed to be delayed after the newest event received so far. This time is called the maximum event lag. Any event that is later than the maximum event lag is dropped. This is the only strategy currently supported.
Note: Time is measured by the timestamps in the events, rather than the current time on a system clock.
In Hazelcast, the function that adds watermarks is called IMPOSE_ORDER(), because it limits the potentially unbounded disorder of the events in the stream to a fixed value. The IMPOSE_ORDER() function is stateful, and its state is scoped to the duration of the query. This function injects watermarks that lag a fixed amount behind the maximum value of the field observed since the query started.
SELECT *
FROM TABLE(IMPOSE_ORDER(
    TABLE trades,               -- (1)
    DESCRIPTOR(trade_time),     -- (2)
    INTERVAL '0.5' SECONDS)     -- (3)
);
(1) The table that contains the events, including the timestamp.
(2) A pointer to the column that contains the timestamp for the watermark.
(3) The maximum event lag. Any events that are later than this lag are dropped. For example, an event with a timestamp of
If an event is delayed by more than the defined maximum event lag, it is dropped.
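The drop rule can be written compactly. This is a worked illustration only: the symbols W, t_max, and L and the sample timestamps are assumptions introduced here, not Hazelcast notation. L is the maximum event lag and t_max is the largest timestamp observed since the query started.

```latex
% Watermark lags a fixed amount behind the newest timestamp seen so far
W = t_{\max} - L
% An event with timestamp t is dropped when it is later than the allowed lag
t < W \;\Longrightarrow\; \text{event is dropped}
% Worked example with L = 0.5 s and t_max = 00:00:10.000
W = \text{00:00:10.000} - 0.5\,\text{s} = \text{00:00:09.500}
```

Under these assumptions, an event stamped 00:00:09.400 that arrives afterwards would be dropped, while one stamped 00:00:09.700 would still be accepted.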
The above query doesn’t currently run in Hazelcast. The
For better readability, it’s useful to create a view for the watermark like so:
CREATE VIEW trades_ordered AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
    TABLE trades,
    DESCRIPTOR(trade_time),
    INTERVAL '0.5' SECONDS)
);
This view is already used in the windowed query above. Without the view, you would have to nest a call to IMPOSE_ORDER() as the first argument to the windowing function (for example, HOP), which is harder to read.
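To illustrate the difference, the one-minute tumbling count could be written without the view roughly as follows. This is a sketch that simply inlines the view definition as a subquery; the lag and window size are the values used elsewhere in this guide, and the exact nesting form accepted may depend on the Hazelcast version.

```sql
-- Sketch: same one-minute count, but with IMPOSE_ORDER nested directly
-- inside the windowing function instead of going through trades_ordered.
SELECT window_start, window_end, COUNT(*) AS total_trades
FROM TABLE(TUMBLE(
    (SELECT *
     FROM TABLE(IMPOSE_ORDER(
         TABLE trades,
         DESCRIPTOR(trade_time),
         INTERVAL '0.5' SECONDS))),  -- inlined body of the trades_ordered view
    DESCRIPTOR(trade_time),
    INTERVAL '1' MINUTE))
GROUP BY 1, 2;
```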
Get started with streaming queries in SQL with a quick tutorial.