This is a prerelease version.

View latest

Jet: How Hazelcast Models and Executes Jobs

Hazelcast models your pipeline code into a directed acyclic graph (DAG) which consists of stages. Each processing stage accepts the events from upstream stages, processes them, and passes the results to the downstream stage. To run a job, Hazelcast transforms the pipeline DAG into the core DAG. The top-level component that does this is called the planner.

The Word Count Task

Let’s take one specific problem, the Word Count, and explain how to model it as a DAG. In this task we analyze the input consisting of lines of text and derive a histogram of word frequencies. Let’s start from the single-threaded Java code that solves the problem for a basic data structure such as an ArrayList:

List<String> lines = someExistingList();
Map<String, Long> counts = new HashMap<>();
for (String line : lines) {
    for (String word : line.toLowerCase().split("\\W+")) {
        if (!word.isEmpty()) {
            counts.merge(word, 1L, (count, one) -> count + one);
        }
    }
}

Basically, this code does everything in one nested loop. This works for the local ArrayList, very efficiently so, but in this form it is not amenable to auto-parallelization. We need a reactive coding style that uses lambda functions to specify what to do with the data once it’s there while the framework takes care of passing it to the lambdas. This leads us to the Jet API expression:

Pipeline p = Pipeline.create();
p.readFrom(textSource())
 .flatMap(e -> traverseArray(e.getValue().toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .writeTo(someSink());

In this form we can clearly identify individual steps taken by the computation:

  1. Read lines from the text source.

  2. Flat-map lines into words.

  3. Filter out empty words.

  4. Group by word, aggregate by counting.

  5. Write to the sink.

Now Hazelcast can set up several concurrent tasks, each receiving the data from the previous task and emitting the results to the next one, and apply our lambda functions as plug-ins:

// Source task
for (String line : textSource.readLines()) {
    emit(line);
}
// Flat-mapping task
for (String line : receive()) {
    for (String word : flatMapFn.apply(line)) {
        emit(word);
    }
}
// Filtering task
for (String word : receive()) {
    if (filterFn.test(word)) {
        emit(word);
    }
}
// Aggregating task
Map<String, Long> counts = new HashMap<>();
for (String word : receive()) {
    String key = keyFn.apply(word);
    counts.merge(key, 1L, (count, one) -> count + one);
}
// finally, when done receiving:
for (Entry<String, Long> keyAndCount : counts.entrySet()) {
    emit(keyAndCount);
}
// Sink task
for (Entry<String, Long> e: receive()) {
    sink.write(e);
}

The tasks are connected into a cascade, forming the following DAG (flatMap and filter steps get automatically fused into a single task):

Word count DAG

With the computation in this shape, Hazelcast can now parallelize it by starting more than one parallel task for a given step. It can also introduce a network connection between tasks, sending the data from one cluster node to the other. This is the basic principle behind auto-parallelization and distribution.

Core DAG Planner

As you write a pipeline, you form the pipeline DAG and when you submit it for execution, the planner converts it to the core DAG.

  • Pipeline

  • Core DAG

Pipeline p = Pipeline.create();
p.readFrom(textSource())
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .writeTo(someSink());

From the pipeline DAG to the core DAG

We can see that Hazelcast applied some simple graph transformations:

  • joined consecutive stateless transforms into a single vertex

  • implemented the group-and-aggregate transform as two vertices

In addition to this, the core DAG introduces details on how to implement the routing of the data among vertices:

Edge types in the core DAG

Parallel Processing

Hazelcast creates multiple parallel tasklets for each stage. It transfers the data between the tasklets of consecutive stages using two main routing strategies:

  • round-robin: a load-balancing edge that sends items to tasklets in a round-robin fashion. If a given queue is full, it tries the next one.

  • isolated: isolates the parallel code paths from each other, thereby preserving the order of events in each path. When the two connected vertices have the same parallelism, it establishes one-to-one connections between tasklets.

  • partitioned: computes the partition key of every item, which uniquely determines the destination tasklet. Necessary for stateful keyed transformations like group-and-aggregate.

Round-robin is the default strategy. This means that an event emitted by a tasklet can be routed to any tasklet of the following stage. This strategy results in good balancing of the load of every CPU core, but it introduces event reordering.

You can tell Hazelcast not to use the round-robin routing strategy by enabling the preserveOrder property on the pipeline. In this case Hazelcast uses the isolated strategy. This also restricts the parallelism, which can’t change from one stage to the next. Effectively, the entire pipeline has the same parallelism as the source. For example, if you have a non-partitioned source that Hazelcast accesses with a single processor, the entire pipeline may have a parallelism of 1. Hazelcast is still free to increase the parallelism at the point where you introduce a new groupingKey or explicitly rebalance the data flow.

This planning step that transform the pipeline to the Core DAG happens on the server side after you submit the pipeline for execution to the cluster. You also have the option to build the Core DAG directly, using its API, but it mostly offers you a lot of ways to make mistakes with little opportunity to improve on the automatic process.

When the job is starting inside Hazelcast, it will print the DAG definition in the DOT format, which you can visualize on a site like WebGraphviz. For example, our pipeline comes out in DAG form like this:

digraph DAG {
  "filesSource(/*)" [localParallelism=1];
  "fused(flat-map, filter)" [localParallelism=2];
  "group-and-aggregate-prepare" [localParallelism=2];
  "group-and-aggregate" [localParallelism=2];
  "loggerSink" [localParallelism=1];
  "filesSource(/*)" -> "fused(flat-map, filter)" [queueSize=1024];
  "fused(flat-map, filter)" -> "group-and-aggregate-prepare"
      [label="partitioned", queueSize=1024];
  subgraph cluster_0 {
    "group-and-aggregate-prepare" -> "group-and-aggregate"
      [label="distributed-partitioned", queueSize=1024];
  }
  "group-and-aggregate" -> "loggerSink" [queueSize=1024];
}

Tasks Concurrency is Cooperative

Hazelcast avoids starting a heavyweight system thread for each concurrent task of the DAG. Instead, it uses a cooperative multithreading model. This has high-level implications as well: all the lambdas you write in the Jet API must cooperate by not calling blocking methods that may take unpredictably long to complete. If that happens, all the tasklets scheduled on the same thread will be blocked as well.

Since sometimes you can’t avoid making blocking calls, Hazelcast provides dedicated support for such cases. You should use the mapUsingService transform that allows you to declare your code as "non-cooperative". Hazelcast will adapt to this by running the code in a dedicated thread.

However, whenever you have a choice, you should go for non-blocking, asynchronous calls and use mapUsingServiceAsync.

Data Partitioning

When you split the stream by, for example, user ID and aggregate every user’s events independently, you should send all the events with the same user ID to the same task, the one holding that user’s state. Otherwise, all the tasks will end up with storage for all the IDs and no task will have the full picture. The technique to achieve this separation is data partitioning: Hazelcast uses a function that maps any user ID to an integer from a predefined range and then assigns the integers to tasks:

Data Partitioning

This brings us to the following picture of the DAG instantiated on two cluster members:

Exploded view of the word count DAG

Note that the data can flow mostly within the same machine, except when it reaches the partitioned edge. Hazelcast additionally optimizes for throughput by splitting the aggregate vertex into two, called accumulate and combine:

Two-Stage Aggregation

Here the edge coming into accumulate is also partitioned, but only locally: every cluster member has all the partitions, but the aggregation results are only partial. Once the accumulate step has seen all the items, it sends its partial result to combine which combines the partial results from all cluster members. Since there is much less data after aggregation than before it, this reduces the amount of data exchanged between servers at the cost of using more RAM.

Execution Planner

On the server side Hazelcast makes a detailed plan of execution, instantiating tasklets and connecting them with high-performance concurrent queues:

Tasklet Execution Plan

In this picture the source and sink vertices have a local parallelism of one and the inner vertices have local a parallelism of two. Hazelcast has reasonable defaults for the local parallelism depending on the work a given vertex is doing. For computational vertices it matches the number of available CPU cores and for IO-bound vertices it uses one or two.

Hazelcast replicates this layout on every cluster member, connecting the networking tasklets that implement a distributed edge:

Cluster-Level Execution Plan

Cooperative Execution Engine

This is (ideally) how Hazelcast would run our DAG on a 2-core machine:

Tasklet Execution Plan

There are just two threads, all the tasklets are cooperative and they share the two threads. However, often the source and/or sink is forced to use a blocking API and their tasklets must be marked as non-cooperative. In that case execution would look like this:

Execution with some Non-Cooperative Tasklets

Backpressure

Every DAG vertex has a different processing capacity. There is always a risk that the source vertex produces data at a higher speed than a given vertex can process. If this happens, we need a mechanism that signals back to the source to moderate its operation so that the whole pipeline stays in balance and operates at the speed of the slowest vertex. We call this mechanism backpressure.

Local communication between tasklets inside the same Hazelcast member is easy: we just use bounded queues and force the tasklets to back off as soon as all their output queues are full.

Backpressure is trickier over a network link: instead of a shared memory location you can use for reliable instant signaling, all we have are messages sent over unreliable links that have significant latency. Hazelcast uses a design very similar to the TCP/IP adaptive receive window: the sender must wait for an acknowledgment from the receiver telling it how many more data items it can send. After processing item N, the receiver sends a message that the sender can send up to item N + RWIN.

The receiver sends the acknowledgment message ten times per second, so as long as the receive window is large enough to hold the amount of data processed within 100 milliseconds plus network link latency, the receiver will always have data ready to be processed:

Receive Window

Hazelcast calculates the size of the receive window based on the rate of data flow through a given tasklet. It adaptively shrinks and expands it as the flow changes. In stable state the window size is 300 milliseconds' worth of data.

Hazelcast Replicates the DAG on Each Cluster Member

When you submit a job to the cluster, one of the members takes on the role of a coordinator to carry out the following tasks:

  • Expand the core DAG into the tasklet execution plan.

  • Distribute the execution plan to all the other members.

  • Move the pipeline execution job through its lifecycle (initialize, run, clean up) while the other members follow its commands and report state changes.

An interconnected Hazelcast cluster with one member acting as a coordinator

Because every member instantiates the same DAG, each DAG vertex runs on each cluster member. Also, each vertex expands to several parallel tasks, one for each CPU core by default. This means an edge in the DAG represents many point-to-point connections between the parallel tasks.

Coordinator creates and distributes the execution plan

If a cluster member fails (leaves the cluster), the coordinator suspends all the jobs, rescales them to the new cluster topology, and resumes them.

If the coordinator fails, the other members enter a consensus protocol to elect a new one, which then restores all the running jobs.