Entry Processor
An entry processor is a function that executes your code on a map entry in an atomic way.
An entry processor is a good option if you perform bulk processing on an IMap
. Usually you perform a loop of keys - executing IMap.get(key)
, mutating the value and finally putting the entry back in the map using IMap.put(key,value)
. If you perform this process from a client or from a member where the keys do not exist, you effectively perform two network hops for each update: the first to retrieve the data and the second to update the mutated value.
If you are doing the process described above, you should consider using entry processors. An entry processor executes a read and updates upon the member where the data resides. This eliminates the costly network hops described above.
The entry processor is meant to process a single entry per call. Processing multiple entries and data structures in an entry processor is not supported as it may result in deadlocks. To process multiple entries, use a data pipeline with the entry processor sink. |
Performing Fast In-Memory Map Operations
An entry processor enables fast in-memory operations on your map without you having to worry about locks or concurrency issues. You can apply it to a single map entry or to all map entries. Entry processors support choosing target entries using predicates. You do not need any explicit lock entries thanks to the isolated threading model: Hazelcast runs the entry processor for all entries on a partition thread so it does not interleave with other mutations.
Hazelcast sends the entry processor to each cluster member and these members apply it to map entries. Therefore, if you add more members, your processing completes faster.
Using Indexes
Entry processors can be used with predicates. Predicates help to process a subset of data by selecting eligible entries. This selection can happen either by doing a full-table scan or by using indexes. To accelerate entry selection step, you can consider to add indexes. If indexes are there, entry processor automatically uses them.
Using OBJECT In-Memory Format
If entry processing is the major operation for a map and if the map consists of complex objects, you should use OBJECT
as the in-memory-format
to minimize serialization cost. By default, the entry value is stored as a byte array (BINARY
format). When it is stored as an object (OBJECT
format), then the entry processor is applied directly on the object. In that case, no serialization or deserialization is performed. However, if there is a defined event listener, a new entry value will be serialized when passing to the event publisher service.
When in-memory-format is OBJECT , the old value of the updated entry will be null.
|
Processing Entries
The IMap interface provides the following methods for entry processing:
-
executeOnKey
processes an entry mapped by a key, blocking until the processing is complete and the result is returned. -
executeOnKeys
processes entries mapped by a collection of keys, blocking until the processing is complete and the results are returned. -
submitToKey
processes an entry mapped by a key and provides a way to register a callback to receive notifications about the result of the entry processing. -
executeOnEntries
processes all entries in a map, blocking until the processing is complete and the results are returned. -
executeOnEntries
also processes all entries in a map matching the provided predicate, blocking until the processing is complete and the results are returned.
When using the executeOnEntries
method, if the number of entries is high and you do not need the results, then returning null with the process()
method is a good practice. This method is offered by the EntryProcessor interface. By returning null, results of the processing are not collected and thus out of memory errors are eliminated.
If you do not need to read or modify the entry in any way but would like to execute a task on the member owning the entry with that key (i.e. the member is the partition owner for that key), you can also use executeOnKeyOwner
provided by IExecutorService. You need to make sure that the runnable can be serialized (using any of the available serialization techniques in Hazelcast). The runnable will not receive the map entry key or value and is not running on the same thread as operations reading the map data so operations such as map.get()
or map.put()
will not be blocked.
You can also use entry processors to remove entries from your map simply
by setting the value(s) of a single entry or multiple entries to null
. See the following
example code snippet:
class EntryDeletingProcessor implements EntryProcessor<String, MyData, Boolean> {
public Boolean process(Map.Entry<String, MyData> entry) {
entry.setValue(null);
return true;
}
}
Related to above, IMap’s executeOnEntries()
method accepts predicates; you can also
remove entries that match to a predicate that you provide.
Entry processors run via operation threads that are dedicated to specific partitions. Therefore, with long-running entry processor executions, other partition operations such as map.put(key) on some partitions can be blocked while partition operations on other partitions might run concurrently. With this in mind, it is a good practice to make your entry processor executions as quick as possible.
|
Respecting Locks on Single Keys
The entry processor respects locks ONLY when its executions are performed on a single key. As explained in the above section, the entry processor has the following methods to process a single key:
<R> R executeOnKey(K key, EntryProcessor<K, V, R> entryProcessor);
<R> CompletionStage<R> submitToKey(K key, EntryProcessor<K, V, R> entryProcessor);
Therefore, if you want to perform an entry processor execution on a single key using one of these methods and that key has a lock on it, the execution will wait until the lock on that key is removed.
Processing Backup Entries
If your code modifies the data, then you will most likely need to modify backup entries as well. This should be done to prevent divergence of map values between copies of data in the cluster (the primary and backup replicas). In most cases, this is simple. By implementing the EntryProcessor
interface and providing only the process()
method, the same entry processor will be applied on all copies of the map entry.
If, however, you would like to run a custom processor on backup entries, you may provide the processor by overriding the EntryProcessor#getBackupProcessor
method. The method should return an instance of an EntryProcessor
which will be run on backup entries exclusively. As such, it may carry some state that was derived from running the entry processor on primary replicas.
You may also return null
from the EntryProcessor#getBackupProcessor
method. This signifies that there is nothing to be done on the backup replicas which is most convenient when you are using the entry processor to read and not modify entries.
It is possible that an entry processor could see that a key exists though its backup processor may not find it due to an unsent backup of a previous operation, e.g., a previous put operation. In those situations, Hazelcast internally/eventually synchronizes those owner and backup partitions so you do not lose any data. When coding a backup entry processor, you should take that case into account, otherwise NullPointerException can be seen since Map.Entry.getValue() may return null .
|
Updating Entry TTLs
You can update the time-to-live (TTL) duration and value of existing map entries in your entry processor application, using the ExtendedMapEntry
interface.
The following is an example snippet where the entries are incremented by one and provided with a new TTL.
ExtendedMapEntry<Integer, Integer> entry = (ExtendedMapEntry<Integer, Integer>) e;
int newValue = entry.getValue() + 1;
if (ttlSeconds > 0) {
// set TTL to 10 seconds
entry.setValue(newValue, 10, TimeUnit.SECONDS);
The interface also provides the ability to update the entry value without changing the entry’s TTL. See the interface documentation for method descriptions.
Creating an Entry Processor
The class IncrementingEntryProcessor
creates an entry processor to process the map
entries. It implements the EntryProcessor
interface. The process()
method will be called for both primary and backup entries.
public class IncrementingEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
public Integer process( Map.Entry<Integer, Integer> entry ) {
Integer value = entry.getValue();
entry.setValue( value + 1 );
return value + 1;
}
@Override
public EntryProcessor<Integer, Integer, Integer> getBackupProcessor() {
return IncrementingEntryProcessor.this;
}
}
An example usage is shown below:
IMap<Integer, Integer> map = hazelcastInstance.getMap( "myMap" );
for ( int i = 0; i < 100; i++ ) {
map.put( i, i );
}
Map<Integer, Object> res = map.executeOnEntries( new IncrementingEntryProcessor() );
You should explicitly call the setValue method of Map.Entry when modifying data in the entry processor. Otherwise, the entry processor is accepted as read-only.
|
An entry processor instance is not thread-safe. If you are storing a partition specific state between invocations, be sure to register this in a thread-local. An entry processor instance can be used by multiple partition threads. |
Entry Processor Performance Optimizations
By default, the entry processor executes on a partition thread. A partition thread is responsible for handling
one or more partitions. The design of entry processor assumes users have fast user code execution of the process()
method.
In the pathological case where the code is very heavy and executes in multi-milliseconds, this may create a bottleneck.
We have a slow user code detector which can be used to log a warning controlled by the following system properties:
-
hazelcast.slow.operation.detector.enabled
(default: true) -
hazelcast.slow.operation.detector.threshold.millis
(default: 10000)
User Code Deployment has been deprecated and will be removed in the next major version. To continue deploying your user code after this time, Community Edition users can either upgrade to Enterprise Edition, or add their resources to the Hazelcast member class paths. Hazelcast recommends that Enterprise Edition users migrate their user code to use User Code Namespaces for all purposes other than Jet stream processing. For further information on migrating from User Code Deployment to User Code Namespaces, see Migrate from User Code Deployment. |
The defaults catch extremely slow operations but you should set this much lower, say to 1ms, at development time to catch entry processors that could be problematic in production. These are good candidates for our optimizations.
We have two optimizations:
-
Offloadable
which moves execution off the partition thread to an executor thread -
ReadOnly
which means we can avoid taking a lock on the key
These are enabled very simply by implementing these interfaces in your EntryProcessor
.
These optimizations apply to the following IMap methods only:
-
executeOnKey(Object, EntryProcessor)
-
submitToKey(Object, EntryProcessor)
-
submitToKey(Object, EntryProcessor, ExecutionCallback)
Offloadable Entry Processor
If an entry processor implements the Offloadable
interface, the process()
method is executed in the executor
specified by the Offloadable
's getExecutorName()
method.
Offloading unblocks the partition thread allowing the user to profit from much higher throughput. The key is locked for the time span of the processing in order to not generate a write conflict.
In this case the threading looks as follows:
-
partition thread (fetch entry & lock key)
-
execution thread (process(entry) method)
-
partition thread (set new value & unlock key, or just unlock key if the entry has not been modified)
The method getExecutorName()
method may also return two constants defined in the Offloadable interface:
-
NO_OFFLOADING
: Processing is not offloaded if the methodgetExecutorName()
returns this constant; it is executed as if it does not implement theOffloadable
interface. -
OFFLOADABLE_EXECUTOR
: Processing is offloaded to the defaultExecutionService.OFFLOADABLE_EXECUTOR
.
Note that if the method getExecutorName()
cannot find an executor whose name matches the one called by this method, then the default executor service is used. Here is the configuration for the "default" executor:
<hazelcast>
...
<executor-service name="default">
<pool-size>16</pool-size>
<queue-capacity>0</queue-capacity>
</executor-service>
...
</hazelcast>
hazelcast:
...
executor-service:
default:
pool-size: 16
queue-capacity: 0
An example of an Offloadable called "OffloadedInventoryEntryProcessor" would be as follows:
<hazelcast>
...
<executor-service name="OffloadedInventoryEntryProcessor">
<pool-size>30</pool-size>
<queue-capacity>0</queue-capacity>
</executor-service>
...
</hazelcast>
hazelcast:
...
executor-service:
OffloadedInventoryEntryProcessor:
pool-size: 30
queue-capacity: 0
Remember to set the pool-size
(count of executor threads per member) according to your execution needs. See the Configuring Executor Service section for the configuration details.
ReadOnly Entry Processor
By default, an entry processor does not run if the key is locked.
It waits until the key has been unlocked (it applies to the executeOnKey
, submitToKey
methods, that were mentioned before).
If the entry processor implements the ReadOnly
interface without implementing the Offloadable
interface, the processing is not offloaded to an external executor. However, the entry processor does not observe if the key of the processed entry is
locked, nor tries to acquire the lock since the entry processor will not do any modifications.
If the entry processor implements ReadOnly
and modifies the entry, an UnsupportedOperationException
is thrown.
ReadOnly and Offloadable Entry Processor
If the entry processor implements both ReadOnly
and Offloadable
interfaces, we observe the combination of both
optimizations described above.
The process()
method is executed in the executor specified by the Offloadable
's getExecutorName()
method.
Also, the entry processor does not observe if the key of the processed entry is locked, nor tries to acquire the
lock since the entry processor will not do any modifications.
In this case the threading looks as follows:
-
partition thread (fetch entry)
-
execution thread (process(entry))
In this case the EntryProcessor.getBackupProcessor()
has to return null; otherwise an IllegalArgumentException
exception is thrown.
If the entry processor implements ReadOnly
and modifies the entry, an UnsupportedOperationException
is thrown.
Putting it all together:
public class OffloadableReadOnlyEntryProcessor implements EntryProcessor<String, Employee, Object>,
Offloadable, ReadOnly {
@Override
public Object process(Map.Entry<String, Employee> entry) {
// heavy logic
return null;
}
@Override
public EntryProcessor<String, Employee, Object> getBackupProcessor() {
// ReadOnly EntryProcessor has to return null, since it's just a read-only operation that will not be
// executed on the backup
return null;
}
@Override
public String getExecutorName() {
return OFFLOADABLE_EXECUTOR;
}
}