Stream Processing in SQL
Stream processing is a programming paradigm for continuously performing computations on data events as they arrive. The streams are never-ending, which is why they are often described as unbounded.
You can run SQL queries on single streams, join a stream with a table, or join two or more streams.
This guide explains how to query streams, using the following e-commerce data in examples:
To allow Hazelcast to read data from a streaming source, you need to create a mapping to it. For example, the mapping for the
orders Kafka topic might look like this:
CREATE OR REPLACE MAPPING orders (
  order_amount BIGINT,
  order_id BIGINT,
  unit_price DECIMAL,
  order_time TIMESTAMP)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
Note: At the moment, the only supported streaming source for SQL is a Kafka mapping.
You can query streams like you would query any batch source. The main difference between streaming and batch queries is that when you query a streaming source, Hazelcast creates a job, which continues to run until you cancel it or the client disconnects.
For example, you can monitor the whole orders stream:
SELECT * FROM orders;
This query starts a job that monitors the
orders stream for new events and executes the query on new events.
The output contains all orders since the query was first executed. If no orders have been submitted, the output is empty. However, the query will continue to run as orders are processed, and the client waits for more row entries:
+------------+--------+----------+----------+
|order_amount|order_id|unit_price|order_time|
+------------+--------+----------+----------+
You can also use other SQL clauses to extract the data that you need, such as the
WHERE clause to filter the output of the stream:
SELECT * FROM orders WHERE unit_price > 10;
When new events are received, Hazelcast immediately adds the matching rows to the output of your active query. For example, the following query writes order details to the orders topic:
INSERT INTO orders (order_amount, order_id, unit_price, order_time) VALUES
  (100, 1111, 11, '2022-01-01 00:00:00'),
  (20, 1112, 1.5, '2022-01-01 00:01:00');
The result of the filter:
+------------+--------+----------+-------------------+
|order_amount|order_id|unit_price|order_time         |
+------------+--------+----------+-------------------+
|         100|    1111|        11|2022-01-01 00:00:00|
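Filters can be combined with projections and expressions in the same streaming query. The following is a minimal sketch rather than a definitive recipe. It assumes that order_amount holds the number of units ordered (this guide does not define the column), and order_total is an illustrative alias.

```sql
-- Sketch: emit one row per incoming order with a computed total.
-- Assumption: order_amount is a unit count, so amount * price is the order value.
SELECT
  order_id,
  order_amount * unit_price AS order_total,  -- order_total is a hypothetical alias
  order_time
FROM orders
WHERE unit_price > 10;
```

Like the queries above, this one keeps running and emits a new row whenever a matching order arrives.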
Hazelcast converts streaming queries into jobs, which are executed by the Jet engine. Streaming queries continue to run until the client disconnects or you explicitly close the query.
If the client stops receiving results, but doesn’t close the result, Hazelcast will fill up internal buffers and then the job will be blocked using backpressure. See Backpressure.
You can manage jobs backing SQL queries. For example, you may want to show all running jobs or cancel a query. See Managing Jobs.
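If a query should keep running after the client disconnects, one option is to submit it as a named job that writes to a sink instead of returning rows to the client. The sketch below assumes a map-backed sink. The names expensive_orders and expensive_orders_job are hypothetical, and the options should be checked against the job and mapping reference for your Hazelcast version.

```sql
-- Hypothetical sink: a map keyed by order_id (illustrative names only).
CREATE OR REPLACE MAPPING expensive_orders (
  __key BIGINT,
  order_amount BIGINT,
  unit_price DECIMAL,
  order_time TIMESTAMP)
TYPE IMap
OPTIONS (
  'keyFormat' = 'bigint',
  'valueFormat' = 'json-flat'
);

-- Run the filter as a named job that does not depend on a client connection.
CREATE JOB expensive_orders_job AS
SINK INTO expensive_orders
SELECT order_id, order_amount, unit_price, order_time
FROM orders
WHERE unit_price > 10;

-- List jobs, then cancel the job when it is no longer needed.
SHOW JOBS;
DROP JOB IF EXISTS expensive_orders_job;
```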
Unlike batch processing, working with data streams means dealing with a potentially unlimited amount of data. Windowed aggregation makes it possible to group stream events into smaller, finite batches for processing. This approach means that you can aggregate and perform computations on events from one or more data streams.
Hazelcast supports two types of time-based windows: tumbling windows and hopping windows.
A tumbling window assigns events into non-overlapping windows of a fixed length.
For example, you might want to find out the number of customer orders on each day. You can do this by assigning a one-day tumbling window to your orders stream. Order events are allocated to the one-day windows as they arrive based on the event timestamp.
The following SQL statement assigns the one-day window and outputs a count of the number of customer orders in each window:
SELECT window_start, window_end, COUNT(*) AS total_orders (1)
FROM TABLE(TUMBLE(
  TABLE orders_ordered, (2)
  DESCRIPTOR(order_time), (3)
  INTERVAL '1' DAY)) (4)
GROUP BY 1,2; (5)
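(1) Get a count of all orders that were submitted in the window.
(2) Read from the orders_ordered view, which handles late events (see Handling Late Events).
(3) Use the timestamp in the order_time column.
(4) Set the size of the tumbling window to one day.
(5) Define the grouping. The numbers 1 and 2 refer to the first and second columns of the SELECT list (window_start and window_end).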
New results for each one-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.
+-------------------+-------------------+--------------------+
|window_start       |window_end         |        total_orders|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-04T23:59   |                   5|
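Tumbling windows work with other aggregate functions and window sizes as well. As a sketch, the query below sums order value per hour. It assumes the orders_ordered view defined in Handling Late Events and that order_amount is a unit count. The revenue alias is illustrative.

```sql
-- Sketch: hourly revenue over non-overlapping one-hour windows.
-- Assumes the orders_ordered view (IMPOSE_ORDER over orders) already exists.
SELECT
  window_start,
  window_end,
  SUM(order_amount * unit_price) AS revenue  -- hypothetical alias
FROM TABLE(TUMBLE(
  TABLE orders_ordered,
  DESCRIPTOR(order_time),
  INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
```

Grouping by column name instead of position is equivalent here and can be easier to read when the SELECT list changes.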
Hopping windows also allow you to group events by time. However, hopping windows can overlap. They’re useful for taking a snapshot of data.
For example, let’s say that you want to find out whether your company is meeting its seven-day target for deliveries, and you want to take a snapshot of this data every day. In this case, you define both the size of the window and the interval, or step, between the start of one window and the start of the next.
The following SQL statement assigns the hopping window, and outputs a count of the number of deliveries in each window:
SELECT window_start, window_end, COUNT(*) AS total_deliveries (1)
FROM TABLE(HOP(
  TABLE deliveries_ordered, (2)
  DESCRIPTOR(delivery_time), (3)
  INTERVAL '1' DAY, INTERVAL '7' DAY)) (4)
GROUP BY 1,2; (5)
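(1) Get a count of all deliveries in the window.
(2) Read from the deliveries_ordered view, which handles late events (see Handling Late Events).
(3) Use the timestamp in the delivery_time column.
(4) Set the size of the step to one day and the window to seven days.
(5) Define the grouping. The numbers 1 and 2 refer to the first and second columns of the SELECT list (window_start and window_end).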
+-------------------+-------------------+--------------------+
|window_start       |window_end         |    total_deliveries|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-10T23:59   |                  13|
|2022-01-05T00:00   |2022-01-11T23:59   |                  12|
|2022-01-06T00:00   |2022-01-12T23:59   |                  13|
New results for each seven-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.
As stated in the previous section, Hazelcast can’t emit the results of windowed aggregations or stream-to-stream joins until it has received all the events belonging to the defined timeframe. However, due to differences in latency, events that fall within the timeframe may not arrive for processing until after it ends. To place a limit on late events, Hazelcast uses the
IMPOSE_ORDER() function. This function allows you to specify a maximum event lag. Any event that arrives later than the maximum event lag is dropped.
Note: Time is measured by the timestamps in the events, rather than the current time on a system clock.
In the following example, the IMPOSE_ORDER() function sets a maximum event lag of 0.5 seconds for each aggregation window. An order event with a timestamp of yyyy-mm-dd 23:59:59.5 is added to the window. If another event then arrives with a timestamp that is 0.5 seconds or more older, such as yyyy-mm-dd 23:59:58.9, that event is dropped because it is too old.
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders, (1)
  DESCRIPTOR(order_time), (2)
  INTERVAL '0.5' SECONDS) (3)
);
(1) The table that contains the events, including the timestamp.
(2) A pointer to the column that contains the timestamp for the watermark.
(3) The maximum event lag.
As good practice, and for better readability, always create a view:
CREATE VIEW orders_ordered AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders,
  DESCRIPTOR(order_time),
  INTERVAL '0.5' SECONDS)
);
Without the view, you would have to nest a call to IMPOSE_ORDER as the first argument to the TUMBLE or HOP function, which is harder to read and more difficult to reuse.
If you have two or more streams of related data, you can join them together on a related field, process them, and store the result.
The following examples show you how to merge data from an
orders and a
deliveries events stream and write this data to a single, aggregated view for querying.
As with an individual data stream, you must start by creating a mapping for each Kafka topic that you want to use as a data source.
CREATE OR REPLACE MAPPING orders (
  order_id BIGINT,
  order_time TIMESTAMP,
  product_id BIGINT)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
CREATE OR REPLACE MAPPING deliveries (
  delivery_id BIGINT,
  order_id BIGINT,
  delivery_time TIMESTAMP,
  address VARCHAR)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
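The join examples in this section read from the orders_ordered and deliveries_ordered views rather than from the mappings directly. The orders_ordered view is defined in Handling Late Events. This guide does not show a definition for deliveries_ordered, so the following is a sketch that assumes the same 0.5-second maximum event lag as the orders view. Choose a lag that suits your own data.

```sql
-- Sketch: wrap the deliveries mapping so that late events are bounded.
-- Assumption: the 0.5-second lag mirrors the orders_ordered view.
CREATE VIEW deliveries_ordered AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE deliveries,
  DESCRIPTOR(delivery_time),
  INTERVAL '0.5' SECONDS)
);
```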
Next, you need to specify the timebound relationship between the two event streams. This is defined in a SQL
SELECT statement using a
JOIN. The mandatory timed conditions of the
JOIN tell the Jet engine how long to buffer the events from each event stream before processing them. This interval is called the postpone time. Without it, the stream of events held in memory is potentially unlimited.
For example, you might want to know how many next-day deliveries are being made. The following
SELECT statement finds all deliveries that are made within one day of an order. In this case, the postpone time is defined by the one-day interval between orders and deliveries.
SELECT *
FROM orders_ordered AS os
JOIN deliveries_ordered AS do
  ON do.delivery_time BETWEEN os.order_time AND os.order_time + INTERVAL '1' DAY;
You can use the
IMPOSE_ORDER function to write the results of the
SELECT statement straight to a view ready for querying.
CREATE VIEW orders_and_deliveries AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders_ordered,
  DESCRIPTOR(order_time),
  INTERVAL '1' DAY)) AS os
JOIN deliveries_ordered AS do
  ON do.delivery_time BETWEEN os.order_time AND os.order_time + INTERVAL '1' DAY;
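The joins above match on time alone, so an order is paired with every delivery that falls inside its one-day range. To pair each order only with its own delivery, you can add the related field to the join condition. This is a sketch that assumes order_id has the same type in both mappings and that an equality predicate is accepted alongside the mandatory time bound. The aliases o and d are illustrative.

```sql
-- Sketch: next-day deliveries matched to their own order.
-- Assumption: orders.order_id and deliveries.order_id share one type.
SELECT
  o.order_id,
  o.order_time,
  d.delivery_time
FROM orders_ordered AS o
JOIN deliveries_ordered AS d
  ON d.order_id = o.order_id
 AND d.delivery_time BETWEEN o.order_time AND o.order_time + INTERVAL '1' DAY;
```

Selecting named columns also avoids returning order_id twice, which SELECT * would do because both streams carry that column.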
Get started with streaming queries in SQL with a quick tutorial.