About Stream Processing Pipelines
In distributed systems such as Hazelcast, cluster members need to be able to deal with disruptions to data streams. For example, a sensor may lose connection and stop sending data for a few seconds. Because of these disruptions, it’s not guaranteed that data will arrive in the order it was created.
To deal with this problem, you can use timestamps to determine what to do with late events.
In an unbounded stream of events, the dimension of time is always there. To appreciate this, consider a bounded stream: it may represent a dataset labeled "Wednesday", but the computation itself doesn’t have to know this. Its results will be understood from the outside to be "about Wednesday":
An endless stream, on the other hand, delivers information about the reality as it is unfolding, in near-real time, and the computation itself must deal with time explicitly:
We represent the reality in digital form as a stream of events. Most importantly, every data item has a timestamp that tells us when the event occurred. All the processing logic must rely on these timestamps and not whatever the current time happens to be when running the computation. This brings us to these two concepts:
Event time: determined by the event’s timestamp
Processing time: the current time at the moment of processing an event
The difference between these two ways to account for time comes up often in the design of distributed streaming systems and to some extent you’ll have to deal with it directly.
In an ideal world, event time and processing time would be the same and events would be processed immediately. In reality this is far from true and there can be a significant difference between the two. The difference is also highly variable and is affected by factors like network congestion, shared resource limitations and many more. This results in what we call event disorder: observing the events out of their true order of occurrence.
Here’s what an ordered event stream looks like:
Notice that latency not only exists, but is variable. This has no major impact on stream processing.
And here’s what event disorder looks like:
Latency is all over the place now and it has disordered the events. Of
the five events shown, the second one processed is already the latest.
After processing it Hazelcast has no idea how much longer to wait expecting
events older than it. This is where you as the user are expected to
provide the maximum event lag. Hazelcast can’t emit the result of a windowed
aggregation until it has received all the events belonging to the
window, but the longer it waits, the later you’ll see the results. So
you must strike a balance and choose how much to wait. Notice that by
"wait" we mean event time, not processing time: when we get an event
t_a, we are no longer waiting for events with timestamp
t_b ⇐ t_a - maxLag.
With unbounded streams you need a policy that selects bounded chunks whose aggregate results you are interested in. This is called windowing. You can imagine the window as a time interval laid over the time axis. A given window contains only the events that belong to that interval.
Sliding window is probably the most natural kind of window: it slides along the time axis, trailing just behind the current time. In Hazelcast, the window doesn’t actually slide smoothly but in configured steps.
Sliding window aggregation is a great tool to discover the dynamic properties of your event stream. Quick example: say your event stream contains GPS location reports from millions of mobile users. With a few lines of code you can split the stream into groups by user ID and apply a sliding window with linear regression to retrieve a smoothened velocity vector of each user. Applying the same kind of window the second time will give you acceleration vectors, and so on.
In Hazelcast, the tumbling window is just a special case of the sliding window. Since the sliding step is configurable, you can set it equal to the window itself. You can imagine the window tumbling over from one position to the next. Since Hazelcast has an optimized computation scheme for the sliding window, there is little reason not to use a sliding step finer than the size of the window. A rule of thumb is 10-100 steps per window.
While the sliding window has a fixed, predetermined length, the session window adapts to the data itself. When two consecutive events are separated by more than the configured timeout, that gap marks the boundary between the two windows. If there is no data, there is no session window, either.
When reading from a source with a
StreamStage type, the Jet API guides you to set up the timestamp policy, using one of the following methods:
withNativeTimestamps(): Declares that the stream will use the source’s native timestamps. This typically refers to the timestamps that the external source system sets on each event.
withTimestamps(timestampFn): Passes a function to the source to determine the timestamp of each event.
withIngestionTimestamps(): Declares that the source will assign the time of ingestion as the event timestamp, using the system clock on the member machine.
withoutTimestamps(): Declares that the source stage has no timestamps. Use this option if you your pipeline won’t perform windowed aggregation or stateful mapping.
You may also have to start without timestamps, then perform a
transformation, and only then use
instruct Hazelcast where to find them in the transformed events. Some examples
include an enrichment stage that retrieves the timestamps from a side
input or flat-mapping the stream to unpack a series of events from each
original item. If you do this, however, you will remove the watermarking
responsibility from the source.
In some sources, especially partitioned ones like Kafka,
there is a risk of high event time skew
occurring across partitions as Hazelcast pulls the data from
them. This problem is especially pronounced when Hazelcast recovers from a
failure and restarts your job. In this case Hazelcast must catch up with all
the events that arrived since taking the last snapshot. It will receive
these events at the maximum system throughput and thus each partition
will have a lot of data each time Hazelcast polls it. This means that the
interleaving of data from different partitions will become much more
coarse-grained and there will be sudden jumps in event time at the
points of transition from one partition to the next. The jumps can
easily exceed the configured
allowedLag and cause Hazelcast to drop whole
swaths of events as late.
In order to mitigate this issue, Hazelcast has special logic in its partitioned source implementations that keeps separate track of the timestamps within each partition and knows how to reconcile the temporary differences that occur between them.
This feature works only if you set the timestamping policy in the source
If the time is extracted from the events, time progresses only when newer events arrive. If the events are sparse, the time will effectively stop between two events. This causes high latency for time-sensitive operations (such as window aggregation). The time is also tracked for every source partition separately and if just one partition has sparse events, time progress in the whole job is hindered.
To overcome this you can either ensure there’s a consistent influx of
events in every partition, or you can use
which doesn’t have this issue because it’s based on system clock on the
By default Hazelcast prefers parallel throughput over strict event ordering.
Many transforms aren’t sensitive to the exact order of events. This
includes the stateless transforms, as well as aggregate operations.
There are also transforms, especially
mapStateful, where it’s much
easier to write the logic if you can rely on the strict order of events.
A common example of this is recognizing patterns in the event sequence and other tasks commonly done in the discipline of Complex Event Processing. Also, external services that a pipeline interacts with can be stateful, and their state can also be order dependent.
For those cases,
Pipeline has a property named
preserveOrder. If you
enable it, Hazelcast will keep the order of events with the same partitioning
key at the expense of less flexible balancing of parallel processing
tasks. You can enable it this way:
Pipeline p = Pipeline.create(); p.setPreserveOrder(true);
Note that a given pipeline may still reorder events. This happens
whenever you change the partitioning key along a data path. For example,
if you receive data from a partitioned source like Kafka, but then use
groupingKey which doesn’t match the Kafka partitioning key, it
means you have changed the partitioning key and the original order is
For a discussion of the underlying mechanisms of this feature, see Pipeline Execution Model.
Learn how to submit jobs to a Hazelcast member.