Transforms

Hazelcast offers several transforms which can be used to process data. Explore the available transforms and learn how to use them.

Stateless transforms

Stateless transforms do not have any notion of state, meaning that all items must be processed independently of any previous items.

These methods transform the input into the correct shape that is required by further, more complex ones. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.

map

Mapping is the simplest kind of stateless transformation. It simply applies a function to the input item, and passes the output to the next stage.

StreamStage<String> names = stage.map(name -> name.toLowerCase());

filter

Similar to map, the filter is a stateless operator that applies a predicate to the input to decide whether to pass it to the output.

BatchStage<String> names = stage.filter(name -> !name.isEmpty());

flatMap

flatMap is equivalent to map, with the difference that instead of one output item you can have an arbitrary number of output items per input item. The output type is a Traverser, which is a Hazelcast type similar to an Iterator. For example, the code below splits a sentence into individual words:

StreamStage<String> words = stage.flatMap(
  sentence -> Traversers.traverseArray(sentence.split("\\W+"))
);
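The one-to-many behaviour can be tried out without a cluster. The sketch below is only an illustration in plain Java: it uses java.util.stream as a stand-in for Hazelcast's Traverser, and the class and method names are hypothetical. It also filters out the empty string that split() returns for an empty sentence, which the pipeline example above does not do.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FlatMapSketch {
    // One input sentence can yield zero, one, or many output words.
    static List<String> splitIntoWords(List<String> sentences) {
        return sentences.stream()
                .flatMap(sentence -> Arrays.stream(sentence.split("\\W+")))
                // "".split(...) yields a single empty string; drop it.
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [Hello, world, one, more, line]
        System.out.println(splitIntoWords(List.of("Hello, world!", "", "one more line")));
    }
}
```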

merge

This transform merges the contents of two streaming sources into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage. The items from both sides will be interleaved in arbitrary order.

StreamStage<Trade> tradesNewYork = pipeline
  .readFrom(KafkaSources.kafka(.., "xnys"))
  .withoutTimestamps();
StreamStage<Trade> tradesTokyo = pipeline
  .readFrom(KafkaSources.kafka(.., "xjpx"))
  .withoutTimestamps();
StreamStage<Trade> tradesNyAndTokyo = tradesNewYork.merge(tradesTokyo);

mapUsingIMap

This transform looks up each incoming item from the corresponding IMap and the result of the lookup is combined with the input item.

StreamStage<Order> orders = pipeline
  .readFrom(KafkaSources.kafka(.., "orders"))
  .withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingIMap("products",
  order -> order.getProductId(),
  (order, product) -> new OrderDetails(order, product));

The code above can be thought of as equivalent to the following method, where the input is of type Order:

public OrderDetails getOrderDetails(Order order) {
  IMap<String, ProductDetails> map = hz.getMap("products");
  ProductDetails product = map.get(order.getProductId());
  return new OrderDetails(order, product);
}

See Joining Static Data to a Stream for a tutorial using this operator.

mapUsingReplicatedMap

This transform is equivalent to mapUsingIMap with the only difference that a ReplicatedMap is used instead of an IMap.

StreamStage<Order> orders = pipeline
  .readFrom(KafkaSources.kafka(.., "orders"))
  .withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingReplicatedMap("products",
  order -> order.getProductId(),
  (order, product) -> new OrderDetails(order, product));

With a ReplicatedMap, as opposed to a standard IMap, every lookup is local. The downside is that the data is replicated to all the members, consuming more memory in the cluster.

mapUsingService

This transform takes an input and performs a mapping using a service object. Examples are an external HTTP-based service or some library which is loaded and initialized during runtime (such as a machine learning model).

The service itself is defined through a ServiceFactory object. The main difference between this operator and a simple map is that the service is initialized once per job. This is what makes it useful for calling out to heavy-weight objects which are expensive to initialize (such as HTTP connections).

Let’s imagine an HTTP service which returns details for a product and that we have wrapped this service in a ProductService class:

interface ProductService {
  ProductDetails getDetails(int productId);
}

We can then create a shared service factory as follows:

StreamStage<Order> orders = pipeline
  .readFrom(KafkaSources.kafka(.., "orders"))
  .withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
  .sharedService(ctx -> new ProductService(url))
  .toNonCooperative();

"Shared" means that the service is thread-safe and can be called from multiple-threads, so Hazelcast will create just one instance on each member and share it among the parallel tasklets.

We also declared the service as "non-cooperative" because it makes blocking HTTP calls. Failing to do this would have severe consequences for the performance of not just your pipeline, but all the jobs running on the Hazelcast cluster.

We can then perform a lookup on this service for each incoming order:

StreamStage<OrderDetails> details = orders.mapUsingService(productService,
  (service, order) -> {
    // Blocking call to the remote service
    ProductDetails productDetails = service.getDetails(order.getProductId());
    return new OrderDetails(order, productDetails);
  }
);

mapUsingServiceAsync

This transform is identical to mapUsingService with one important distinction: the service in this case supports asynchronous calls, which are compatible with cooperative concurrency and don’t need extra threads. It also means that we can have multiple requests in flight at the same time to maximize throughput. Instead of the mapped value, this transform expects the user to supply a CompletableFuture<T> as the return value, which will be completed at some later time.

For example, if we extend the previous ProductService as follows:

interface ProductService {
  ProductDetails getDetails(int productId);
  CompletableFuture<ProductDetails> getDetailsAsync(int productId);
}

We still create the shared service factory as before:

StreamStage<Order> orders = pipeline
  .readFrom(KafkaSources.kafka(.., "orders"))
  .withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
  .sharedService(ctx -> new ProductService(url));

The lookup then becomes asynchronous. Note that the transform expects you to return a CompletableFuture:

StreamStage<OrderDetails> details = orders.mapUsingServiceAsync(productService,
  (service, order) -> {
    CompletableFuture<ProductDetails> f = service.getDetailsAsync(order.getProductId());
    return f.thenApply(productDetails -> new OrderDetails(order, productDetails));
  }
);

The main advantage of using async communication is that we can have many invocations to the service in-flight at the same time which will result in better throughput.
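To make the "many invocations in flight" idea concrete, here is a minimal sketch in plain Java, independent of Hazelcast. The getDetailsAsync method is a hypothetical stand-in for a remote call, and the class name is an assumption made for illustration. All requests are started before any result is awaited, which is the property that gives async lookups their throughput advantage.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AsyncLookupSketch {
    // Hypothetical stand-in for a remote call; completes on another thread.
    static CompletableFuture<String> getDetailsAsync(int productId) {
        return CompletableFuture.supplyAsync(() -> "details-" + productId);
    }

    static List<String> lookupAll(List<Integer> productIds) {
        // Start every request before waiting for any of them.
        List<CompletableFuture<String>> inFlight = productIds.stream()
                .map(id -> getDetailsAsync(id).thenApply(d -> id + " -> " + d))
                .collect(Collectors.toList());
        // Collect the results in input order once they complete.
        return inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [1 -> details-1, 2 -> details-2, 3 -> details-3]
        System.out.println(lookupAll(List.of(1, 2, 3)));
    }
}
```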

mapUsingServiceAsyncBatched

This variant is very similar to the previous one, but instead of sending one request at a time, it sends requests in so-called "smart batches" (for a more in-depth look at the internals of the Jet engine, see Cooperative Multithreading). Hazelcast automatically groups items as they arrive and lets you send requests in batches. This can be very efficient, for example with a remote service, where instead of one round trip per request you send requests in groups to maximize throughput. If we extend our ProductService as follows:

interface ProductService {
    ProductDetails getDetails(int productId);
    CompletableFuture<ProductDetails> getDetailsAsync(int productId);
    CompletableFuture<List<ProductDetails>> getAllDetailsAsync(List<Integer> productIds);
}

We can then rewrite the transform as:

StreamStage<OrderDetails> details = orders.mapUsingServiceAsyncBatched(productService,
  (service, orderList) -> {
    // Collect the product IDs of the whole batch
    List<Integer> productIds = orderList
      .stream()
      .map(o -> o.getProductId())
      .collect(Collectors.toList());
    // One request for the whole batch
    CompletableFuture<List<ProductDetails>> f = service
      .getAllDetailsAsync(productIds);
    return f.thenApply(productDetailsList -> {
      // Results match the input list position by position
      List<OrderDetails> orderDetailsList = new ArrayList<>();
      for (int i = 0; i < orderList.size(); i++) {
        orderDetailsList.add(new OrderDetails(orderList.get(i), productDetailsList.get(i)));
      }
      return orderDetailsList;
    });
  });

As you can see, there is more code to write to combine the results, but this should give better throughput, provided the service can batch efficiently.

mapUsingPython

Hazelcast can call Python code to perform a mapping step in the pipeline. The prerequisite is that the Hazelcast servers are Linux or Mac with Python installed and that the hazelcast-jet-python module is deployed on the classpath, through being present in the lib directory. Hazelcast supports Python versions 3.5-3.7.

For a full tutorial, see Apply a Python Function.

You are expected to define a function, conventionally named transform_list(input_list), that takes a list of strings and returns a list of strings whose items match positionally one-to-one with the input list. Hazelcast will call this function with batches of items received by the Python mapping stage. If necessary, you can also use a custom name for the transforming function.

Internally, Hazelcast launches Python processes that execute your function. It launches as many of them as requested by the localParallelism setting on the Python pipeline stage. It prepares a local virtual Python environment for the processes to run in and communicates with them over the loopback network interface, using a bidirectional streaming gRPC call.

If you have some simple Python work that fits into a single file, you can tell Hazelcast just the name of that file, which is assumed to be a Python module file that declares transform_list:

StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
  new PythonServiceConfig().setHandlerFile("path/to/handler.py")));

And here’s an example of handler.py:

def transform_list(input_list):
  return ['reply-' + item for item in input_list]

If you have an entire Python project that you want to use from Hazelcast, just name its base directory and Hazelcast will upload all of it (recursively) to the cluster as a part of the submitted job. In this case you must also name the Python module that declares transform_list:

StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
  new PythonServiceConfig().setBaseDir("path/to/python_project")
    .setHandlerModule("jet_handler"))
);

Normally your Python code will make use of non-standard libraries. Hazelcast recognizes the conventional requirements.txt file in your project’s base directory and will ensure all the listed requirements are satisfied.

Finally, Hazelcast also recognizes bash scripts init.sh and cleanup.sh. It will run those during the initialization and cleanup phases of the job. Regardless of the parallelism of the Python stage, these scripts run just once per job, and they run in the context of an already activated virtual environment.

One issue with making requirements.txt work is that in many production back-end environments the public internet is not available. To work around this, you can pre-install all the requirements to the global (or user-local) Python environment on all Hazelcast servers. You can also take full control by writing your own logic in init.sh that installs the dependencies to the local virtual environment. For example, you can make use of the --find-links option of pip install.

hashJoin

hashJoin is a type of join with two or more inputs, where all but one of the inputs must be small enough to fit in memory. Think of it as a primary input accompanied by one or more side inputs. The side inputs are joined to the primary input, which can be either a batch or a streaming stage. The side inputs must be batch stages.

StreamStage<Order> orders = pipeline
        .readFrom(orderSource())
        .withoutTimestamps();
BatchStage<ProductDetails> productDetails = pipeline
        .readFrom(productDetailsSource());
StreamStage<OrderDetails> joined = orders.hashJoin(productDetails,
        onKeys(Order::productId, ProductDetails::productId),
        (order, product) -> new OrderDetails(order, product)
);

The last argument to hashJoin is a function that gets the input and the enriching item. Note that by default Hazelcast does an outer join: if the enriching stream lacks a given key, the corresponding function parameter will be null. You can request an inner join as well:

StreamStage<OrderDetails> joined = orders.innerHashJoin(productDetails,
        onKeys(Order::productId, ProductDetails::productId),
        (order, product) -> new OrderDetails(order, product)
);

In this case the product argument is never null and if a given key is missing, the input Order item is filtered out.
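The difference between the two join flavours can be sketched in plain Java. This is an illustration of the general build-and-probe idea behind a hash join, not Hazelcast's implementation. Orders are reduced to bare product IDs, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HashJoinSketch {
    // Joins each order (represented by its product ID) with the product name.
    // inner == false: a missing product shows up as null.
    // inner == true:  an order without a matching product is dropped.
    static List<String> join(List<Integer> orderProductIds,
                             Map<Integer, String> products,
                             boolean inner) {
        // Build phase: the small side input is held in an in-memory hash table.
        Map<Integer, String> table = new HashMap<>(products);
        List<String> joined = new ArrayList<>();
        // Probe phase: one lookup per item of the primary input.
        for (int productId : orderProductIds) {
            String product = table.get(productId);
            if (product == null && inner) {
                continue;
            }
            joined.add(productId + ":" + product);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> products = Map.of(1, "apple", 3, "pear");
        System.out.println(join(List.of(1, 2, 3), products, false)); // [1:apple, 2:null, 3:pear]
        System.out.println(join(List.of(1, 2, 3), products, true));  // [1:apple, 3:pear]
    }
}
```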

Hazelcast also supports hash-joining with more streams at once through hashJoin2 and the hashJoinBuilder. Refer to their documentation for more details.

Stateful transforms

Stateful transforms accumulate data, and the output depends on previously encountered items.

For example, using stateful transforms, you could count how many items have been encountered so far in a stream and emit the current count with every new item. To do so, Hazelcast maintains a current state of the number of total items encountered so far.

When it comes to maintaining state, there is also an important distinction between streaming and batch jobs. Windowing only applies to streaming jobs where an element of time is present, whereas applying a one-time aggregation over the whole data set is only possible in batch pipelines.

aggregate

Data aggregation is the cornerstone of distributed stream processing. It computes an aggregate function (simple examples: sum or average) over the data items.

When used without a defined window, the aggregate() method applies a one-time aggregation over the whole input, which is only possible on a bounded input (a BatchStage).

For example, a very simple aggregation will look like this:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

This will output only one result, which is the count of all the items:

11:49:12.435 [ INFO] [c.h.j.i.c.W.loggerSink#0] 6

The Jet API provides several built-in aggregation methods, such as:

Method                               Description
averagingLong(), averagingDouble()   Calculates an average of the given inputs.
counting()                           Returns the count of all the items.
summingLong(), summingDouble()       Returns the sum of all the items.
maxBy(), minBy()                     Finds the minimum or maximum sorted according to some criteria.
toList()                             Simply groups the items in a list and returns it.
bottomN(), topN()                    Calculates the bottom or top N items sorted according to some criteria.
linearTrend()                        Computes a trend line over the given items, for example the velocity given GPS coordinates.
allOf()                              Combines multiple aggregations into one aggregation (for example, if you want both sum and average).

For a complete list, please refer to the AggregateOperations class. You can also implement your own aggregate operations using the builder in AggregateOperation.

groupingKey

Typically you don’t want to aggregate all the items together, but group them by some key and then aggregate over each group separately. This is achieved by using the groupingKey transform and then applying an aggregation on it afterwards.

We can extend the previous example to group odd and even values separately:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
 .groupingKey(i -> i % 2 == 0 ? "evens" : "odds")
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

11:51:46.723 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=3
11:51:46.723 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=3

Grouping is critical for aggregating massive data sets in distributed computing - otherwise you would not be able to make use of parallelization as effectively.
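For intuition, the same group-then-count logic can be written against an in-memory list with the JDK's collectors. This sketch is only an analogy and does not use the Jet API. The class and method names are hypothetical, and a TreeMap is used so that the printed key order is deterministic.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class GroupedCountSketch {
    // Groups the items by parity and counts each group separately.
    static Map<String, Long> countByParity(List<Integer> items) {
        return items.stream().collect(Collectors.groupingBy(
                i -> i % 2 == 0 ? "evens" : "odds",
                TreeMap::new,
                Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {evens=3, odds=3}
        System.out.println(countByParity(List.of(0, 1, 2, 3, 4, 5)));
    }
}
```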

rollingAggregate

Rolling aggregation is similar to aggregate, but instead of waiting until all items are received, it produces an output item for each input item. Because of this, it can also be used in a streaming pipeline, as the aggregation is applied continuously. The pipeline from the groupingKey example can be rewritten to use a rollingAggregate transform instead:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
 .groupingKey(i -> i % 2 == 0 ? "evens" : "odds")
 .rollingAggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

Instead of a single result per key, we get a running count for every input item:

12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=1
12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=2
12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=3
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=1
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=2
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=3
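The per-item behaviour can be reproduced with a small single-threaded sketch in plain Java, which is an assumption-laden model rather than the Jet implementation: it keeps one running count per key and emits the updated count for each input item. Because it processes items strictly in input order, the two keys interleave here, whereas the log above shows them grouped, which is one possible outcome of parallel execution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RollingCountSketch {
    // Emits "key=count" for every input item, using one running count per key.
    static List<String> rollingCount(List<Integer> items) {
        Map<String, Long> counts = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (int item : items) {
            String key = item % 2 == 0 ? "evens" : "odds";
            long count = counts.merge(key, 1L, Long::sum);
            output.add(key + "=" + count);
        }
        return output;
    }

    public static void main(String[] args) {
        // prints [evens=1, odds=1, evens=2, odds=2, evens=3, odds=3]
        System.out.println(rollingCount(List.of(0, 1, 2, 3, 4, 5)));
    }
}
```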

window

The process of data aggregation takes a finite batch of data and produces a result. We can make it work with an infinite stream if we break up the stream into finite chunks. This is called windowing and it’s almost always defined in terms of a range of event timestamps (a time window).

Window transforms require a stream annotated with timestamps, that is, each input item has a timestamp associated with it. Timestamps are given in milliseconds and are generally represented in epoch format, as a simple long.

For a more in-depth look at the event time model, please refer to Event Time and Processing Time.

Windows are assigned to a stream with the window() method, which takes one of the window definitions described in the following sections.

tumblingWindow

Tumbling windows are the most basic window type - a window of constant size that "tumbles" along the time axis. If you use a window size of 1 second, Hazelcast will group together all events that occur within the same second and you’ll get window results for intervals [0-1) seconds, then [1-2) seconds, and so on.

A simple example is given below:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
 .withIngestionTimestamps()
 .window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1)))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

When you run this pipeline, you will see output like this, where each output window is marked with start and end timestamps:

14:26:28.007 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:27.000, end=14:26:28.000, value='100', isEarly=false}
14:26:29.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:28.000, end=14:26:29.000, value='100', isEarly=false}
14:26:30.004 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:29.000, end=14:26:30.000, value='100', isEarly=false}
14:26:31.008 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:30.000, end=14:26:31.000, value='100', isEarly=false}
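The start and end values in this output follow from simple arithmetic on the event timestamp. The sketch below shows that arithmetic in plain Java, assuming windows are aligned to multiples of the window size counted from the epoch. The class and method names are hypothetical.

```java
final class TumblingWindowSketch {
    // Start (inclusive) of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // End (exclusive) of the tumbling window that contains the timestamp.
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }

    public static void main(String[] args) {
        long size = 1_000; // one-second windows
        // An event at 1250 ms falls into the window [1000, 2000).
        System.out.println(windowStart(1_250, size) + " " + windowEnd(1_250, size)); // 1000 2000
        // An event exactly on a boundary opens the next window.
        System.out.println(windowStart(2_000, size) + " " + windowEnd(2_000, size)); // 2000 3000
    }
}
```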

As with a normal aggregation, it’s also possible to apply a grouping to a windowed operation:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
 .withIngestionTimestamps()
 .groupingKey(i -> i.sequence() % 2 == 0 ? "even" : "odd")
 .window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1)))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

In this mode, the output would be keyed:

15:09:24.017 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:23.000, end=15:09:24.000, key='odd', value='50', isEarly=false}
15:09:24.018 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:23.000, end=15:09:24.000, key='even', value='50', isEarly=false}
15:09:25.014 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:24.000, end=15:09:25.000, key='odd', value='50', isEarly=false}
15:09:25.015 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:24.000, end=15:09:25.000, key='even', value='50', isEarly=false}
15:09:26.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:25.000, end=15:09:26.000, key='odd', value='50', isEarly=false}
15:09:26.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:25.000, end=15:09:26.000, key='even', value='50', isEarly=false}
15:09:27.013 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:26.000, end=15:09:27.000, key='odd', value='50', isEarly=false}

slidingWindow

A sliding window is like a tumbling window that slides along the time axis instead of hopping from one time range to the next. It slides in discrete steps that are a fraction of the window’s length. If you use a window of size 1 second sliding by 100 milliseconds, Hazelcast will output window results for intervals [0.00-1.00) seconds, then [0.10-1.10) seconds, and so on.

We can modify the tumbling window example as below:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
 .withIngestionTimestamps()
 .window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(1), 100))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

When you run this pipeline, you will see output like the following, where you can see that the start and end timestamps of the windows are overlapping.

15:07:38.108 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.100, end=15:07:38.100, value='100', isEarly=false}
15:07:38.209 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.200, end=15:07:38.200, value='100', isEarly=false}
15:07:38.313 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.300, end=15:07:38.300, value='100', isEarly=false}
15:07:38.408 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.400, end=15:07:38.400, value='100', isEarly=false}
15:07:38.505 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.500, end=15:07:38.500, value='100', isEarly=false}
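Because the windows overlap, each event contributes to several results. The following plain-Java sketch lists the end timestamps of every window that contains a given event, assuming window ends are aligned to multiples of the slide step. The names are hypothetical and this is not how Hazelcast computes windows internally.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // End timestamps (exclusive) of all windows [end - size, end) containing the timestamp.
    static List<Long> windowEnds(long timestampMillis, long sizeMillis, long slideByMillis) {
        List<Long> ends = new ArrayList<>();
        // The first window end strictly after the timestamp.
        long firstEnd = timestampMillis - Math.floorMod(timestampMillis, slideByMillis) + slideByMillis;
        // Keep sliding while the window start is still at or before the timestamp.
        for (long end = firstEnd; end - sizeMillis <= timestampMillis; end += slideByMillis) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // A 1-second window sliding by 100 ms: the event belongs to 10 windows.
        // prints [1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200]
        System.out.println(windowEnds(1_250, 1_000, 100));
    }
}
```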

sessionWindow

A session window captures periods of activity followed by periods of inactivity. You define the "session timeout", i.e., the length of the inactive period that causes the window to close. The typical example of a session window is a user’s activity on a website (hence the name). There are bursts of activity (while the user is browsing the website) followed by rather long periods of inactivity.

As with other aggregate transforms, if you define a grouping key, there will be a separate, independent session window for each key.

In the example below, we want to find out how many different events each user had during a web session. The data source is a stream of events read from Kafka and we assume that the user session is closed after 15 minutes of inactivity:

p.readFrom(KafkaSources.kafka(.., "website-events"))
 .withIngestionTimestamps()
 .groupingKey(event -> event.getUserId())
 .window(WindowDefinition.session(TimeUnit.MINUTES.toMillis(15)))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());
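The way a timeout splits one key's events into sessions can be sketched in plain Java. This is a simplified model for a single key with timestamps already in order. Treating a gap exactly equal to the timeout as closing the session is an assumption of this sketch, not a statement about Hazelcast's behaviour, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionWindowSketch {
    // Splits ascending timestamps into sessions separated by gaps of at least timeoutMillis.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long timeoutMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            boolean gapTooLong = !current.isEmpty()
                    && ts - current.get(current.size() - 1) >= timeoutMillis;
            if (gapTooLong) {
                // The inactive period was long enough: close the session.
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // With a timeout of 10, the gap between 8 and 30 starts a new session.
        // prints [[0, 5, 8], [30, 32]]
        System.out.println(sessions(List.of(0L, 5L, 8L, 30L, 32L), 10));
    }
}
```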

Early Results

If you have to allow a lot of event lateness, or if you just use large time windows, you may want to track the progress of a window while it is still accumulating events. You can order Hazelcast to give you, at regular intervals, the current status of all the windows that have some data but aren’t yet complete. For example, in the session window example we may want to get an update on all running sessions every second, without waiting for the 15-minute timeout to get the full results:

p.readFrom(KafkaSources.kafka(.., "website-events"))
 .withIngestionTimestamps()
 .groupingKey(event -> event.getUserId())
 .window(WindowDefinition.session(TimeUnit.MINUTES.toMillis(15))
   .setEarlyResultsPeriod(TimeUnit.SECONDS.toMillis(1)))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());

The output of the windowing stage is in the form of KeyedWindowResult<String, Long>, where String is the grouping key (here the user ID) and Long is the number of events in the given window. KeyedWindowResult also has an isEarly property that says whether the result is early or final.

The early results period works for all window types. For example, in a tumbling window, if you are working with a window size of one minute and there’s an additional 15-second allowed lateness for the late-coming events, this amounts to waiting up to 75 seconds from receiving a given event to getting the result it contributed to. Therefore, it may be desirable to ask Hazelcast to give us updates on the current progress every second.

Generally, Hazelcast doesn’t guarantee that a stage will receive the items in the same order its upstream stage emitted them. For example, it executes a map transform with many parallel tasks. One task may get the early result and another one the final result. They may emit the transformed result to the sink in any order. This can lead to a situation where your sink receives an early result after it has already received the final result.

distinct

Suppresses duplicate items from a stream. This operation applies primarily to batch streams, but also works on a windowed unbounded stream.

This example takes some input of integers and outputs only the distinct values:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 1, 2, 3, 4, 5, 6))
 .distinct()
 .writeTo(Sinks.logger());

We can also use distinct with grouping, in which case two items that map to the same key count as duplicates. For example, the following will print only strings that have different first letters:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("joe", "john", "jenny", "maria"))
 .groupingKey(s -> s.substring(0, 1))
 .distinct()
 .writeTo(Sinks.logger());

This is a possible output (Hazelcast can choose any of the names starting with "j"):

14:05:29.382 [ INFO] [c.h.j.i.c.W.loggerSink#0] joe
14:05:29.383 [ INFO] [c.h.j.i.c.W.loggerSink#0] maria
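A keyed distinct can be modelled in plain Java by remembering one item per key. The sketch below always keeps the first item seen for each key, which is just one of the outcomes Hazelcast is allowed to produce. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyedDistinctSketch {
    // Keeps one item per first letter; later items with the same key are duplicates.
    static List<String> distinctByFirstLetter(List<String> items) {
        Map<String, String> firstPerKey = new LinkedHashMap<>();
        for (String item : items) {
            firstPerKey.putIfAbsent(item.substring(0, 1), item);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        // prints [joe, maria]
        System.out.println(distinctByFirstLetter(List.of("joe", "john", "jenny", "maria")));
    }
}
```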

sort

Sorting only works on a batch stage. It sorts the input according to its natural ordering (the type must be Comparable) or according to the Comparator you provide. Example:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("bob", "alice", "dan", "chuck"))
 .sort()
 .writeTo(Sinks.logger());

This is how the output should look:

10:43:54.523 [ INFO] [c.h.j.i.c.W.loggerSink#0] alice
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] bob
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] chuck
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] dan

Hazelcast sorts the data in two steps: first it sorts the data that is local on each member, and then it merges the sorted streams into the final output. The first step requires O(n) heap memory, but the second step just receives the data in the correct order and has O(1) memory needs.
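The two steps can be illustrated with a plain-Java sketch: each inner list plays the role of the data held by one member, and a priority queue performs the merge while holding only one head item per sorted run. This is a generic sort-then-merge illustration under those assumptions, not Hazelcast's code. The result is collected into a list here only so it can be printed, whereas a real sink could consume the merged items one by one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

final class TwoStepSortSketch {
    static List<String> sortDistributed(List<List<String>> memberPartitions) {
        // Step 1: every member sorts its local data (needs memory for all local items).
        List<Iterator<String>> sortedRuns = new ArrayList<>();
        for (List<String> partition : memberPartitions) {
            List<String> local = new ArrayList<>(partition);
            Collections.sort(local);
            sortedRuns.add(local.iterator());
        }
        // Step 2: merge the sorted runs; the queue holds one head item per run.
        PriorityQueue<Map.Entry<String, Iterator<String>>> heads =
                new PriorityQueue<>(Map.Entry.<String, Iterator<String>>comparingByKey());
        for (Iterator<String> run : sortedRuns) {
            if (run.hasNext()) {
                heads.add(Map.entry(run.next(), run));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Map.Entry<String, Iterator<String>> head = heads.poll();
            merged.add(head.getKey());
            Iterator<String> run = head.getValue();
            if (run.hasNext()) {
                heads.add(Map.entry(run.next(), run));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // prints [alice, bob, chuck, dan]
        System.out.println(sortDistributed(List.of(
                List.of("bob", "dan"), List.of("chuck", "alice"))));
    }
}
```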

mapStateful

mapStateful is an extension of the simple map transform. It adds the capability to optionally retain mutable state.

The major use case of stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in an input of any complexity.

As with other stateful operations, you can also use a groupingKey to have a unique state per key.

For example, consider a pipeline that matches incoming TRANSACTION_START events to TRANSACTION_END events, which can arrive out of order, and, once both have been received, outputs how long the transaction took.

This would be difficult to express in terms of a slidingWindow, because we can’t know how long a transaction would take in advance, and if it would span multiple windows. It can’t be expressed using sessionWindow either, because we don’t want to wait until the window times out before emitting the results.

Let’s say we have the following types:

public interface TransactionEvent {
    long timestamp();
    String transactionId();
    EventType type();
}

public enum EventType {
    TRANSACTION_START,
    TRANSACTION_END
}

We can then use the following mapStateful transform to match start and end events:

p.readFrom(KafkaSources.kafka(.., "transaction-events"))
 .withNativeTimestamps(0)
 .groupingKey(event -> event.transactionId())
 .mapStateful(MINUTES.toMillis(10),
   () -> new TransactionEvent[2],
   (state, id, event) -> {
        if (event.type() == TRANSACTION_START) {
            state[0] = event;
        } else if (event.type() == TRANSACTION_END) {
            state[1] = event;
        }
        if (state[0] != null && state[1] != null) {
            // we have both start and end events
            long duration = state[1].timestamp() - state[0].timestamp();
            return MapUtil.entry(event.transactionId(), duration);
        }
        // we don't have both events, do nothing for now.
        return null;
    },
    (state, id, currentWatermark) ->
        // if we have not received both events after 10 minutes,
        // we will emit a timeout entry
        (state[0] == null || state[1] == null)
            ? MapUtil.entry(id, TIMED_OUT)
            : null
 ).writeTo(Sinks.logger());

You will note that we also had to set an expiry time on the states (the first parameter of the mapStateful method); otherwise we would eventually run out of memory as we accumulate more and more transactions.

co-group / join

Co-grouping lets you join any number of inputs on a common key, which can be anything you can calculate from the input item. This makes it possible to correlate data from two or more different sources. In the same transform you are able to apply an aggregate function to all the grouped items.

As an example, we can use a sequence of events that would be typical on an e-commerce website: PageVisit and AddToCart. We want to find how many visits were required before an item was added to the cart. For simplicity, let’s say we’re working with historical data and we are processing this data from a set of logs.

Pipeline p = Pipeline.create();
BatchStageWithKey<PageVisit, Integer> pageVisit =
    p.readFrom(Sources.files("visit-events.log"))
     .groupingKey(event -> event.userId());
BatchStageWithKey<AddToCart, Integer> addToCart =
    p.readFrom(Sources.files("cart-events.log"))
     .groupingKey(event -> event.userId());

After getting the two keyed streams, now we can join them:

BatchStage<Entry<Integer, Tuple2<Long, Long>>> coGrouped = pageVisit
        .aggregate2(counting(), addToCart, counting());

This gives an item which contains the counts for both events for the same user id. From this, it’s easy to calculate the ratio of visits vs add to cart events.
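As a rough model of what the co-grouped result contains, the plain-Java sketch below counts events per user ID from two in-memory lists and keeps both counts side by side. The long[2] pair is a stand-in for Tuple2<Long, Long>, events are reduced to bare user IDs, and the names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CoGroupSketch {
    // For each user ID: element 0 is the page-visit count, element 1 the add-to-cart count.
    static Map<Integer, long[]> coGroupCounts(List<Integer> visitUserIds, List<Integer> cartUserIds) {
        Map<Integer, long[]> counts = new TreeMap<>();
        for (int userId : visitUserIds) {
            counts.computeIfAbsent(userId, k -> new long[2])[0]++;
        }
        for (int userId : cartUserIds) {
            counts.computeIfAbsent(userId, k -> new long[2])[1]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, long[]> counts = coGroupCounts(List.of(1, 1, 2, 1), List.of(1, 3));
        // prints 1=[3, 1], then 2=[1, 0], then 3=[0, 1]
        counts.forEach((userId, pair) -> System.out.println(userId + "=" + Arrays.toString(pair)));
    }
}
```

A user that appears in only one input still gets a result, with a count of zero for the other input.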

Co-grouping can also be applied to windowed streams, and works exactly the same way as aggregate(). An important consideration is that the timestamps from both streams would be considered, so it’s important that the two streams don’t have widely different timestamps.

rebalance

Hazelcast prefers not to send the data around the computing cluster. If your data source retrieves some part of the data stream on member A and you apply stateless mapping on it, this processing will happen on member A. Hazelcast will send the data only when needed to achieve correctness, for example in the case of non-parallelizable operations like mapStateful. Such transforms must be performed on a single member, using a single Jet engine and all the data received on any other member must be sent to the processing one.

The above policy results in the best throughput in most cases. However, in some cases there is an inherent imbalance among cluster members in terms of how much data they get from a data source. The most important example is a non-parallelized source, where a single processor on a single Hazelcast member receives all the data. In such a case you can apply the rebalance operator, which orders Hazelcast to send the data out to other members where normally it wouldn’t choose to.

Rebalancing is best explained on the DAG level. Each pipeline stage corresponds to a vertex in the DAG, and the logic attached to the edge between them decides for each data item which processor to send it to. Some processors are on the same machine and others are on remote machines. By default, Hazelcast considers only the processors on the local machine as candidates, using a round-robin scheme to decide on the target. When you apply rebalancing, Hazelcast simply extends the candidate set to all the processors, including those on the other machines, but keeps using the same round-robin scheme. The order of the round-robin is such that the target cluster member changes every time, maximizing the fairness of the distribution across members.

Round-robin routing takes into account backpressure: if a given processor is overloaded and its input queue is full, Hazelcast tries the next one. If during rebalancing the network becomes a bottleneck, backpressure will automatically divert more traffic to the local processors.

You can also apply a less flexible kind of rebalancing, which will enforce sending to other members even when the local ones are more available: rebalance(keyFn). It uses the keyFn you supply as a partitioning function. In this case every item is tied to one definite choice of the destination processor and backpressure cannot override it. If some processor must apply backpressure, Hazelcast doesn’t try to send the data item to another available processor and instead propagates the backpressure to the upstream vertex. This kind of rebalancing may result in a better balanced CPU load across the cluster, but has the potential for less overall throughput.
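The contrast between the two routing styles can be sketched in plain Java. Both methods below are simplified models written for illustration: the round-robin variant skips candidates whose queue is full, while the keyed variant always maps a key to the same target. The hash-modulo partitioning and all names are assumptions of this sketch, since Hazelcast uses its own partitioning scheme.

```java
final class RoutingSketch {
    // Round-robin: start after the last target and skip processors with a full queue.
    // Returns -1 when every queue is full, meaning backpressure reaches the upstream vertex.
    static int roundRobinTarget(int lastTarget, boolean[] queueFull) {
        for (int i = 1; i <= queueFull.length; i++) {
            int candidate = (lastTarget + i) % queueFull.length;
            if (!queueFull[candidate]) {
                return candidate;
            }
        }
        return -1;
    }

    // Keyed: the key alone decides the target, regardless of how busy that processor is.
    static int keyedTarget(Object key, int processorCount) {
        return Math.floorMod(key.hashCode(), processorCount);
    }

    public static void main(String[] args) {
        boolean[] queueFull = {false, true, false};
        System.out.println(roundRobinTarget(0, queueFull)); // 2, because processor 1 is full
        System.out.println(keyedTarget("a", 4));             // 1, since "a".hashCode() is 97
    }
}
```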

peek

stage.peek() is an identity transform that adds diagnostic side-effects: it logs every item it receives. Since the logging happens on the machine that is processing the item, this transform is primarily intended to be used during development.

customTransform

All the data processing in a pipeline happens inside the implementations of the Processor interface, a central part of the Jet API. With stage.customTransform you can provide your own processor implementation that presumably does something that no out-of-the-box processor can. If you get involved with this transform, make sure you are familiar with the internals of Hazelcast, as exposed through the Core DAG API.

JSON

JSON is a very common data exchange format. To transform data to and from JSON, you can use the JsonUtil utility class without adding an extra dependency to the classpath. The utility class uses the lightweight jackson-jr JSON library under the hood.

For example, you can convert a JSON-formatted string to a bean. The bean must have public fields or public getters and setters, and a no-argument (default) constructor.

{
  "name": "Jet",
  "age": 4
}

public class Person {
  public String name;
  public int age;

  public Person() {
  }
}

BatchStage<Person> persons = stage.map(jsonString -> JsonUtil.beanFrom(jsonString, Person.class));

If you don’t have a bean class, you can use mapFrom to convert the JSON formatted string to a Map.

BatchStage<Map<String, Object>> personsAsMap = stage.map(jsonString -> JsonUtil.mapFrom(jsonString));

You can also use supported annotations from Jackson Annotations in your transforms by adding the library to the classpath.

  • Gradle

compile 'com.fasterxml.jackson.core:jackson-annotations:2.11.0'

  • Maven

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>2.11.0</version>
</dependency>

For example, if your bean field names differ from the JSON string field names, you can use the JsonProperty annotation for mapping.

public class Person {

  @JsonProperty("name")
  public String _name;

  @JsonProperty("age")
  public int _age;

  public Person() {
  }
}

See a list of supported annotations.