Map Connector
A map is a distributed in-memory key-value data structure that can be used as a batch or streaming data source as well as a data sink. See Distributed Map.
Installing the Connector
This connector is included in the full and slim distributions of Hazelcast.
Permissions
Enterprise Edition
If security is enabled, you can set up permissions to restrict clients' access to these data structures.
For example, to read from map sources, you must add the create and read permissions for those maps. If you use the map connector to write to map sinks, you must add the create and put permissions for those maps.
For details, see Client Authorization.
Map as a Batch Source
Use maps as a batch source to query their keys and values or use them as part of a data pipeline.
A map can be used as a batch data source without any additional configuration:
IMap<String, User> userCache = hz.getMap("usersCache");
Pipeline p = Pipeline.create();
p.readFrom(Sources.map(userCache))
 .writeTo(Sinks.logger());
Map as a Streaming Source
Maps can also be used as a streaming data source by enabling the event journal.
Streaming data from maps is fault-tolerant and supports exactly-once processing.
The event journal for a map is disabled by default. You can enable it with the following configuration:
<hazelcast>
    ...
    <map name="default">
        <event-journal enabled="true">
            <capacity>5000</capacity>
            <time-to-live-seconds>20</time-to-live-seconds>
        </event-journal>
    </map>
    ...
    <cache name="*">
        <event-journal enabled="true">
            <capacity>10000</capacity>
            <time-to-live-seconds>0</time-to-live-seconds>
        </event-journal>
    </cache>
    ...
</hazelcast>
hazelcast:
  map:
    default:
      event-journal:
        enabled: true
        capacity: 5000
        time-to-live-seconds: 20
  cache:
    "*":
      event-journal:
        enabled: true
        capacity: 10000
        time-to-live-seconds: 0
EventJournalConfig eventJournalMapConfig = new EventJournalConfig()
        .setEnabled(true)
        .setCapacity(5000)
        .setTimeToLiveSeconds(20);
EventJournalConfig eventJournalCacheConfig = new EventJournalConfig()
        .setEnabled(true)
        .setCapacity(10000)
        .setTimeToLiveSeconds(0);
Config config = new Config();
config.getMapConfig("myMap").setEventJournalConfig(eventJournalMapConfig);
config.getCacheConfig("myCache").setEventJournalConfig(eventJournalCacheConfig);
With the journal enabled, you can modify the previous pipeline to stream the changes instead:
IMap<String, User> userCache = hz.getMap("usersCache");
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
 .withIngestionTimestamps()
 .writeTo(Sinks.logger());
By default, the source emits only ADDED and UPDATED events, and each emitted object contains the key and the new value. To receive all event types, pass additional parameters to the source.
The capacity of the event journal is also an important consideration: if the capacity is too small, events are dropped before they can be read. Note that the configured capacity is the total for the whole map and is divided among the partitions; it is not a per-partition value. For example, if there are many updates to just one key, with the default partition count of 271 and a journal size of 100,000, the journal only has space for about 369 events per partition.
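The arithmetic behind that estimate can be sketched in plain Java. This is only an illustration, assuming the total capacity is split evenly across partitions with integer division; the class and method names are hypothetical and are not part of the Hazelcast API.

```java
class JournalCapacity {
    // Assumption: the configured capacity is divided evenly among partitions, rounding down.
    static int perPartitionCapacity(int totalCapacity, int partitionCount) {
        if (totalCapacity <= 0 || partitionCount <= 0) {
            throw new IllegalArgumentException("capacity and partition count must be positive");
        }
        return totalCapacity / partitionCount;
    }

    // Number of events that would be overwritten if `updates` events land in a
    // single partition before a reader has consumed any of them.
    static long overwrittenEvents(long updates, int perPartitionCapacity) {
        return Math.max(0L, updates - perPartitionCapacity);
    }

    public static void main(String[] args) {
        int slots = perPartitionCapacity(100_000, 271);
        System.out.println("slots per partition: " + slots);
        System.out.println("overwritten after 1000 updates to one key: "
                + overwrittenEvents(1_000, slots));
    }
}
```

When sizing the journal, a calculation like this suggests starting from the busiest partition rather than from the average load across the map.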
For a tutorial, see Subscribe to Changes to a Map.
Map as a Sink
By default, the map sink expects items of type Entry<Key, Value> and will simply replace the previous entries, if any. However, there are variants of the map sink that allow you to do atomic updates to existing entries in the map by making use of EntryProcessor objects.
The updating sinks come in three variants:
- mapWithMerging, where you provide a function that computes the map value from the stream item and a merging function that gets called only if a value already exists in the map. This is similar to the way the standard Map.merge method behaves. Here’s an example that concatenates String values:
  Pipeline p = Pipeline.create();
  p.readFrom(Sources.<String, User>map("userCache"))
   // the source emits Entry<String, User>, so re-key each user by country
   .map(e -> entry(e.getValue().country(), e.getValue()))
   .writeTo(Sinks.mapWithMerging("usersByCountry",
       e -> e.getKey(),
       e -> e.getValue().name(),
       (oldValue, newValue) -> oldValue + ", " + newValue)
   );
- mapWithUpdating, where you provide a single updating function that always gets called. It will be called on the stream item and the existing value, if any. This can be used, for example, to add details to an existing object. This is similar to the way the standard Map.compute method behaves. Here’s an example that only updates a field:
  Pipeline p = Pipeline.create();
  p.readFrom(Sources.<String, User>map("userCacheDetails"))
   .writeTo(Sinks.mapWithUpdating("userCache",
       e -> e.getKey(),
       (oldValue, entry) -> (oldValue != null ? oldValue.setDetails(entry.getValue()) : null)
   ));
- mapWithEntryProcessor, where you provide a function that returns a full-blown EntryProcessor instance that will be submitted to the map. This is the most general variant. This example takes the values of the map and submits an entry processor that increments the values by 5:
  Pipeline p = Pipeline.create();
  p.readFrom(Sources.<String, Integer>map("input"))
   .writeTo(Sinks.mapWithEntryProcessor("output",
       entry -> entry.getKey(),
       entry -> new IncrementEntryProcessor())
   );
  static class IncrementEntryProcessor implements EntryProcessor<String, Integer, Integer> {
      @Override
      public Integer process(Entry<String, Integer> entry) {
          return entry.setValue(entry.getValue() + 5);
      }
  }
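The comparison with the standard java.util.Map methods can be tried locally. The sketch below uses a plain HashMap rather than an IMap or a pipeline, and its class and method names are illustrative only. It shows the difference the first two variants rely on: a merge function runs only when a value already exists, while a compute-style update function runs every time.

```java
import java.util.HashMap;
import java.util.Map;

class MergeVsCompute {
    // Like mapWithMerging: the merge function is called only if the key already has a value.
    static void mergeName(Map<String, String> namesByCountry, String country, String name) {
        namesByCountry.merge(country, name, (oldValue, newValue) -> oldValue + ", " + newValue);
    }

    // Like mapWithUpdating: the update function is always called; oldValue is null for a new key.
    // Note that Map.compute passes (key, oldValue), whereas the sink's function receives (oldValue, item).
    static void countVisit(Map<String, Integer> visits, String user) {
        visits.compute(user, (key, oldValue) -> oldValue == null ? 1 : oldValue + 1);
    }

    static Map<String, String> demoMerge() {
        Map<String, String> namesByCountry = new HashMap<>();
        mergeName(namesByCountry, "NZ", "Ana");   // first value: stored as is
        mergeName(namesByCountry, "NZ", "Ben");   // existing value: merged
        mergeName(namesByCountry, "FR", "Chloe");
        return namesByCountry;
    }

    static Map<String, Integer> demoCompute() {
        Map<String, Integer> visits = new HashMap<>();
        countVisit(visits, "ana");
        countVisit(visits, "ana");
        countVisit(visits, "ben");
        return visits;
    }
}
```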
You can also use the variants above to remove existing map entries: if the function sets an entry’s value to null, the entry is removed from the map.
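The standard java.util.Map.compute method follows the same convention, so the removal behavior can be illustrated without a cluster. The following is a minimal local sketch with hypothetical names; it does not call the Hazelcast sinks.

```java
import java.util.HashMap;
import java.util.Map;

class NullRemovesEntry {
    // Decrements the stock of an item; returning null from the function removes the entry.
    static void takeOne(Map<String, Integer> stock, String item) {
        stock.compute(item, (key, oldValue) ->
                (oldValue == null || oldValue <= 1) ? null : oldValue - 1);
    }

    static Map<String, Integer> demo() {
        Map<String, Integer> stock = new HashMap<>();
        stock.put("apple", 2);
        takeOne(stock, "apple"); // apple is now 1
        takeOne(stock, "apple"); // function returns null, so the entry is removed
        return stock;
    }
}
```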
Custom classes, such as an EntryProcessor, must be available to all members of the cluster. You can either add the classes to each member’s classpath directly or use User Code Namespaces. If you use User Code Namespaces, ensure that the job and the underlying IMap are both associated with a namespace that contains the same EntryProcessor implementation.
Predicates and Projections
If you need to filter or transform the data you read from a map, you can optimize the pipeline by passing a filtering predicate and a transformation function (a projection) to the source connector itself. They are applied before the data is processed by the pipeline, which is especially advantageous when the data source is in another cluster. For example:
IMap<String, Person> personCache = hz.getMap("personCache");
Pipeline p = Pipeline.create();
p.readFrom(Sources.map(personCache,
        Predicates.greaterEqual("age", 21),
        Projections.singleAttribute("name"))
);
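To make the effect of this predicate and projection concrete, here is a plain-Java equivalent that runs on a local map. It is a sketch only: the Person class and the sample data are hypothetical, and unlike the connector, which applies the filter at the source, this version filters data that is already in local memory.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FilterAndProject {
    // Hypothetical value type with the two attributes used in the example above.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Keeps persons aged 21 or over (the predicate) and returns only their names (the projection).
    static List<String> adultNames(Map<String, Person> personCache) {
        return personCache.values().stream()
                .filter(p -> p.age >= 21)
                .map(p -> p.name)
                .collect(Collectors.toList());
    }

    static List<String> demo() {
        Map<String, Person> cache = new LinkedHashMap<>();
        cache.put("u1", new Person("Ana", 34));
        cache.put("u2", new Person("Ben", 17));
        cache.put("u3", new Person("Chloe", 21));
        return adultNames(cache);
    }
}
```

With the connector, the downstream stages receive only the projected names, so less data has to travel from the source map to the job.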