
About Stream Processing Pipelines

In distributed systems such as Hazelcast, cluster members need to be able to deal with disruptions to data streams. For example, a sensor may lose connection and stop sending data for a few seconds. Because of these disruptions, it’s not guaranteed that data will arrive in the order it was created.

To deal with this problem, you can use timestamps to determine what to do with late events.

Event Timestamps

In an unbounded stream of events, the dimension of time is always there. To appreciate this, consider a bounded stream: it may represent a dataset labeled "Wednesday", but the computation itself doesn’t have to know this. Its results will be understood from the outside to be "about Wednesday":

Figure: Daily reports for Monday

An endless stream, on the other hand, delivers information about reality as it unfolds, in near-real time, and the computation itself must deal with time explicitly:

Figure: Difference between when a report was requested and when it was created

Event Time and Processing Time

We represent reality in digital form as a stream of events. Most importantly, every data item has a timestamp that tells us when the event occurred. All processing logic must rely on these timestamps, not on whatever the current time happens to be when the computation runs. This brings us to two concepts:

  • Event time: determined by the event’s timestamp

  • Processing time: the current time at the moment of processing an event

Figure: An event happens at 08:12 and is processed at 08:13

The difference between these two ways of accounting for time comes up often in the design of distributed streaming systems, and to some extent you’ll have to deal with it directly.

Event Disorder

In an ideal world, event time and processing time would be the same and events would be processed immediately. In reality this is far from true: there can be a significant difference between the two, and the difference is highly variable, affected by factors such as network congestion and shared resource limitations. The result is what we call event disorder: observing events out of their true order of occurrence.

Here’s what an ordered event stream looks like:

Figure: An ordered stream of events where the oldest event is processed first and the latest event is processed last

Notice that latency not only exists, but is variable. This has no major impact on stream processing.

And here’s what event disorder looks like:

Figure: A disordered stream of events where the oldest event is processed last due to latency

Latency is now all over the place and it has disordered the events. Of the five events shown, the second one processed is already the latest. After processing it, Hazelcast has no idea how much longer to wait for events older than it. This is where you, as the user, are expected to provide the maximum event lag. Hazelcast can’t emit the result of a windowed aggregation until it has received all the events belonging to the window, but the longer it waits, the later you’ll see the results. So you must strike a balance and choose how long to wait. Note that by "wait" we mean event time, not processing time: once we get an event with timestamp t_a, we are no longer waiting for events with timestamp t_b <= t_a - maxLag.
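
As a minimal sketch of how this looks in the Jet API (tradeSource and the Trade type are hypothetical; the second argument to withTimestamps() is the maximum event lag in milliseconds):

Pipeline p = Pipeline.create();
p.readFrom(tradeSource)                            // hypothetical StreamSource<Trade>
 .withTimestamps(trade -> trade.getTime(), 3_000)  // event time from the payload; tolerate up to 3 s of event lag
 .writeTo(Sinks.logger());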

Time Windowing

With unbounded streams you need a policy that selects bounded chunks whose aggregate results you are interested in. This is called windowing. You can imagine the window as a time interval laid over the time axis. A given window contains only the events that belong to that interval.

Sliding Window

Figure: A time interval laid over the time axis

The sliding window is probably the most natural kind of window: it slides along the time axis, trailing just behind the current time. In Hazelcast, the window doesn’t actually slide smoothly, but in configured steps.

Sliding window aggregation is a great tool for discovering the dynamic properties of your event stream. A quick example: say your event stream contains GPS location reports from millions of mobile users. With a few lines of code you can split the stream into groups by user ID and apply a sliding window with linear regression to retrieve a smoothed velocity vector for each user. Applying the same kind of window a second time gives you acceleration vectors, and so on.
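
Here is a sketch of that example, assuming a hypothetical GpsReport type whose timestamp and coordinate are long values. AggregateOperations.linearTrend() fits a line through the points in each window, so its slope over the x coordinate approximates the velocity along that axis:

gpsReports                                           // StreamStage<GpsReport> with timestamps assigned
    .groupingKey(GpsReport::getUserId)
    .window(WindowDefinition.sliding(10_000, 1_000)) // 10 s window, sliding in 1 s steps
    .aggregate(AggregateOperations.linearTrend(
            GpsReport::getTimestamp, GpsReport::getX))
    .writeTo(Sinks.logger());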

Tumbling Window

Figure: A tumbling window

In Hazelcast, the tumbling window is just a special case of the sliding window. Since the sliding step is configurable, you can set it equal to the window itself. You can imagine the window tumbling over from one position to the next. Since Hazelcast has an optimized computation scheme for the sliding window, there is little reason not to use a sliding step finer than the size of the window. A rule of thumb is 10-100 steps per window.
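
For example, a one-minute tumbling count is just a sliding window whose step equals its size (reusing the hypothetical gpsReports stream from above):

gpsReports
    .groupingKey(GpsReport::getUserId)
    .window(WindowDefinition.tumbling(60_000))  // same as WindowDefinition.sliding(60_000, 60_000)
    .aggregate(AggregateOperations.counting())
    .writeTo(Sinks.logger());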

Session Window

Figure: Two data sets separated by time windows

While the sliding window has a fixed, predetermined length, the session window adapts to the data itself. When two consecutive events are separated by more than the configured timeout, that gap marks the boundary between the two windows. If there is no data, there is no session window, either.
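
A sketch of a session window over the same hypothetical stream; the window closes once a user sends no report for the configured timeout:

gpsReports
    .groupingKey(GpsReport::getUserId)
    .window(WindowDefinition.session(300_000))  // close the session after 5 minutes of inactivity
    .aggregate(AggregateOperations.counting())
    .writeTo(Sinks.logger());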

Adding Timestamps to a Streaming Job

When you read from a streaming source, the Jet API guides you to set up the timestamp policy before it gives you a StreamStage, using one of the following methods (a short sketch follows the list):

  • withNativeTimestamps(): Declares that the stream will use the source’s native timestamps. This typically refers to the timestamps that the external source system sets on each event.

  • withTimestamps(timestampFn): Passes a function to the source to determine the timestamp of each event.

  • withIngestionTimestamps(): Declares that the source will assign the time of ingestion as the event timestamp, using the system clock on the member machine.

  • withoutTimestamps(): Declares that the source stage has no timestamps. Use this option if your pipeline won’t perform windowed aggregation or stateful mapping.
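
For example, reading from Kafka with native timestamps might look like the following sketch (kafkaProps and the topic name are placeholders; note that in the actual method signatures, withNativeTimestamps() and withTimestamps() take the allowed event lag, in milliseconds, as an argument):

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.<String, String>kafka(kafkaProps, "events"))
 .withNativeTimestamps(2_000)   // use the Kafka record timestamps; allow 2 s of event lag
 .writeTo(Sinks.logger());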

You may also have to start without timestamps, perform a transformation, and only then use addTimestamps(timestampFn) to tell Hazelcast where to find the timestamps in the transformed events. Examples include an enrichment stage that retrieves the timestamps from a side input, or flat-mapping the stream to unpack a series of events from each original item. If you do this, however, you remove the watermarking responsibility from the source.
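
A sketch of the flat-mapping case (rawSource, the Batch type, and its accessors are hypothetical; like the with* methods, addTimestamps() takes the allowed lag in milliseconds):

p.readFrom(rawSource)                                            // hypothetical StreamSource<Batch>
 .withoutTimestamps()                                            // timestamps are not known yet
 .flatMap(batch -> Traversers.traverseIterable(batch.events()))  // unpack the individual events
 .addTimestamps(event -> event.getTime(), 2_000)                 // assign timestamps downstream
 .writeTo(Sinks.logger());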

Assigning Timestamps at the Source

In some sources, especially partitioned ones like Kafka, there is a risk of high event time skew occurring across partitions as Hazelcast pulls the data from them. This problem is especially pronounced when Hazelcast recovers from a failure and restarts your job. In this case Hazelcast must catch up with all the events that arrived since taking the last snapshot. It will receive these events at the maximum system throughput and thus each partition will have a lot of data each time Hazelcast polls it. This means that the interleaving of data from different partitions will become much more coarse-grained and there will be sudden jumps in event time at the points of transition from one partition to the next. The jumps can easily exceed the configured allowedLag and cause Hazelcast to drop whole swaths of events as late.

To mitigate this issue, Hazelcast’s partitioned source implementations contain special logic that keeps separate track of the timestamps within each partition and knows how to reconcile the temporary differences that occur between them.

This feature works only if you set the timestamping policy in the source using withTimestamps() or withNativeTimestamps().

Dealing with Sparse Events

If the time is extracted from the events, time progresses only when newer events arrive. If the events are sparse, time effectively stops between two events. This causes high latency for time-sensitive operations such as windowed aggregation. Time is also tracked separately for every source partition, so if just one partition has sparse events, it hinders the progress of time in the whole job.

To overcome this, either ensure there’s a consistent influx of events in every partition, or use withIngestionTimestamps(), which doesn’t have this issue because it’s based on the system clock of the member machines.
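
A sketch, assuming a hypothetical sensorSource whose events may arrive only occasionally:

p.readFrom(sensorSource)        // hypothetical StreamSource<SensorReading>
 .withIngestionTimestamps()     // time advances with the member's clock even between events
 .writeTo(Sinks.logger());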

Dealing with Out-Of-Order Events

By default, Hazelcast prefers parallel throughput over strict event ordering. Many transforms aren’t sensitive to the exact order of events; this includes the stateless transforms as well as aggregate operations. There are also transforms, especially mapStateful, where the logic is much easier to write if you can rely on the strict order of events.

A common example is recognizing patterns in the event sequence, a task typical of the discipline of Complex Event Processing. Also, external services that a pipeline interacts with can be stateful, and their state can be order-dependent.

For those cases, the Pipeline object has a property named preserveOrder. If you enable it, Hazelcast will keep the order of events with the same partitioning key at the expense of less flexible balancing of parallel processing tasks.

Pipeline p = Pipeline.create();
// Keep events with the same partitioning key in order, trading away some
// flexibility in balancing the parallel processing tasks
p.setPreserveOrder(true);

Note that a given pipeline may still reorder events. This happens whenever you change the partitioning key along a data path. For example, if you receive data from a partitioned source like Kafka, but then use a groupingKey which doesn’t match the Kafka partitioning key, you have changed the partitioning key and the original order is lost. For a discussion of the underlying mechanisms of this feature, see the Pipeline Execution Model.

Next Steps

Learn how to submit jobs to a Hazelcast member.