Stream Processing in SQL
Stream processing is a programming paradigm for continuously performing computations on data events as they arrive. The streams are never-ending, which is why they are often described as unbounded.
You can run SQL queries on single streams, join a stream with a table, or join two or more streams.
Context
This guide explains how to query streams, using the following e-commerce data in its examples:
The `orders` stream:

Field | Data type |
---|---|
order_amount | BIGINT |
order_id | BIGINT |
product_id | BIGINT |
unit_price | DECIMAL |
order_time | TIMESTAMP |

The `deliveries` stream:

Field | Data type |
---|---|
delivery_id | BIGINT |
order_id | BIGINT |
delivery_time | TIMESTAMP |
address | VARCHAR |
Creating a Mapping to a Streaming Source
To allow Hazelcast to read data from a streaming source, you need to create a mapping to it. For example, the mapping for the `orders` Kafka topic might look like this:
CREATE OR REPLACE MAPPING orders (
  order_amount BIGINT,
  order_id BIGINT,
  unit_price DECIMAL,
  order_time TIMESTAMP)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
At the moment, the only supported streaming source for SQL is a Kafka mapping.
Basic Continuous Queries on a Stream
You can query streams like you would query any batch source. The main difference between streaming and batch queries is that when you query a streaming source, Hazelcast creates a job, which continues to run until you cancel it or the client disconnects.
For example, you can monitor the whole `orders` stream:
SELECT * FROM orders;
This query starts a job that monitors the `orders` stream for new events and executes the query on them.
The output contains all orders received since the query was first executed. If no orders have been submitted, the output is empty. However, the query continues to run as new orders arrive, and the client waits for more rows:
+------------+--------+----------+----------+
|order_amount|order_id|unit_price|order_time|
+------------+--------+----------+----------+
You can also use other SQL clauses to extract the data that you need, such as the `WHERE` clause to filter the output of the stream:
SELECT * FROM orders
WHERE unit_price > 10;
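Projections and expressions work in the same way. For example, here is a minimal sketch that computes a total per order on the fly, assuming that `order_amount` represents the quantity ordered:

SELECT order_id, order_amount * unit_price AS total_price
FROM orders
WHERE unit_price > 10;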
When new events are received, Hazelcast immediately adds the matching rows to the output of your active query. For example, the following statement writes order details to the `orders` topic:
INSERT INTO orders (order_amount, order_id, unit_price, order_time) VALUES
(100, 1111, 11, '2022-01-01 00:00:00'),
(20, 1112, 1.5, '2022-01-01 00:01:00');
The result of the filter. Only the first order appears in the output, because the second order's unit price is below 10:
+------------+--------+----------+-------------------+
|order_amount|order_id|unit_price|order_time         |
+------------+--------+----------+-------------------+
|         100|    1111|        11|2022-01-01 00:00:00|
Managing Streaming Queries
Hazelcast converts streaming queries into jobs, which are executed by the Jet engine. Streaming queries continue to run until the client disconnects or you explicitly close the query.
If the client stops receiving results but doesn’t close the query, Hazelcast’s internal buffers fill up and the job is then blocked using backpressure. See Backpressure.
You can manage jobs backing SQL queries. For example, you may want to show all running jobs or cancel a query. See Managing Jobs.
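For example, the following sketch shows the job-management statements. The job name `my_orders_job` is a hypothetical example; jobs backing ad-hoc SELECT queries are typically cancelled from the client rather than by name:

-- List the jobs running in the cluster.
SHOW JOBS;

-- Suspend, resume, or cancel a named job.
ALTER JOB my_orders_job SUSPEND;
ALTER JOB my_orders_job RESUME;
DROP JOB IF EXISTS my_orders_job;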
Windowed Aggregations
Unlike batch processing, working with data streams means dealing with a potentially unlimited amount of data. Windowed aggregation makes it possible to group stream events into smaller, finite batches for processing. This approach means that you can aggregate and perform computations on events from one or more data streams.
Hazelcast supports two types of time-based windows:
Tumbling Windows
A tumbling window assigns events into non-overlapping windows of a fixed length.
For example, you might want to find out the number of customer orders placed on each day. You can do this by assigning a one-day tumbling window to your orders stream. Order events are allocated to the one-day windows as they arrive, based on the event timestamp.
The following SQL statement assigns the one-day window and outputs a count of the number of customer orders in each window:
SELECT window_start, window_end, COUNT(*) AS total_orders (1)
FROM TABLE(TUMBLE(
  TABLE orders_ordered, (2)
  DESCRIPTOR(order_time), (3)
  INTERVAL '1' DAY)) (4)
GROUP BY 1,2; (5)
1 | Get a count of all orders that were submitted in the window. |
2 | Use the `orders_ordered` view, which handles late events (see Handling Late Events). |
3 | Use the timestamp in the order_time column to determine the window the event belongs to. |
4 | Set the size of the tumbling window to one day. |
5 | Define the grouping. The numbers 1 and 2 refer to the first and second columns of the SELECT clause, so you group by the window_start and window_end columns. |
New results for each one-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.
+-------------------+-------------------+--------------------+
|window_start       |window_end         |        total_orders|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-05T00:00   |                   5|
Hopping Windows
Hopping windows also allow you to group events by time. However, hopping windows can overlap. They’re useful for taking a snapshot of data.
For example, let’s say that you want to find out if your company is meeting its seven-day target for deliveries, and you want to take a snapshot of this data every day. In this case, you define the size of the window and the interval, or step, between windows.
The following SQL statement assigns the hopping window, and outputs a count of the number of deliveries in each window:
SELECT window_start, window_end, COUNT(*) AS total_deliveries (1)
FROM TABLE(HOP(
  TABLE deliveries_ordered, (2)
  DESCRIPTOR(delivery_time), (3)
  INTERVAL '1' DAY, INTERVAL '7' DAY)) (4)
GROUP BY 1,2; (5)
1 | Get a count of all deliveries in the window. |
2 | Use the `deliveries_ordered` view, which handles late events (see Handling Late Events). |
3 | Use the timestamp in the delivery_time column to determine the window the event belongs to. |
4 | Set the size of the step to one day and the window to seven days. |
5 | Define the grouping. The numbers 1 and 2 refer to the first and second columns of the SELECT clause, so you group by the window_start and window_end columns. |
+-------------------+-------------------+--------------------+
|window_start       |window_end         |    total_deliveries|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-11T00:00   |                  13|
|2022-01-05T00:00   |2022-01-12T00:00   |                  12|
|2022-01-06T00:00   |2022-01-13T00:00   |                  13|
New results for each seven-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.
Handling Late Events
As stated in the previous section, Hazelcast can’t emit the results of windowed aggregations or stream-to-stream joins until it has received all the events belonging to the defined timeframe. However, due to differences in latency, events that fall within the timeframe may not arrive for processing until after it ends. To place a limit on late events, Hazelcast uses the `IMPOSE_ORDER()` function. This function allows you to specify a maximum event lag. Any event that arrives later than the maximum event lag is dropped.
Time is measured by the timestamps in the events, rather than the current time on a system clock.
In the following example, the `IMPOSE_ORDER()` function sets a maximum event lag of 0.5 seconds for each aggregation window. An order event with a timestamp of `yyyy-mm-dd 23:59:59.5` is added to the window. If another event then arrives with a timestamp that is more than 0.5 seconds older, such as `yyyy-mm-dd 23:59:58.9`, that event is dropped because it is too old.
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders, (1)
  DESCRIPTOR(order_time), (2)
  INTERVAL '0.5' SECONDS) (3)
);
1 | The table that contains the events, including the timestamp. |
2 | A pointer to the column that contains the timestamp for the watermark. |
3 | The maximum event lag. |
As good practice, and for better readability, always create a view:
CREATE VIEW orders_ordered AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders,
  DESCRIPTOR(order_time),
  INTERVAL '0.5' SECONDS)
);
Without the view, you would need a nested call to `IMPOSE_ORDER` as the first argument to the `TUMBLE`/`HOP` function, which is harder to read and more difficult to reuse.
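For comparison, here is a sketch of that nested form, built from the same `TUMBLE` and `IMPOSE_ORDER` calls used above; it illustrates the pattern rather than a statement you need to run:

SELECT window_start, window_end, COUNT(*) AS total_orders
FROM TABLE(TUMBLE(
  TABLE(IMPOSE_ORDER(
    TABLE orders,
    DESCRIPTOR(order_time),
    INTERVAL '0.5' SECONDS)),
  DESCRIPTOR(order_time),
  INTERVAL '1' DAY))
GROUP BY 1,2;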
Stream-To-Stream Joins
If you have two or more streams of related data, you can join them together on a related field, process them, and store the result.
The following examples show you how to merge data from an `orders` event stream and a `deliveries` event stream, and write this data to a single, aggregated view for querying.
Create Mappings
As with an individual data stream, you must start by creating a mapping for each Kafka topic that you want to use as a data source.
CREATE OR REPLACE MAPPING orders (
  order_id BIGINT,
  order_time TIMESTAMP,
  product_id BIGINT)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
CREATE OR REPLACE MAPPING deliveries (
  delivery_id BIGINT,
  order_id BIGINT,
  delivery_time TIMESTAMP,
  address VARCHAR)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
Join the Streams
Next, you need to specify the timebound relationship between the two event streams. This is defined in a SQL SELECT
statement using a JOIN
. The mandatory timed conditions of the JOIN
tell the Jet engine how long to buffer the events from each event stream before processing them; the postpone time. Without this delay, the stream of events held in memory is potentially unlimited.
For example, you might want to know how many next-day deliveries are being made. The following `SELECT` statement finds all deliveries that are made within one day of an order. In this case, the postpone time is defined by the one-day interval between orders and deliveries.
SELECT *
FROM orders_ordered AS os
JOIN deliveries_ordered AS do
  ON do.delivery_time BETWEEN os.order_time
  AND os.order_time + INTERVAL '1' DAY;
Write Merged Streams to an Aggregated View
You can combine the `CREATE VIEW` statement with the `IMPOSE_ORDER()` function to write the results of the `SELECT` statement straight to a view that is ready for querying.
CREATE VIEW orders_and_deliveries AS
SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders_ordered,
  DESCRIPTOR(order_time),
  INTERVAL '1' DAY)) AS os
JOIN deliveries_ordered AS do
  ON do.delivery_time BETWEEN os.order_time
  AND os.order_time + INTERVAL '1' DAY;
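Once the view exists, querying it starts a streaming job in the same way as querying the underlying topics directly; for example:

SELECT * FROM orders_and_deliveries;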
Related Resources
- Get started with streaming queries in SQL with a quick tutorial.