Transforms
Hazelcast offers several transforms which can be used to process data. Explore the available transforms and learn how to use them.
Stateless transforms
Stateless transforms do not have any notion of state, meaning that all items must be processed independently of any previous items.
These methods transform the input into the correct shape that is required by further, more complex ones. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.
map
Mapping is the simplest kind of stateless transformation. It simply applies a function to the input item, and passes the output to the next stage.
StreamStage<String> names = stage.map(name -> name.toLowerCase());
filter
Similar to map, filter is a stateless operator that applies a predicate to the input to decide whether to pass it to the output.
BatchStage<String> names = stage.filter(name -> !name.isEmpty());
flatMap
flatMap is equivalent to map, with the difference that instead of one output item you can have an arbitrary number of output items per input item. The output type is a Traverser, which is a Hazelcast type similar to an Iterator. For example, the code below splits a sentence into individual items consisting of words:
StreamStage<String> words = stage.flatMap(
sentence -> Traversers.traverseArray(sentence.split("\\W+"))
);
merge
This transform merges the contents of two streaming sources into one. The item type in the right-hand stage must be the same or a subtype of the one in the left-hand stage. The items from both sides will be interleaved in arbitrary order.
StreamStage<Trade> tradesNewYork = pipeline
.readFrom(KafkaSources.kafka(.., "nyc"))
.withoutTimestamps();
StreamStage<Trade> tradesTokyo = pipeline
.readFrom(KafkaSources.kafka(.., "nyc"))
.withoutTimestamps();
StreamStage<Trade> tradesNyAndTokyo = tradesNewYork.merge(tradesTokyo);
mapUsingIMap
This transform looks up each incoming item from the corresponding IMap and the result of the lookup is combined with the input item.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingIMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
The above code can be thought of as equivalent to the following, where the input is of type Order:
public OrderDetails getOrderDetails(Order order) {
IMap<String, ProductDetails> map = hz.getMap("products");
ProductDetails product = map.get(order.getProductId());
return new OrderDetails(order, product);
}
See Joining Static Data to a Stream for a tutorial using this operator.
mapUsingReplicatedMap
This transform is equivalent to mapUsingIMap with the only difference that a ReplicatedMap is used instead of an IMap.
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
StreamStage<OrderDetails> details = orders.mapUsingReplicatedMap("products",
order -> order.getProductId(),
(order, product) -> new OrderDetails(order, product));
With a ReplicatedMap, as opposed to a standard IMap, every lookup is local. The downside is that the data is replicated to all the members, consuming more memory in the cluster.
mapUsingService
This transform takes an input and performs a mapping using a service object. Examples are an external HTTP-based service or some library which is loaded and initialized during runtime (such as a machine learning model).
The service itself is defined through a ServiceFactory
object. The
main difference between this operator and a simple map
is that the
service is initialized once per job. This is what makes it useful for
calling out to heavy-weight objects which are expensive to initialize
(such as HTTP connections).
Let’s imagine an HTTP service which returns details for a product and
that we have wrapped this service in a ProductService
class:
interface ProductService {
ProductDetails getDetails(int productId);
}
We can then create a shared service factory as follows:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url))
.toNonCooperative();
"Shared" means that the service is thread-safe and can be called from multiple-threads, so Hazelcast will create just one instance on each member and share it among the parallel tasklets.
We also declared the service as "non-cooperative" because it makes blocking HTTP calls. Failing to do this would have severe consequences for the performance of not just your pipeline, but all the jobs running on the Hazelcast cluster.
We can then perform a lookup on this service for each incoming order:
StreamStage<OrderDetails> details = orders.mapUsingService(productService,
(service, order) -> {
ProductDetails details = service.getDetails(order.getProductId());
return new OrderDetails(order, details);
}
);
mapUsingServiceAsync
This transform is identical to mapUsingService with
one important distinction: the service in this case supports
asynchronous calls, which are compatible with cooperative concurrency
and don’t need extra threads. It also means that we can have multiple
requests in flight at the same time to maximize throughput. Instead of
the mapped value, this transform expects the user to supply a
CompletableFuture<T>
as the return value, which will be completed at
some later time.
For example, if we extend the previous ProductService
as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
}
We still create the shared service factory as before:
StreamStage<Order> orders = pipeline
.readFrom(KafkaSources.kafka(.., "orders"))
.withoutTimestamps();
ServiceFactory<?, ProductService> productService = ServiceFactories
.sharedService(ctx -> new ProductService(url));
The lookup instead becomes async; note that the transform also expects you to return a CompletableFuture rather than the mapped value:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsync(productService,
(service, order) -> {
CompletableFuture<ProductDetails> f = service.getDetailsAsync(order.getProductId());
return f.thenApply(details -> new OrderDetails(order, details));
}
);
The main advantage of using async communication is that we can have many invocations to the service in-flight at the same time which will result in better throughput.
mapUsingServiceAsyncBatched
This variant is very similar to the previous one, but instead of sending one request at a time, we can send so-called "smart batches" (for a more in-depth look at the internals of the Jet engine, see Cooperative Multithreading). Hazelcast will automatically group items as they come and allows you to send requests in batches. This can be very efficient, for example for a remote service: instead of one round trip per request, you can send requests in groups to maximize throughput. If we extend our ProductService as follows:
interface ProductService {
ProductDetails getDetails(int productId);
CompletableFuture<ProductDetails> getDetailsAsync(int productId);
CompletableFuture<List<ProductDetails>> getAllDetailsAsync(List<Integer> productIds);
}
We can then rewrite the transform as:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsyncBatched(productService,
    100,  // maximum number of items to send in one batch
    (service, orderList) -> {
        List<Integer> productIds = orderList
            .stream()
            .map(o -> o.getProductId())
            .collect(Collectors.toList());
        CompletableFuture<List<ProductDetails>> f = service
            .getAllDetailsAsync(productIds);
        return f.thenApply(productDetailsList -> {
            List<OrderDetails> orderDetailsList = new ArrayList<>();
            for (int i = 0; i < orderList.size(); i++) {
                orderDetailsList.add(
                    new OrderDetails(orderList.get(i), productDetailsList.get(i)));
            }
            return orderDetailsList;
        });
    });
As you can see, there is some more code to write to combine the results, but this should give better throughput, provided that the service is able to batch requests efficiently.
mapUsingPython
Hazelcast can call Python code to perform a mapping step in the pipeline. The prerequisite is that the Hazelcast servers are Linux or Mac machines with Python installed and that the hazelcast-jet-python module is on the classpath (for example, by being present in the lib directory). Hazelcast supports Python versions 3.5-3.7.
For a full tutorial, see Apply a Python Function.
You are expected to define a function, conventionally named
transform_list(input_list)
, that takes a list of strings and returns a
list of strings whose items match positionally one-to-one with the input
list. Hazelcast will call this function with batches of items received by the
Python mapping stage. If necessary, you can also use a custom name for
the transforming function.
Internally Hazelcast launches Python processes that execute your function. It
launches as many of them as requested by the localParallelism
setting
on the Python pipeline stage. It prepares a local virtual Python environment for the processes to run in and communicates with them over the loopback network interface, using a bidirectional streaming gRPC call.
If you have some simple Python work that fits into a single file, you
can tell Hazelcast just the name of that file, which is assumed to be a Python
module file that declares transform_list
:
StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setHandlerFile("path/to/handler.py")));
And here’s an example of handler.py
:
def transform_list(input_list):
return ['reply-' + item for item in input_list]
If you have an entire Python project that you want to use from Hazelcast, just name its base directory and Hazelcast will upload all of it (recursively) to the cluster as a part of the submitted job. In this case you must also
name the Python module that declares transform_list
:
StreamStage<String> sourceStage = sourceStage();
StreamStage<String> pythonMapped = sourceStage.apply(PythonTransforms.mapUsingPython(
new PythonServiceConfig().setBaseDir("path/to/python_project")
.setHandlerModule("jet_handler"))
);
Normally your Python code will make use of non-standard libraries. Hazelcast
recognizes the conventional requirements.txt
file in your project’s
base directory and will ensure all the listed requirements are
satisfied.
Finally, Hazelcast also recognizes bash scripts init.sh
and cleanup.sh
. It
will run those during the initialization and cleanup phases of the job.
Regardless of the parallelism of the Python stage, these scripts run
just once per job, and they run in the context of an already activated
virtual environment.
One issue with making requirements.txt
work is that in many production
back-end environments the public internet is not available. To work
around this you can pre-install all the requirements to the global (or
user-local) Python environment on all Hazelcast servers. You can also take
full control by writing your own logic in init.sh
that installs the
dependencies to the local virtual environment. For example, you can make
use of pip --find-links.
hashJoin
hashJoin is a type of join with two or more inputs, where all but one of the inputs must be small enough to fit in memory. You can think of it as a primary input accompanied by one or more side inputs that are small enough to fit in memory. The side inputs are joined to the primary input, which can be either a batch or a streaming stage; the side inputs themselves must be batch stages.
StreamStage<Order> orders = pipeline
.readFrom(orderSource())
.withoutTimestamps();
BatchStage<ProductDetails> productDetails = pipeline
.readFrom(productDetailsSource());
StreamStage<OrderDetails> joined = orders.hashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
The last argument to hashJoin
is a function that gets the input and
the enriching item. Note that by default Hazelcast does an outer join: if the
enriching stream lacks a given key, the corresponding function parameter
will be null
. You can request an inner join as well:
StreamStage<OrderDetails> joined = orders.innerHashJoin(productDetails,
onKeys(Order::productId, ProductDetails::productId),
(order, product) -> new OrderDetails(order, product)
);
In this case the product
argument is never null
and if a given key
is missing, the input Order
item is filtered out.
Hazelcast also supports hash-joining with more streams at once through hashJoin2 and the hashJoinBuilder. Refer to their documentation for more details.
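For example, a minimal sketch of hashJoin2, reusing the orders and productDetails stages from above; the customers stage, customerSource(), Order::customerId and the EnrichedOrder class are hypothetical and shown only to illustrate the shape of the call:
BatchStage<Customer> customers = pipeline
    .readFrom(customerSource());   // hypothetical batch source of Customer items

StreamStage<EnrichedOrder> enriched = orders.hashJoin2(
    productDetails, onKeys(Order::productId, ProductDetails::productId),
    customers, onKeys(Order::customerId, Customer::customerId),
    (order, product, customer) -> new EnrichedOrder(order, product, customer)
);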
Stateful transforms
Stateful transforms accumulate data, and the output depends on previously encountered items.
For example, using stateful transforms, you could count how many items have been encountered so far in a stream and emit the current count with every new item. To do so, Hazelcast maintains a current state of the number of total items encountered so far.
When it comes to maintaining state, there is also an important distinction between streaming and batch jobs. Windowing only applies to streaming jobs where an element of time is present, whereas applying a one-time aggregation over the whole data set is only possible in batch pipelines.
aggregate
Data aggregation is the cornerstone of distributed stream processing. It computes an aggregate function (simple examples: sum or average) over the data items.
When used without a defined window, the aggregate()
method applies a
one-time aggregation over the whole of the input which is only possible
in a bounded input (using BatchStage
).
For example, a very simple aggregation will look like this:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
This will output only one result, which is the count of all the items:
11:49:12.435 [ INFO] [c.h.j.i.c.W.loggerSink#0] 6
The Jet API provides several built-in aggregation methods, such as:
Method | Description |
---|---|
averagingLong, averagingDouble | Calculates an average of the given inputs. |
counting | Returns the count of all the items. |
summingLong, summingDouble | Returns the sum of all the items. |
minBy, maxBy | Finds the minimum or maximum sorted according to some criteria. |
toList | Simply groups the items in a list and returns it. |
bottomN, topN | Calculates the bottom or top N items sorted according to some criteria. |
linearTrend | Computes a trend line over the given items, for example the velocity given GPS coordinates. |
allOf | Combines multiple aggregations into one aggregation (for example, if you want both sum and average). |
For a complete list, please refer to the AggregateOperations class. You can also implement your own aggregate operations using the builder in AggregateOperation .
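For example, to compute both the sum and the average of the same input in a single pass, you can combine two operations with allOf. The following is a minimal sketch based on the counting example above; the result is a Tuple2 holding both values:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
 .aggregate(AggregateOperations.allOf(
     AggregateOperations.summingLong(i -> i),
     AggregateOperations.averagingLong(i -> i)))
 .writeTo(Sinks.logger());
This should log a single result along the lines of (15, 2.5).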
groupingKey
Typically you don’t want to aggregate all the items together, but
group them by some key and then aggregate over each group separately.
This is achieved by using the groupingKey
transform and then applying
an aggregation on it afterwards.
We can extend the previous example to group odd and even values separately:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
.groupingKey(i -> i % 2 == 0 ? "evens" : "odds")
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
11:51:46.723 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=3
11:51:46.723 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=3
Grouping is critical for aggregating massive data sets in distributed computing - otherwise you would not be able to make use of parallelization as effectively.
rollingAggregate
Rolling aggregation is similar to aggregate but instead of
waiting to output until all items are received, it produces an output
item for each input item. Because of this, it’s possible to use it in a
streaming pipeline as well, as the aggregation is applied in a
continuous way. The same pipeline from aggregate can be rewritten to use a rollingAggregate transform instead:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
.groupingKey(i -> i % 2 == 0 ? "evens" : "odds")
.rollingAggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
Instead of a single line of output, we would get the following:
12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=1
12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=2
12:06:29.405 [ INFO] [c.h.j.i.c.W.loggerSink#0] odds=3
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=1
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=2
12:06:29.406 [ INFO] [c.h.j.i.c.W.loggerSink#0] evens=3
window
The process of data aggregation takes a finite batch of data and produces a result. We can make it work with an infinite stream if we break up the stream into finite chunks. This is called windowing and it’s almost always defined in terms of a range of event timestamps (a time window).
Window transforms require a stream that is annotated with timestamps, that is, each input item has a timestamp associated with it. Timestamps are given in milliseconds and are generally represented in epoch format, as a simple long.
For a more in-depth look at the event time model, please refer to Event Time and Processing Time.
The general way to assign windows to a stream works as follows:
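As a minimal sketch (tradeSource() is a hypothetical streaming source; the concrete window definitions are covered in the following sections): first annotate the stream with timestamps, then attach a window definition, and finally apply an aggregate operation.
Pipeline p = Pipeline.create();
p.readFrom(tradeSource())                       // hypothetical streaming source
 .withIngestionTimestamps()                     // 1. annotate the stream with timestamps
 .window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1))) // 2. choose a window definition
 .aggregate(AggregateOperations.counting())     // 3. apply an aggregation over each window
 .writeTo(Sinks.logger());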
tumblingWindow
Tumbling windows are the most basic window type - a window of constant size that "tumbles" along the time axis. If you use a window size of 1 second, Hazelcast will group together all events that occur within the same second and you’ll get window results for intervals [0-1) seconds, then [1-2) seconds, and so on.
A simple example is given below:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
.withIngestionTimestamps()
.window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1)))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
When you run this pipeline, you will see output like this, where each output window is marked with start and end timestamps:
14:26:28.007 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:27.000, end=14:26:28.000, value='100', isEarly=false}
14:26:29.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:28.000, end=14:26:29.000, value='100', isEarly=false}
14:26:30.004 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:29.000, end=14:26:30.000, value='100', isEarly=false}
14:26:31.008 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=14:26:30.000, end=14:26:31.000, value='100', isEarly=false}
As with a normal aggregation, it’s also possible to apply a grouping to a windowed operation:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
.withIngestionTimestamps()
.groupingKey(i -> i.sequence() % 2 == 0 ? "even" : "odd")
.window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1)))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
In this mode, the output would be keyed:
15:09:24.017 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:23.000, end=15:09:24.000, key='odd', value='50', isEarly=false}
15:09:24.018 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:23.000, end=15:09:24.000, key='even', value='50', isEarly=false}
15:09:25.014 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:24.000, end=15:09:25.000, key='odd', value='50', isEarly=false}
15:09:25.015 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:24.000, end=15:09:25.000, key='even', value='50', isEarly=false}
15:09:26.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:25.000, end=15:09:26.000, key='odd', value='50', isEarly=false}
15:09:26.009 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:25.000, end=15:09:26.000, key='even', value='50', isEarly=false}
15:09:27.013 [ INFO] [c.h.j.i.c.W.loggerSink#0] KeyedWindowResult{start=15:09:26.000, end=15:09:27.000, key='odd', value='50', isEarly=false}
slidingWindow
A sliding window is like a tumbling window that, instead of hopping from one time range to the next, slides along the time axis. It slides in discrete steps that are a fraction of the window’s length. If you use a window of size 1 second sliding by 100 milliseconds, Hazelcast will output window results for the intervals [0.0-1.0) seconds, then [0.1-1.1) seconds, and so on.
We can modify the tumbling window example as below:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100)) // will emit 100 items per second
.withIngestionTimestamps()
.window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(1), 100))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
When you run this pipeline, you will see output like the following, where you can see that the start and end timestamps of the windows are overlapping.
15:07:38.108 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.100, end=15:07:38.100, value='100', isEarly=false}
15:07:38.209 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.200, end=15:07:38.200, value='100', isEarly=false}
15:07:38.313 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.300, end=15:07:38.300, value='100', isEarly=false}
15:07:38.408 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.400, end=15:07:38.400, value='100', isEarly=false}
15:07:38.505 [ INFO] [c.h.j.i.c.W.loggerSink#0] WindowResult{start=15:07:37.500, end=15:07:38.500, value='100', isEarly=false}
sessionWindow
A session window captures periods of activity followed by periods of inactivity. You define the "session timeout", i.e. the length of the inactive period that causes the window to close. The typical example of a session window is a user’s activity on a website (hence the name): there are bursts of activity (while the user is browsing the website) followed by rather long periods of inactivity.
As with other aggregate transforms, if you define a grouping key, there will be a separate, independent session window for each key.
In the example below, we want to find out how many different events each user had during a web session. The data source is a stream of events read from Kafka and we assume that the user session is closed after 15 minutes of inactivity:
p.readFrom(KafkaSources.kafka(.., "website-events"))
.withIngestionTimestamps()
.groupingKey(event -> event.getUserId())
.window(WindowDefinition.session(TimeUnit.MINUTES.toMillis(15)))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
Early Results
If you have to allow a lot of event lateness, or if you just use large time windows, you may want to track the progress of a window while it is still accumulating events. You can order Hazelcast to give you, at regular intervals, the current status of all the windows it has some data for but that aren’t yet complete. For example, in the session window example we may want to get an update on all running sessions every second, without waiting for the 15-minute timeout to get the full results:
p.readFrom(KafkaSources.kafka(.., "website-events"))
.withIngestionTimestamps()
.groupingKey(event -> event.getUserId())
.window(WindowDefinition.session(TimeUnit.MINUTES.toMillis(15))
.setEarlyResultsPeriod(SECONDS.toMillis(1)))
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
The output of the windowing stage is in the form of KeyedWindowResult<String, Long>, where String is the user ID and Long is the number of events in the given window. KeyedWindowResult also has an isEarly property that says whether the result is early or final.
The early results period works for all window types. For example, in a tumbling window, if you are working with a window size of one minute and there’s an additional 15-second allowed lateness for late-coming events, this amounts to waiting up to 75 seconds from receiving a given event to getting the result it contributed to. Therefore it may be desirable to ask Hazelcast to give us updates on the current progress every second.
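A minimal sketch of that scenario, reusing the test item stream from earlier examples and assuming we use the events’ own timestamps with 15 seconds of allowed lateness:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(100))
 // use the event's own timestamp and allow events to arrive up to 15 seconds late
 .withTimestamps(event -> event.timestamp(), TimeUnit.SECONDS.toMillis(15))
 .window(WindowDefinition.tumbling(TimeUnit.MINUTES.toMillis(1))
     // ask for the current progress of each accumulating window every second
     .setEarlyResultsPeriod(TimeUnit.SECONDS.toMillis(1)))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());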
Generally, Hazelcast doesn’t guarantee that a stage will receive the items in the same order its upstream stage emitted them. For example, it executes a
map
transform with many parallel tasks. One task may get the early result and another one the final result. They may emit the transformed result to the sink in any order. This can lead to a situation where your sink receives an early result after it has already received the final result.
distinct
Suppresses duplicate items from a stream. This operation applies primarily to batch streams, but also works on a windowed unbounded stream.
This example takes some input of integers and outputs only the distinct values:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 1, 2, 3, 4, 5, 6))
.distinct()
.writeTo(Sinks.logger());
We can also use distinct with grouping; in that case, any two items mapping to the same key are considered duplicates. For example, the following will print only strings that have different first letters:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("joe", "john", "jenny", "maria"))
.groupingKey(s -> s.substring(0, 1))
.distinct()
.writeTo(Sinks.logger());
This is a possible output (Hazelcast can choose any of the names starting with "j"):
14:05:29.382 [ INFO] [c.h.j.i.c.W.loggerSink#0] joe
14:05:29.383 [ INFO] [c.h.j.i.c.W.loggerSink#0] maria
sort
Sorting only works on a batch stage. It sorts the input according to its
natural ordering (the type must be Comparable
) or according to the
Comparator
you provide. Example:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("bob", "alice", "dan", "chuck"))
.sort()
.writeTo(Sinks.logger());
This is how the output should look:
10:43:54.523 [ INFO] [c.h.j.i.c.W.loggerSink#0] alice
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] bob
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] chuck
10:43:54.524 [ INFO] [c.h.j.i.c.W.loggerSink#0] dan
Hazelcast sorts the data in two steps: first it sorts the data that is local on each member, and then it merges the sorted streams into the final output. The first step requires O(n) heap memory, but the second step just receives the data in the correct order and has O(1) memory needs.
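If the natural ordering isn’t what you need, you can supply a comparator instead. For example, a minimal sketch that sorts the same names by length:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("bob", "alice", "dan", "chuck"))
 .sort(ComparatorEx.comparing(String::length))
 .writeTo(Sinks.logger());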
mapStateful
mapStateful is an extension of the simple map transform. It adds the capability to optionally retain mutable state.
The major use case of stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in an input of any complexity.
As with other stateful operations, you can also use a groupingKey
to
have a unique state per key.
For example, consider a pipeline that matches incoming TRANSACTION_START events to TRANSACTION_END events, which can arrive out of order, and outputs how long the transaction took once both are received.
This would be difficult to express in terms of a slidingWindow, because we can’t know in advance how long a transaction will take, or whether it will span multiple windows. It can’t be expressed using sessionWindow either, because we don’t want to wait until the window times out before emitting the results.
Let’s say we have the following event type:
public interface TransactionEvent {
long timestamp();
String transactionId();
EventType type();
}
public enum EventType {
TRANSACTION_START,
TRANSACTION_END
}
We can then use the following mapStateful
transform to match start
and end events:
p.readFrom(KafkaSources.kafka(.., "transaction-events"))
.withNativeTimestamps(0)
.groupingKey(event -> event.transactionId())
.mapStateful(MINUTES.toMillis(10),
() -> new TransactionEvent[2],
(state, id, event) -> {
if (event.type() == TRANSACTION_START) {
state[0] = event;
} else if (event.type() == TRANSACTION_END) {
state[1] = event;
}
if (state[0] != null && state[1] != null) {
// we have both start and end events
long duration = state[1].timestamp() - state[0].timestamp();
return MapUtil.entry(event.transactionId(), duration);
}
// we don't have both events, do nothing for now.
return null;
},
(state, id, currentWatermark) ->
// if we have not received both events after 10 minutes,
// we will emit a timeout entry
(state[0] == null || state[1] == null)
? MapUtil.entry(id, TIMED_OUT)
: null
).writeTo(Sinks.logger());
You will note that we also had to set an expiry time on the states (the first parameter of the mapStateful method), otherwise we would eventually run out of memory as we accumulate more and more transactions.
co-group / join
Co-grouping allows you to join any number of inputs on a common key, which can be anything you can calculate from the input item. This makes it possible to correlate data from two or more different sources. In the same transform you can also apply an aggregate function to all the grouped items.
As an example, we can use a sequence of events that would be typical on an e-commerce website: PageVisit and AddToCart. We want to find how many visits were required before an item was added to the cart. For simplicity, let’s say we’re working with historical data and we are processing this data from a set of logs.
Pipeline p = Pipeline.create();
BatchStageWithKey<PageVisit, Integer> pageVisit =
p.readFrom(Sources.files("visit-events.log"))
.groupingKey(event -> event.userId());
BatchStageWithKey<AddToCart, Integer> addToCart =
p.readFrom(Sources.files("cart-events.log"))
.groupingKey(event -> event.userId());
After getting the two keyed stages, we can join them:
BatchStage<Entry<Integer, Tuple2<Long, Long>>> coGrouped = pageVisit
.aggregate2(counting(), addToCart, counting());
This gives an item that contains the counts of both event types for the same user ID. From this, it’s easy to calculate the ratio of page visits to add-to-cart events, as shown below.
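A minimal sketch of that calculation, mapping each co-grouped entry to the per-user ratio of add-to-cart events to page visits (Util.entry is the helper from com.hazelcast.jet.Util):
BatchStage<Entry<Integer, Double>> conversionRate = coGrouped
    .map(e -> {
        long visits = e.getValue().f0();
        long addToCarts = e.getValue().f1();
        // guard against users with no recorded page visits
        double ratio = visits == 0 ? 0.0 : (double) addToCarts / visits;
        return Util.entry(e.getKey(), ratio);
    });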
Co-grouping can also be applied to windowed streams, and works exactly the same way as aggregate(). An important consideration is that the timestamps of both streams are taken into account, so the two streams should not have widely different timestamps.
rebalance
Hazelcast prefers not to send the data around the computing cluster.
If your data source retrieves some part of the data stream on member A
and you apply stateless mapping on it, this processing will happen on
member A. Hazelcast will send the data only when needed to achieve
correctness, for example in the case of non-parallelizable operations
like mapStateful
. Such transforms must be performed on a single
member, using a single Jet engine and all the data received on any
other member must be sent to the processing one.
The above policy results in the best throughput in most cases. However,
in some cases there is an inherent imbalance among cluster members in
terms of how much data they get from a data source. The most important example is a non-parallelized source, where a single processor on a single Hazelcast member receives all the data. In such a case you can apply the rebalance operator, which orders Hazelcast to send the data out to other members where it normally wouldn’t choose to.
Rebalancing is best explained on the DAG level. Each pipeline stage corresponds to a vertex in the DAG, and the logic attached to the edge between them decides for each data item which processor to send it to. Some processors are on the same machine and others are on remote machines. By default, Hazelcast considers only the processors on the local machine as candidates, using a round-robin scheme to decide on the target. When you apply rebalancing, Hazelcast simply extends the candidate set to all the processors, including those on the other machines, but keeps using the same round-robin scheme. The order of the round-robin is such that the target cluster member changes every time, maximizing the fairness of the distribution across members.
Round-robin routing takes into account backpressure: if a given processor is overloaded and its input queue is full, Hazelcast tries the next one. If during rebalancing the network becomes a bottleneck, backpressure will automatically divert more traffic to the local processors.
You can also apply a less flexible kind of rebalancing, which will
enforce sending to other members even when the local ones are more
available: rebalance(keyFn)
. It uses the keyFn
you supply as a
partitioning function. In this case every item is tied to one definite
choice of the destination processor and backpressure cannot override it.
If some processor must apply backpressure, Hazelcast doesn’t try to send the
data item to another available processor and instead propagates the
backpressure to the upstream vertex. This kind of rebalancing may result
in a better balanced CPU load across the cluster, but has the potential
for less overall throughput.
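A minimal sketch of both forms, assuming a hypothetical non-parallelized tradeSource() and a hypothetical enrich() mapping step:
// Plain rebalancing: spread the source's output across all members round-robin.
StreamStage<Trade> enriched = pipeline
    .readFrom(tradeSource())       // hypothetical non-parallelized source
    .withoutTimestamps()
    .rebalance()
    .map(trade -> enrich(trade));  // hypothetical expensive mapping step

// Partitioned rebalancing: items with the same ticker always go to the same member.
StreamStage<Trade> enrichedByTicker = pipeline
    .readFrom(tradeSource())
    .withoutTimestamps()
    .rebalance(Trade::ticker)      // Trade::ticker is a hypothetical key function
    .map(trade -> enrich(trade));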
peek
stage.peek()
is an identity transform that adds diagnostic
side-effects: it logs every item it receives. Since the logging happens
on the machine that is processing the item, this transform is primarily
intended to be used during development.
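For example, a minimal sketch that logs every item flowing between two stages:
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(0, 1, 2, 3, 4, 5))
 .map(i -> i * 10)
 .peek()                  // logs 0, 10, 20, ... on the member that processes each item
 .filter(i -> i >= 30)
 .writeTo(Sinks.logger());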
customTransform
All the data processing in a pipeline happens inside the
implementations of the Processor
interface, a central part of the Jet API. With stage.customTransform
you can provide your own
processor implementation that presumably does something that no
out-of-the-box processor can. If you
get involved with this transform, make sure you are familiar with the
internals of Hazelcast, as exposed through the Core
DAG API.
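As a minimal sketch, the following re-implements a plain mapping step with a ready-made Core API processor (Processors.mapP), just to illustrate the shape of the call; a real use case would plug in your own Processor implementation instead:
BatchStage<String> upperCased = stage.customTransform("to-upper-case",
    Processors.mapP((String s) -> s.toUpperCase()));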
JSON
JSON is a very common data exchange format. To transform data to and from JSON you can use the JsonUtil utility class without adding an extra dependency to the classpath. The utility class uses the lightweight jackson-jr JSON library under the hood.
For example, you can convert a JSON-formatted string to a bean. Your bean fields need to be public, or have public getters/setters, and the bean needs a no-argument (default) constructor.
{
"name": "Jet",
"age": 4
}
public class Person {
public String name;
public int age;
public Person() {
}
}
BatchStage<Person> persons = stage.map(jsonString -> JsonUtil.beanFrom(jsonString, Person.class));
If you don’t have a bean class, you can use mapFrom
to convert the
JSON formatted string to a Map
.
BatchStage<Map<String, Object>> personsAsMap = stage.map(jsonString -> JsonUtil.mapFrom(jsonString));
You can also use supported annotations from Jackson Annotations in your transforms by adding the library to the classpath:
compile 'com.fasterxml.jackson.core:jackson-annotations:2.11.0'
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.11.0</version>
</dependency>
For example, if your bean field names differ from the JSON field names, you can use the JsonProperty annotation for mapping:
public class Person {
@JsonProperty("name")
public String _name;
@JsonProperty("age")
public int _age;
public Person() {
}
}
See a list of supported annotations.