Map Connector

A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink. See Distributed Map.

Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.

Permissions

Enterprise Edition

If security is enabled, you can set up permissions to restrict clients' access to these data structures.

For example, to read from map sources, you must add the create and read permissions for those maps. If you use the map connector to write to map sinks, you must add the create and put permissions for those maps.

For details, see Client Security.

Map as a Batch Source

Use maps as a batch source to query their keys and values or use them as part of a data pipeline.

A map needs no additional configuration to serve as a batch source:

IMap<String, User> userCache = hz.getMap("usersCache");
Pipeline p = Pipeline.create();
p.readFrom(Sources.map(userCache))
 .writeTo(Sinks.logger());

Map as a Streaming Source

Maps can also be used as a streaming data source by enabling the event journal.

Streaming data from maps is fault-tolerant and supports exactly-once processing.

The event journal of a map is disabled by default. You can enable it with the following configuration:

XML:

<hazelcast>
    ...
    <map name="default">
        <event-journal enabled="true">
            <capacity>5000</capacity>
            <time-to-live-seconds>20</time-to-live-seconds>
        </event-journal>
    </map>
    ...
    <cache name="*">
        <event-journal enabled="true">
            <capacity>10000</capacity>
            <time-to-live-seconds>0</time-to-live-seconds>
        </event-journal>
    </cache>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    default:
      event-journal:
        enabled: true
        capacity: 5000
        time-to-live-seconds: 20
  cache:
    "*":
      event-journal:
        enabled: true
        capacity: 10000
        time-to-live-seconds: 0

Java:

EventJournalConfig eventJournalMapConfig = new EventJournalConfig()
        .setEnabled(true)
        .setCapacity(5000)
        .setTimeToLiveSeconds(20);

EventJournalConfig eventJournalCacheConfig = new EventJournalConfig()
        .setEnabled(true)
        .setCapacity(10000)
        .setTimeToLiveSeconds(0);

Config config = new Config();
config.getMapConfig("myMap").setEventJournalConfig(eventJournalMapConfig);
config.getCacheConfig("myCache").setEventJournalConfig(eventJournalCacheConfig);

The previous pipeline can then be modified to stream the changes to the map instead:

IMap<String, User> userCache = hz.getMap("usersCache");
Pipeline p = Pipeline.create();
// START_FROM_OLDEST is a statically imported constant of JournalInitialPosition
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
 .withIngestionTimestamps()
 .writeTo(Sinks.logger());

By default, the source emits only ADDED and UPDATED events, and each emitted object holds the key and the new value. To receive all event types, use the mapJournal variant that takes a projection function and a predicate as additional parameters.

The capacity of the event journal is also an important consideration, because too small a capacity causes events to be dropped. Note that the configured capacity is the total for all partitions of the map, not the capacity of each partition. For example, with the default partition count of 271 and a journal capacity of 100,000, each partition has space for only about 369 events, so many updates to just one key can quickly exhaust the share of the partition that owns it.
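The per-partition figure can be checked with a few lines of plain Java. This is only a sketch of the arithmetic, assuming the capacity is split evenly across partitions as described above; the class and method names are illustrative and not part of the Hazelcast API.

```java
// Illustrative helper, not a Hazelcast class: estimates the journal space
// available to a single partition when the total capacity is split evenly.
final class JournalCapacity {

    // Assumption: an even split with the remainder discarded (integer division).
    static int perPartition(int totalCapacity, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return totalCapacity / partitionCount;
    }

    public static void main(String[] args) {
        // Journal capacity of 100,000 with the default 271 partitions
        System.out.println(perPartition(100_000, 271)); // prints 369
        // The capacity of 5000 used in the configuration example
        System.out.println(perPartition(5_000, 271));   // prints 18
    }
}
```

When sizing the journal, it may help to start from the number of events a single hot key can produce before the job reads them, then multiply by the partition count.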

For a tutorial, see Subscribe to Changes to a Map.

Map as a Sink

By default, the map sink expects items of type Entry<Key, Value> and will simply replace the previous entries, if any. However, there are variants of the map sink that allow you to do atomic updates to existing entries in the map by making use of EntryProcessor objects.

The updating sinks come in three variants:

  • mapWithMerging, where you provide a function that computes the map value from the stream item and a merging function that is called only if a value already exists in the map. This is similar to how the standard Map.merge method behaves. Here’s an example that concatenates String values:

    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String, User>map("userCache"))
     // the source emits Entry<String, User>, so re-key each user by country
     .map(e -> entry(e.getValue().country(), e.getValue()))
     .writeTo(Sinks.mapWithMerging("usersByCountry",
        e -> e.getKey(),
        e -> e.getValue().name(),
        (oldValue, newValue) -> oldValue + ", " + newValue)
      );

  • mapWithUpdating, where you provide a single updating function that is always called, with the stream item and the existing value, if any. This can be used, for example, to add details to an existing object. This is similar to how the standard Map.compute method behaves. Here’s an example that updates only one field:

    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String, User>map("userCacheDetails"))
     .writeTo(Sinks.mapWithUpdating("userCache",
        e -> e.getKey(),
        // assumes setDetails returns the updated User; if there is no old value, no entry is created
        (oldValue, entry) -> (oldValue != null ? oldValue.setDetails(entry.getValue()) : null)
      ));

  • mapWithEntryProcessor, where you provide a function that returns a full-blown EntryProcessor instance that will be submitted to the map. This is the most general variant. This example takes the values of the map and submits an entry processor that increments the values by 5:

    Pipeline p = Pipeline.create();
    p.readFrom(Sources.<String, Integer>map("input"))
     .writeTo(Sinks.mapWithEntryProcessor("output",
        entry -> entry.getKey(),
        entry -> new IncrementEntryProcessor())
      );
    
    static class IncrementEntryProcessor implements EntryProcessor<String, Integer, Integer> {
        @Override
        public Integer process(Entry<String, Integer> entry) {
            return entry.setValue(entry.getValue() + 5);
        }
    }

The variants above can also be used to remove existing map entries: if one of these map sink variants sets an entry's value to null, the entry is removed from the map.
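Because the merging and updating sinks are described above by analogy with Map.merge and Map.compute, the following sketch shows those two JDK methods on an ordinary HashMap, including removal through a null result. It uses only java.util and no Hazelcast classes; the class and method names are hypothetical and chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-JDK analogy for the merging and updating sinks; not Hazelcast code.
final class SinkSemanticsSketch {

    // Like mapWithMerging: the merging function runs only when the key
    // already has a value; otherwise the new value is stored as-is.
    static void mergeName(Map<String, String> target, String country, String name) {
        target.merge(country, name, (oldValue, newValue) -> oldValue + ", " + newValue);
    }

    // Like mapWithUpdating: the function always runs, with the old value or null.
    // A null result removes the entry.
    static void update(Map<String, Integer> target, String key, Integer delta) {
        target.compute(key, (k, oldValue) ->
                delta == null ? null : (oldValue == null ? delta : oldValue + delta));
    }

    public static void main(String[] args) {
        Map<String, String> usersByCountry = new HashMap<>();
        mergeName(usersByCountry, "UK", "Alice"); // no old value: stored directly
        mergeName(usersByCountry, "UK", "Bob");   // old value exists: merged
        System.out.println(usersByCountry);       // prints {UK=Alice, Bob}

        Map<String, Integer> counters = new HashMap<>();
        update(counters, "a", 5);     // absent key: value becomes 5
        update(counters, "a", 5);     // existing key: value becomes 10
        update(counters, "a", null);  // null result: entry is removed
        System.out.println(counters); // prints {}
    }
}
```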

Predicates and Projections

If your use case calls for filtering or transforming the data you retrieve, you can optimize the pipeline by passing a filtering predicate and a projection to the source connector itself. Both are applied at the source, before the pipeline processes the data. This is especially advantageous when the data source is in another cluster. See the example below:

IMap<String, Person> personCache = hz.getMap("personCache");
Pipeline p = Pipeline.create();
p.readFrom(Sources.map(personCache,
    Predicates.greaterEqual("age", 21),
    Projections.singleAttribute("name"))
);
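
To make the effect of the predicate and projection concrete, here is a plain-Java sketch of what the source above emits: the name of every person whose age is at least 21. It uses JDK streams rather than Hazelcast, the Person class is a hypothetical stand-in, and the sorting is added only to make the printed output deterministic.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-JDK illustration of "filter, then project"; not Hazelcast code.
final class PredicateProjectionSketch {

    // Hypothetical value type with the two attributes used in the example.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    static List<String> namesOfAdults(Map<String, Person> personCache) {
        return personCache.values().stream()
                .filter(p -> p.age >= 21) // counterpart of Predicates.greaterEqual("age", 21)
                .map(p -> p.name)         // counterpart of Projections.singleAttribute("name")
                .sorted()                 // only for stable output in this sketch
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Person> cache = new LinkedHashMap<>();
        cache.put("1", new Person("Ann", 30));
        cache.put("2", new Person("Bo", 17));
        cache.put("3", new Person("Cy", 21));
        System.out.println(namesOfAdults(cache)); // prints [Ann, Cy]
    }
}
```

The difference in the real connector is where this work happens: the filtering and projection run where the map data lives, so only the selected names travel to the pipeline.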