Map Transforms

You can use an IMap to transform the items in a pipeline, for example to look up dictionary values or to enrich the data. The simplest such transformation, mapUsingIMap, is available in the stage API itself.

IMapExtension (Enterprise Edition)

More advanced transformations are provided in IMapExtension:

  • mapUsingPutIfAbsent: performs putIfAbsent on the IMap and passes the previous value to a custom mapping function

All these transformations provide the AT_LEAST_ONCE processing guarantee. They are not transactional; instead, they rely on the sources to retry events after a failure.
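One consequence that appears to follow from this guarantee: because IMap writes are not rolled back, an item that the source retries may find the value it inserted on the first attempt. The sketch below simulates this in plain Java. A HashMap stands in for the IMap, and the class and method names are hypothetical; it does not use the IMapExtension API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation only; a HashMap stands in for the IMap.
final class ReplaySketch {

    // Calls putIfAbsent(item % 10, item) for every item and collects the
    // previous values (null means the insert succeeded).
    static List<Integer> process(Map<Integer, Integer> map, List<Integer> items) {
        List<Integer> previous = new ArrayList<>();
        for (int item : items) {
            previous.add(map.putIfAbsent(item % 10, item));
        }
        return previous;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = new HashMap<>();

        // First attempt: both keys are absent, so both inserts succeed.
        System.out.println("first attempt: " + process(map, List.of(1, 2)));

        // Simulated failure: the source retries the same items, but the
        // map still holds the writes from the first attempt.
        System.out.println("retry:         " + process(map, List.of(1, 2)));
    }
}
```

If the mapping function treats a null previous value as "first occurrence", it may be worth checking that the downstream logic tolerates a retried item being reported as a repeat.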

You can customize some parameters of the transformation:

  • maxConcurrentOps - sets the maximum number of concurrent asynchronous operations

  • doNotPreserveOrder - disables preserving the order of the input items
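To make the two parameters more concrete, here is a generic plain-Java sketch of the ideas behind them: a semaphore caps the number of asynchronous operations in flight, and joining the futures in submission order keeps the results in input order. It is an illustration under these assumptions, not a description of how Jet implements the parameters, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Generic illustration; not the Jet implementation.
final class BoundedAsyncSketch {

    // Starts one simulated async operation per item, with at most
    // maxConcurrentOps running at once. Stores the highest number of
    // operations observed in flight in 'peak' and returns the results
    // in input order.
    static List<Integer> run(List<Integer> items, int maxConcurrentOps, AtomicInteger peak) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Semaphore permits = new Semaphore(maxConcurrentOps);
        AtomicInteger inFlight = new AtomicInteger();
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        List<Integer> results = new ArrayList<>();
        try {
            for (int item : items) {
                // Blocks the submitting thread while the limit is reached.
                permits.acquireUninterruptibly();
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        return item * 10; // stand-in for the remote map operation
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                }, pool));
            }
            // Joining in submission order preserves the input order, even
            // if the operations complete in a different order.
            for (CompletableFuture<Integer> future : futures) {
                results.add(future.join());
            }
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        AtomicInteger peak = new AtomicInteger();
        List<Integer> results = run(List.of(1, 2, 3, 4, 5, 6), 2, peak);
        System.out.println("results in input order: " + results);
        System.out.println("peak in-flight operations: " + peak.get() + " (limit 2)");
    }
}
```

In a design like this, giving up ordering would allow each result to be emitted as soon as its operation completes instead of waiting behind slower, earlier items.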

These parameters must be configured before mapUsingPutIfAbsent is invoked. Some standard stage parameters can also be configured for the created stage:

  • setName - sets a more meaningful stage name

  • setLocalParallelism - sets the local parallelism of the stage

The following example groups items into buckets (the item modulo 10), records the first item seen in each bucket, and detects subsequent items from the same bucket, so that the pipeline can treat them differently.

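import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;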

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
    // rebalance is used to guarantee order of items mapping to the same IMap key
    .rebalance(e -> e % 10)
    .using(iMapExtension())
    .mapUsingPutIfAbsent("some-map",
            e -> e % 10,
            e -> e,
            Tuple3::tuple3)
    .writeTo(Sinks.logger());

It should produce output similar to the following. Each tuple contains, in order: the item, the previous IMap value, and the value that putIfAbsent attempted to insert. Note that entries for different keys may appear in a different order due to parallelism, but the invariant is preserved: only the first occurrence of a key updates the IMap.

[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (2, null, 2)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (3, null, 3)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (12, 2, 12)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (13, 3, 13)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (22, 2, 22)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (23, 3, 23)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (1, null, 1)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (11, 1, 11)
[127.0.0.1]:5702 [dev] [0ee0-e20c-9700-0001/loggerSink#0] (21, 1, 21)

Final IMap contents: [1=1, 2=2, 3=3]
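The tuples above can be reproduced without a cluster. The following is a minimal sketch in plain Java, in which a TreeMap stands in for the IMap and items are processed sequentially; the class and method names are hypothetical and it does not use the IMapExtension API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation only; a TreeMap stands in for the IMap.
final class PutIfAbsentSketch {

    // For each item, offers (item % 10, item) to the map and records the
    // tuple (item, previous value, offered value) as a string.
    static List<String> apply(Map<Integer, Integer> map, List<Integer> items) {
        List<String> tuples = new ArrayList<>();
        for (int item : items) {
            int key = item % 10;  // bucket key, as in e -> e % 10
            int value = item;     // offered value, as in e -> e
            Integer previous = map.putIfAbsent(key, value);
            tuples.add("(" + item + ", " + previous + ", " + value + ")");
        }
        return tuples;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = new TreeMap<>();
        apply(map, List.of(1, 2, 3, 11, 12, 13, 21, 22, 23))
                .forEach(System.out::println);
        System.out.println("Final map contents: " + map);
    }
}
```

Because this sketch is single-threaded, the tuples come out in input order; the pipeline only guarantees order among items that map to the same key.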

The following example demonstrates how to use all available settings:

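import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import static com.hazelcast.jet.pipeline.IMapExtension.iMapExtension;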

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 11, 12, 13, 21, 22, 23))
    .rebalance(e -> e % 10)
    .using(iMapExtension())
    .doNotPreserveOrder()
    .maxConcurrentOps(32)
    .mapUsingPutIfAbsent("some-map",
            e -> e % 10,
            e -> e,
            Tuple3::tuple3)
    .setName("store-first-by-bucket")
    .setLocalParallelism(8)
    .writeTo(Sinks.logger());