Stream Processing in SQL

Stream processing is a programming paradigm for continuously performing computations on data events as they arrive. The streams are never-ending, which is why they are often described as unbounded.

You can run SQL queries on single streams, join a stream with a table, or join two or more streams.
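For example, a stream-to-table join lets you enrich stream events with reference data. The following sketch assumes a hypothetical products IMap mapping with a product_name value field (not defined elsewhere in this guide); it joins each event from the orders stream, which is defined later in this guide, with its matching product:

SELECT o.order_id, o.order_amount, p.product_name
FROM orders AS o
JOIN products AS p ON o.product_id = p.__key;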

Context

This guide explains how to query streams, using the following e-commerce data in examples:

Table 1. Orders

Field         Data type
order_amount  BIGINT
order_id      BIGINT
order_time    TIMESTAMP
product_id    BIGINT
unit_price    DECIMAL

Table 2. Deliveries

Field          Data type
address        VARCHAR
delivery_id    BIGINT
delivery_time  TIMESTAMP
order_id       BIGINT

Creating a Mapping to a Streaming Source

To allow Hazelcast to read data from a streaming source, you need to create a mapping to it. For example, the mapping for the orders Kafka topic might look like this:

CREATE OR REPLACE MAPPING orders (
  order_amount BIGINT,
  order_id BIGINT,
  unit_price DECIMAL,
  order_time TIMESTAMP)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
At the moment, the only supported streaming source for SQL is a Kafka mapping.
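To check that the mapping was created, you can list all existing mappings:

SHOW MAPPINGS;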

Basic Continuous Queries on a Stream

You can query streams like you would query any batch source. The main difference between streaming and batch queries is that when you query a streaming source, Hazelcast creates a job, which continues to run until you cancel it or the client disconnects.

For example, you can monitor the whole orders stream:

SELECT * FROM orders;

This query starts a job that monitors the orders stream for new events and executes the query on new events.

The output contains all orders received since the query was first executed. If no orders have been submitted yet, the output is empty, but the query continues to run and the client waits for more rows:

+------------+--------+----------+----------+
|order_amount|order_id|unit_price|order_time|
+------------+--------+----------+----------+

You can also use other SQL clauses to extract the data that you need, such as the WHERE clause to filter the output of the stream:

SELECT * FROM orders
WHERE unit_price > 10;
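You can also combine filtering with projections and expressions. For example, the following sketch computes a total_price column for each qualifying order, assuming order_amount is a quantity (an interpretation, not something stated elsewhere in this guide):

SELECT order_id, order_amount * unit_price AS total_price
FROM orders
WHERE unit_price > 10;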

When new events arrive, Hazelcast immediately adds the matching rows to the output of your active query. For example, the following statement publishes two order events to the orders topic:

INSERT INTO orders (order_amount, order_id, unit_price, order_time) VALUES
  (100, 1111, 11, '2022-01-01 00:00:00'),
  (20, 1112, 1.5, '2022-01-01 00:01:00');

The result of the filter:

+------------+--------+----------+-------------------+
|order_amount|order_id|unit_price|order_time         |
+------------+--------+----------+-------------------+
|         100|    1111|        11|2022-01-01 00:00:00|

Managing Streaming Queries

Hazelcast converts streaming queries into jobs, which are executed by the Jet engine. Streaming queries continue to run until the client disconnects or you explicitly close the query.

If the client stops consuming results but doesn’t close the query, Hazelcast’s internal buffers fill up and the job is then throttled using backpressure. See Backpressure.

You can manage jobs backing SQL queries. For example, you may want to show all running jobs or cancel a query. See Managing Jobs.
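For example, you can list running jobs with SHOW JOBS. For named jobs created with CREATE JOB (the job name monitor_orders below is hypothetical), you can also suspend or cancel them in SQL:

SHOW JOBS;

ALTER JOB monitor_orders SUSPEND;

DROP JOB IF EXISTS monitor_orders;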

Windowed Aggregations

Unlike batch processing, working with data streams means dealing with a potentially unlimited amount of data. Windowed aggregation makes it possible to group stream events into smaller, finite batches for processing. This approach means that you can aggregate and perform computations on events from one or more data streams.

Hazelcast supports two types of time-based windows:

Tumbling Windows

A tumbling window assigns events into non-overlapping windows of a fixed length.

For example, you might want to find out the number of customer orders on each day. You can do this by assigning a one-day tumbling window to your orders stream. Order events are allocated to the one-day windows as they arrive based on the event timestamp.

A tumbling window

The following SQL statement assigns the one-day window and outputs a count of the number of customer orders in each window:

SELECT window_start, window_end, COUNT(*) AS total_orders (1)
FROM TABLE(TUMBLE(
  TABLE orders_ordered, (2)
  DESCRIPTOR(order_time), (3)
  INTERVAL '1' DAY)) (4)
GROUP BY 1,2; (5)
1 Get a count of all orders that were submitted in the window.
2 Use the orders_ordered view, which handles late events. See Handling Late Events.
3 Use the timestamp in the order_time column to determine the window that each event belongs to.
4 Set the size of the tumbling window to one day.
5 Define the grouping: the numbers 1 and 2 refer to the first and second columns of the SELECT clause, so you group by the window_start and window_end columns.

New results for each one-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.

+-------------------+-------------------+--------------------+
|window_start       |window_end         |        total_orders|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-05T00:00   |                   5|
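
COUNT(*) isn’t the only option; other aggregate functions work in a window too. For example, a sketch that sums a per-window revenue figure, assuming revenue is order_amount * unit_price (an interpretation, not something defined in this guide):

SELECT window_start, window_end, SUM(order_amount * unit_price) AS revenue
FROM TABLE(TUMBLE(
  TABLE orders_ordered,
  DESCRIPTOR(order_time),
  INTERVAL '1' DAY))
GROUP BY 1,2;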

Hopping Windows

Hopping windows also allow you to group events by time. However, hopping windows can overlap. They’re useful for taking a snapshot of data.

For example, let’s say that you want to find out whether your company is meeting its seven-day target for deliveries, and you want to take a snapshot of this data every day. In this case, you define both the size of the window and the interval, or step, between windows.

A hopping window

The following SQL statement assigns the hopping window, and outputs a count of the number of deliveries in each window:

SELECT window_start, window_end, COUNT(*) AS total_deliveries (1)
FROM TABLE(HOP(
  TABLE deliveries_ordered, (2)
  DESCRIPTOR(delivery_time), (3)
  INTERVAL '1' DAY, INTERVAL '7' DAY)) (4)
GROUP BY 1,2; (5)
1 Get a count of all deliveries in the window.
2 Use the deliveries_ordered view, which handles late events. See Handling Late Events.
3 Use the timestamp in the delivery_time column to determine the window that each event belongs to.
4 Set the step between windows to one day and the window size to seven days.
5 Define the grouping: the numbers 1 and 2 refer to the first and second columns of the SELECT clause, so you group by the window_start and window_end columns.

+-------------------+-------------------+--------------------+
|window_start       |window_end         |    total_deliveries|
+-------------------+-------------------+--------------------+
|2022-01-04T00:00   |2022-01-11T00:00   |                  13|
|2022-01-05T00:00   |2022-01-12T00:00   |                  12|
|2022-01-06T00:00   |2022-01-13T00:00   |                  13|

New results for each seven-day window are only returned when all events that belong to the window have been processed. To learn more about this, see Handling Late Events.
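You can also group by additional columns alongside the window bounds. For example, a sketch that counts deliveries per address in each seven-day window:

SELECT window_start, window_end, address, COUNT(*) AS total_deliveries
FROM TABLE(HOP(
  TABLE deliveries_ordered,
  DESCRIPTOR(delivery_time),
  INTERVAL '1' DAY, INTERVAL '7' DAY))
GROUP BY 1, 2, 3;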

Handling Late Events

As stated in the previous section, Hazelcast can’t emit the results of windowed aggregations or stream-to-stream joins until it has received all the events belonging to the defined timeframe. However, due to differences in latency, events that fall within the timeframe may not arrive for processing until after it ends. To place a limit on late events, Hazelcast provides the IMPOSE_ORDER() function, which lets you specify a maximum event lag. Any event that arrives later than the maximum event lag is dropped.

Time is measured by the timestamps in the events, rather than the current time on a system clock.

In the following example, the IMPOSE_ORDER() function imposes a maximum event lag of 0.5 seconds on the stream. Suppose an order event with a timestamp of yyyy-mm-dd 23:59:59.5 has been added to a window. If a later event arrives with a timestamp that is more than 0.5 seconds older, such as yyyy-mm-dd 23:59:58.9, that event is dropped because it is too late.

SELECT *
FROM TABLE(IMPOSE_ORDER(
  TABLE orders, (1)
  DESCRIPTOR(order_time), (2)
  INTERVAL '0.5' SECONDS) (3)
);
1 The table that contains the events, including the timestamp.
2 A pointer to the column that contains the timestamp for the watermark.
3 The maximum event lag.

As good practice, and for better readability, always create a view:

CREATE VIEW orders_ordered AS
SELECT *
  FROM TABLE(IMPOSE_ORDER(
  TABLE orders,
  DESCRIPTOR(order_time),
  INTERVAL '0.5' SECONDS)
);

Without the view, you would need a nested call to IMPOSE_ORDER as the first argument to the TUMBLE or HOP function, which is harder to read and more difficult to reuse.
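
For comparison, here is a sketch of what such a nested call might look like (the exact nesting syntax may vary by version):

SELECT window_start, window_end, COUNT(*) AS total_orders
FROM TABLE(TUMBLE(
  TABLE(IMPOSE_ORDER(
    TABLE orders,
    DESCRIPTOR(order_time),
    INTERVAL '0.5' SECONDS)),
  DESCRIPTOR(order_time),
  INTERVAL '1' DAY))
GROUP BY 1,2;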

Stream-To-Stream Joins

If you have two or more streams of related data, you can join them together on a related field, process them, and store the result.

The following examples show you how to merge data from an orders and a deliveries events stream and write this data to a single, aggregated view for querying.

Create Mappings

As for an individual data stream, you must start by creating a mapping for each Kafka topic that you want to use as a data source.

CREATE OR REPLACE MAPPING orders (
  order_id BIGINT,
  order_time TIMESTAMP,
  product_id BIGINT)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);

CREATE OR REPLACE MAPPING deliveries (
  delivery_id BIGINT,
  order_id BIGINT,
  delivery_time TIMESTAMP,
  address VARCHAR)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'kafka:9092'
);
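
The join in the next step reads from the orders_ordered and deliveries_ordered views. If you haven’t created them yet, apply IMPOSE_ORDER to each stream as shown in Handling Late Events; for example, for the deliveries stream (the 0.5-second lag is illustrative):

CREATE VIEW deliveries_ordered AS
SELECT *
  FROM TABLE(IMPOSE_ORDER(
  TABLE deliveries,
  DESCRIPTOR(delivery_time),
  INTERVAL '0.5' SECONDS)
);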

Join the Streams

Next, you need to specify the time-bound relationship between the two event streams. You define this in a SQL SELECT statement using a JOIN. The mandatory time conditions of the JOIN tell the Jet engine how long to buffer events from each stream before processing them; this is the postpone time. Without this limit, the number of events held in memory is potentially unlimited.

For example, you might want to know how many next-day deliveries are being made. The following SELECT statement finds all deliveries that are made within one day of an order. In this case, the postpone time is defined by the one-day interval between orders and deliveries.

SELECT *
  FROM orders_ordered AS os
  JOIN deliveries_ordered AS do
    ON do.delivery_time BETWEEN os.order_time
    AND os.order_time + INTERVAL '1' DAY;

Write Merged Streams to an Aggregated View

You can wrap the SELECT statement in a CREATE VIEW statement to store the joined streams as a view that is ready for querying. Because the orders_ordered and deliveries_ordered views already apply IMPOSE_ORDER, you don’t need to call it again in the join.

CREATE VIEW orders_and_deliveries AS
SELECT *
  FROM orders_ordered AS os
  JOIN deliveries_ordered AS do
    ON do.delivery_time BETWEEN os.order_time
    AND os.order_time + INTERVAL '1' DAY;
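
With the view in place, you can query the merged stream like any other streaming source:

SELECT * FROM orders_and_deliveries;

This starts a streaming job that emits a row for each order and delivery pair that falls within the one-day interval.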