Entry Processor
Hazelcast supports entry processing. An entry processor is a function that executes your code on a map entry in an atomic way.
An entry processor is a good option if you perform bulk processing on an IMap. Usually you loop over the keys, executing IMap.get(key), mutating the value and finally putting the entry back in the map using IMap.put(key, value). If you perform this process from a client or from a member where the keys do not exist, you effectively perform two network hops for each update: the first to retrieve the data and the second to update the mutated value.
If you are doing the process described above, you should consider using entry processors. An entry processor reads and updates the entry on the member where the data resides. This eliminates the costly network hops described above.
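The hop arithmetic can be illustrated with a small plain-Java model. This is only a sketch and not Hazelcast code: `RemoteMapModel` and its methods are hypothetical stand-ins that count one network hop per remote call, assuming the caller does not own any of the keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of a remote map: every call below counts as one network hop.
class RemoteMapModel {
    private final Map<Integer, Integer> data = new HashMap<>();
    private int hops = 0;

    Integer get(Integer key) { hops++; return data.get(key); }

    void put(Integer key, Integer value) { hops++; data.put(key, value); }

    // Models an entry processor call: the function travels to the data in a single hop.
    void executeOnKey(Integer key, UnaryOperator<Integer> fn) {
        hops++;
        data.put(key, fn.apply(data.get(key)));
    }

    // Read-modify-write driven from the caller's side: two hops per key.
    static int hopsWithGetPut(int keys) {
        RemoteMapModel map = new RemoteMapModel();
        for (int k = 0; k < keys; k++) map.data.put(k, k);   // seed without counting hops
        for (int k = 0; k < keys; k++) map.put(k, map.get(k) + 1);
        return map.hops;
    }

    // The same update shipped to the data: one hop per key.
    static int hopsWithProcessor(int keys) {
        RemoteMapModel map = new RemoteMapModel();
        for (int k = 0; k < keys; k++) map.data.put(k, k);   // seed without counting hops
        for (int k = 0; k < keys; k++) map.executeOnKey(k, v -> v + 1);
        return map.hops;
    }

    public static void main(String[] args) {
        System.out.println(hopsWithGetPut(100));     // prints 200
        System.out.println(hopsWithProcessor(100));  // prints 100
    }
}
```

The model ignores batching: a call that targets many entries at once would need even fewer round trips than one per key.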
An entry processor is meant to process a single entry per call. Processing multiple entries and data structures in an entry processor is not supported as it may result in deadlocks.
Note that Hazelcast Jet is a good fit when you want to perform processing that involves multiple entries (aggregations, joins, etc.), or involves multiple computing steps that can run in parallel. Hazelcast Jet contains an Entry Processor Sink to allow you to update Hazelcast IMDG data as a result of your Hazelcast Jet computation. See the Hazelcast Jet documentation.
Performing Fast In-Memory Map Operations
An entry processor enables fast in-memory operations on your map without you having to worry about locks or concurrency issues. You can apply it to a single map entry or to all map entries. Entry processors support choosing target entries using predicates. You do not need any explicit lock on the entry thanks to the isolated threading model: Hazelcast runs the entry processor for all entries on a partitionThread, so there will NOT be any interleaving of the entry processor and other mutations.
Hazelcast sends the entry processor to each cluster member and these members apply it to map entries. Therefore, if you add more members, your processing completes faster.
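The isolated threading model can be pictured with a plain-Java model. This is a sketch and not Hazelcast internals: the class `PartitionThreadModel`, the partition count of 4 and the hash-based `partitionOf` are all assumptions made for illustration. Each partition gets one thread and one unsynchronized map, and every update to a key is queued on the thread that owns the key's partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model: one thread and one unsynchronized map per partition.
class PartitionThreadModel {
    static final int PARTITIONS = 4; // illustrative value only

    static int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Several caller threads queue read-modify-write updates for the same key.
    static int concurrentIncrements(String key, int callers, int perCaller) {
        List<ExecutorService> partitionThreads = new ArrayList<>();
        List<Map<String, Integer>> stores = new ArrayList<>();
        for (int p = 0; p < PARTITIONS; p++) {
            partitionThreads.add(Executors.newSingleThreadExecutor());
            stores.add(new HashMap<>());
        }
        ExecutorService owner = partitionThreads.get(partitionOf(key));
        Map<String, Integer> store = stores.get(partitionOf(key));

        List<Thread> callerThreads = new ArrayList<>();
        for (int c = 0; c < callers; c++) {
            Thread caller = new Thread(() -> {
                for (int i = 0; i < perCaller; i++) {
                    // No lock: only the owning partition thread ever touches this store.
                    owner.execute(() -> store.merge(key, 1, Integer::sum));
                }
            });
            callerThreads.add(caller);
            caller.start();
        }
        try {
            for (Thread caller : callerThreads) {
                caller.join();
            }
            for (ExecutorService thread : partitionThreads) {
                thread.shutdown();
                thread.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        System.out.println(concurrentIncrements("stock", 4, 250)); // prints 1000
    }
}
```

No update is lost even though the map is not synchronized, because all mutations of a key are serialized on one thread. The same property explains why a slow update on that thread delays everything else queued for the partition.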
Using Indexes
Entry processors can be used with predicates. Predicates help to process a subset of data by selecting eligible entries. This selection can happen either by doing a full-table scan or by using indexes. To accelerate the entry selection step, consider adding indexes. If indexes exist, the entry processor uses them automatically.
Using OBJECT In-Memory Format
If entry processing is the major operation for a map and if the map consists of complex objects, you should use OBJECT as the in-memory-format to minimize serialization cost. By default, the entry value is stored as a byte array (BINARY format). When it is stored as an object (OBJECT format), then the entry processor is applied directly on the object. In that case, no serialization or deserialization is performed. However, if there is a defined event listener, a new entry value will be serialized when passing to the event publisher service.
When in-memory-format is OBJECT, the old value of the updated entry will be null.
Processing Entries
The IMap interface provides the following methods for entry processing:
- executeOnKey processes an entry mapped by a key, blocking until the processing is complete and the result is returned.
- executeOnKeys processes entries mapped by a collection of keys, blocking until the processing is complete and the results are returned.
- submitToKey processes an entry mapped by a key and provides a way to register a callback to receive notifications about the result of the entry processing.
- executeOnEntries processes all entries in a map, blocking until the processing is complete and the results are returned.
- executeOnEntries, when given a predicate, processes all entries in the map that match the predicate, blocking until the processing is complete and the results are returned.
When using the executeOnEntries method, if the number of entries is high and you do not need the results, then returning null from the process() method is a good practice. This method is offered by the EntryProcessor interface. When process() returns null, the results of the processing are not collected, which avoids out-of-memory errors caused by a large result map.
If you do not need to read or modify the entry in any way but would like to execute a task on the member owning the entry with that key (i.e., the member is the partition owner for that key), you can also use executeOnKeyOwner provided by IExecutorService. You need to make sure that the runnable can be serialized (using any of the available serialization techniques in Hazelcast). The runnable does not receive the map entry key or value, and it does not run on the same thread as the operations reading the map data, so operations such as map.get() or map.put() are not blocked.
You can also use entry processors to remove entries from your map simply by setting the value(s) of a single entry or multiple entries to null. See the following example code snippet:
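    import com.hazelcast.map.AbstractEntryProcessor;
    import java.util.Map;

    // Removes the entry by setting its value to null; AbstractEntryProcessor
    // applies the same logic to the backup entries.
    class EntryDeletingProcessor extends AbstractEntryProcessor<String, MyData> {
        @Override
        public Object process(Map.Entry<String, MyData> entry) {
            entry.setValue(null);
            return true;
        }
    }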
Related to the above, IMap's executeOnEntries() method accepts predicates, so you can also remove only the entries that match a predicate that you provide.
Entry processors run via operation threads that are dedicated to specific partitions. Therefore, with long running entry processor executions, other partition operations such as map.put(key, value) on some partitions can be blocked while partition operations on other partitions might run concurrently. With this in mind, it is a good practice to make your entry processor executions as quick as possible.
Respecting Locks on Single Keys
The entry processor respects locks ONLY when its executions are performed on a single key. As explained in the above section, IMap has the following methods to process a single key:
Object executeOnKey(K key, EntryProcessor entryProcessor);
ICompletableFuture submitToKey(K key, EntryProcessor entryProcessor);
Therefore, if you want to perform an entry processor execution on a single key using one of these methods and that key has a lock on it, the execution will wait until the lock on that key is removed.
Processing Backup Entries
If your code modifies the data, then you should also provide a processor for backup entries. This is required to prevent the primary map entries from having different values than the backups: the backup processor causes the same modification to be applied on both the primary and backup entries. The interface EntryBackupProcessor offers the method processBackup for this purpose.
It is possible that an entry processor could see that a key exists though its backup processor may not find it at the run time due to an unsent backup of a previous operation, e.g., a previous put operation. In those situations, Hazelcast internally/eventually synchronizes those owner and backup partitions so you do not lose any data. When coding an EntryBackupProcessor, you should take that case into account, otherwise NullPointerException can be seen since Map.Entry.getValue() may return null.
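A null-tolerant backup step might look like the sketch below. It is written against java.util.Map.Entry only, so it runs without Hazelcast: `NullSafeBackupLogic` and `incrementBackup` are hypothetical names, and the method body is what one could place inside processBackup. Skipping the update when the value is missing is an assumption that relies on the eventual owner/backup synchronization described above; your own logic may need a different fallback.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper holding logic that could live inside processBackup().
class NullSafeBackupLogic {

    static void incrementBackup(Map.Entry<Integer, Integer> entry) {
        Integer value = entry.getValue();
        if (value == null) {
            // The backup of an earlier operation has not arrived yet.
            // Assumption: skip here and rely on later owner/backup synchronization.
            return;
        }
        entry.setValue(value + 1);
    }

    public static void main(String[] args) {
        Map.Entry<Integer, Integer> present = new AbstractMap.SimpleEntry<>(1, 41);
        Map.Entry<Integer, Integer> missing = new AbstractMap.SimpleEntry<>(2, null);
        incrementBackup(present);
        incrementBackup(missing); // no NullPointerException
        System.out.println(present.getValue()); // prints 42
        System.out.println(missing.getValue()); // prints null
    }
}
```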
Creating an Entry Processor
The class IncrementingEntryProcessor creates an entry processor to process the map entries. It implements:
- the map interfaces EntryProcessor and EntryBackupProcessor
- the java.io.Serializable interface
- the EntryProcessor methods process and getBackupProcessor
- the EntryBackupProcessor method processBackup.
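    import com.hazelcast.map.EntryBackupProcessor;
    import com.hazelcast.map.EntryProcessor;
    import java.io.Serializable;
    import java.util.Map;

    public class IncrementingEntryProcessor
            implements EntryProcessor<Integer, Integer>, EntryBackupProcessor<Integer, Integer>, Serializable {

        // Runs on the primary entry: increments the value and returns the new value.
        @Override
        public Object process( Map.Entry<Integer, Integer> entry ) {
            Integer value = entry.getValue();
            entry.setValue( value + 1 );
            return value + 1;
        }

        // The same instance also acts as the backup processor.
        @Override
        public EntryBackupProcessor<Integer, Integer> getBackupProcessor() {
            return this;
        }

        // Runs on each backup entry so that backups stay in step with the primary.
        @Override
        public void processBackup( Map.Entry<Integer, Integer> entry ) {
            entry.setValue( entry.getValue() + 1 );
        }
    }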
An example usage is shown below:
    IMap<Integer, Integer> map = hazelcastInstance.getMap( "myMap" );
    for ( int i = 0; i < 100; i++ ) {
        map.put( i, i );
    }
    Map<Integer, Object> res = map.executeOnEntries( new IncrementingEntryProcessor() );
You should explicitly call the setValue method of Map.Entry when modifying data in the entry processor. Otherwise, the entry processor is treated as read-only.
An entry processor instance is not thread-safe, and the same instance can be used by multiple partition threads. If you store partition-specific state between invocations, keep it in a thread-local.
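One way to keep such state per thread is sketched below in plain Java. `PerThreadStateLogic` is a hypothetical class whose `process` method only mirrors the shape of EntryProcessor.process so that the example runs without Hazelcast. Two ordinary threads stand in for two partition threads that share one instance.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical processor-like class shared by several threads.
class PerThreadStateLogic {
    // Each calling thread lazily gets its own counter, so no synchronization is needed.
    private final ThreadLocal<int[]> calls = ThreadLocal.withInitial(() -> new int[1]);

    // Increments the entry value and returns how often the calling thread has used this instance.
    Object process(Map.Entry<Integer, Integer> entry) {
        entry.setValue(entry.getValue() + 1);
        return ++calls.get()[0];
    }

    // Runs `invocations` calls on each of two threads and returns the last count each thread saw.
    static int[] runOnTwoThreads(int invocations) {
        PerThreadStateLogic shared = new PerThreadStateLogic();
        int[] lastSeen = new int[2];
        Thread[] threads = new Thread[2];
        for (int t = 0; t < 2; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                // Each thread works on its own entry, as a partition thread owns its entries.
                Map.Entry<Integer, Integer> entry = new AbstractMap.SimpleEntry<>(id, 0);
                for (int i = 0; i < invocations; i++) {
                    lastSeen[id] = (Integer) shared.process(entry);
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lastSeen;
    }

    public static void main(String[] args) {
        int[] counts = runOnTwoThreads(500);
        System.out.println(counts[0] + " " + counts[1]); // prints 500 500
    }
}
```

With a plain instance field instead of the ThreadLocal, the two threads would update one shared counter without synchronization and increments could be lost.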
Abstract Entry Processor
You can use the AbstractEntryProcessor class when the same processing will be performed both on the primary and backup map entries, i.e., the same logic applies to them. If you implement EntryProcessor directly, you need to apply the same logic to the backup entries separately. The AbstractEntryProcessor class makes this primary/backup processing easier.
You can use the AbstractEntryProcessor class to create your own abstract entry processor. The method getBackupProcessor in this class returns an EntryBackupProcessor instance. This means the same processing applies to both the primary and backup entries. If you want to apply the processing only upon the primary entries, make the getBackupProcessor method return null.
Beware of the null issue described above. Due to a yet unsent backup from a previous operation, an EntryBackupProcessor may temporarily receive null from Map.Entry.getValue() even though the value actually exists in the map. If you decide to use AbstractEntryProcessor, make sure your code logic is not sensitive to null values, or you may encounter NullPointerException during runtime.
Entry Processor Performance Optimizations
By default the entry processor executes on a partition thread. A partition thread is responsible for handling one or more partitions. The design of the entry processor assumes that the user code in the process() method executes quickly.
In the pathological case where the code is very heavy and takes multiple milliseconds to execute, this may create a bottleneck.
We have a slow user code detector which can be used to log a warning controlled by the following system properties:
- hazelcast.slow.operation.detector.enabled (default: true)
- hazelcast.slow.operation.detector.threshold.millis (default: 10000)
The defaults catch extremely slow operations but you should set this much lower, say to 1ms, at development time to catch entry processors that could be problematic in production. These are good candidates for our optimizations.
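As a sketch of the development-time setting, the two properties named above can be set as JVM system properties before the Hazelcast instance is created. The class and method names here are hypothetical, and the 1 ms threshold is simply the value suggested in the text.

```java
// Hypothetical helper: call it before creating the Hazelcast instance.
class SlowOperationDetectorSettings {

    static void applyDevelopmentThreshold() {
        // Keep the detector on (this is already the default).
        System.setProperty("hazelcast.slow.operation.detector.enabled", "true");
        // Lower the threshold from the default 10000 ms to 1 ms for development.
        System.setProperty("hazelcast.slow.operation.detector.threshold.millis", "1");
    }

    public static void main(String[] args) {
        applyDevelopmentThreshold();
        System.out.println(System.getProperty("hazelcast.slow.operation.detector.threshold.millis")); // prints 1
    }
}
```

The same values can be passed on the command line with -D, which avoids changing code between development and production.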
We have two optimizations:
- Offloadable, which moves execution off the partition thread to an executor thread
- ReadOnly, which means we can avoid taking a lock on the key
These are enabled very simply by implementing these interfaces in your EntryProcessor.
As of Hazelcast IMDG 3.9, these optimizations apply to the following IMap methods only:
- executeOnKey(Object, EntryProcessor)
- submitToKey(Object, EntryProcessor)
- submitToKey(Object, EntryProcessor, ExecutionCallback)
Offloadable Entry Processor
If an entry processor implements the Offloadable interface, the process() method is executed in the executor specified by the Offloadable's getExecutorName() method.
Offloading unblocks the partition thread allowing the user to profit from much higher throughput. The key is locked for the time span of the processing in order to not generate a write conflict.
In this case the threading looks as follows:
- partition thread (fetch entry & lock key)
- execution thread (process(entry) method)
- partition thread (set new value & unlock key, or just unlock key if the entry has not been modified)
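The three-step hand-off above can be modelled in plain Java with two executors. This is an illustration and not Hazelcast's implementation: `OffloadFlowModel`, the thread names "partition" and "offload", and the use of CompletableFuture are all assumptions chosen to make the thread switches visible.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java model of the offloaded flow: partition thread, executor thread, partition thread.
class OffloadFlowModel {

    static String thread() {
        return Thread.currentThread().getName();
    }

    // Returns a trace of which thread ran which step.
    static List<String> run() {
        ExecutorService partition = Executors.newSingleThreadExecutor(r -> new Thread(r, "partition"));
        ExecutorService offload = Executors.newSingleThreadExecutor(r -> new Thread(r, "offload"));
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // Step 1: the partition thread fetches the entry and locks the key.
                .supplyAsync(() -> { trace.add(thread() + ": fetch entry & lock key"); return 41; }, partition)
                // Step 2: the heavy process(entry) logic runs on the executor thread.
                .thenApplyAsync(v -> { trace.add(thread() + ": process(entry)"); return v + 1; }, offload)
                // Step 3: back on the partition thread to store the result and unlock.
                .thenAcceptAsync(v -> trace.add(thread() + ": set new value & unlock key"), partition)
                .join();
        } finally {
            partition.shutdown();
            offload.shutdown();
        }
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

While step 2 runs, the model's partition thread is idle and could serve other tasks, which is the source of the throughput gain described above.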
The getExecutorName() method may also return two constants defined in the Offloadable interface:
- NO_OFFLOADING: Processing is not offloaded if the method getExecutorName() returns this constant; it is executed as if it does not implement the Offloadable interface.
- OFFLOADABLE_EXECUTOR: Processing is offloaded to the default ExecutionService.OFFLOADABLE_EXECUTOR.
Note that if no configured executor has a name matching the one returned by getExecutorName(), the default executor service is used. Here is the configuration for the "default" executor:
    <hazelcast>
        ...
        <executor-service name="default">
            <pool-size>16</pool-size>
            <queue-capacity>0</queue-capacity>
        </executor-service>
        ...
    </hazelcast>

    hazelcast:
      ...
      executor-service:
        default:
          pool-size: 16
          queue-capacity: 0
An example configuration for an executor named "OffloadedInventoryEntryProcessor", the name an Offloadable would return from getExecutorName(), would be as follows:
    <hazelcast>
        ...
        <executor-service name="OffloadedInventoryEntryProcessor">
            <pool-size>30</pool-size>
            <queue-capacity>0</queue-capacity>
        </executor-service>
        ...
    </hazelcast>

    hazelcast:
      ...
      executor-service:
        OffloadedInventoryEntryProcessor:
          pool-size: 30
          queue-capacity: 0
Remember to set the pool-size (count of executor threads per member) according to your execution needs. See the Configuring Executor Service section for the configuration details.
ReadOnly Entry Processor
By default, an entry processor does not run if the key is locked. It waits until the key has been unlocked (this applies to the executeOnKey and submitToKey methods mentioned before).
If the entry processor implements the ReadOnly interface without implementing the Offloadable interface, the processing is not offloaded to an external executor. However, the entry processor does not check whether the key of the processed entry is locked and does not try to acquire the lock, since it will not make any modifications.
If the entry processor implements ReadOnly and modifies the entry, an UnsupportedOperationException is thrown.
ReadOnly and Offloadable Entry Processor
If the entry processor implements both ReadOnly and Offloadable interfaces, we observe the combination of both optimizations described above.
The process() method is executed in the executor specified by the Offloadable's getExecutorName() method.
Also, the entry processor does not check whether the key of the processed entry is locked and does not try to acquire the lock, since it will not make any modifications.
In this case the threading looks as follows:
- partition thread (fetch entry)
- execution thread (process(entry))
In this case the EntryProcessor.getBackupProcessor() method has to return null; otherwise an IllegalArgumentException is thrown.
If the entry processor implements ReadOnly and modifies the entry, an UnsupportedOperationException is thrown.
Putting it all together:
    import com.hazelcast.core.Offloadable;
    import com.hazelcast.core.ReadOnly;
    import com.hazelcast.map.EntryBackupProcessor;
    import com.hazelcast.map.EntryProcessor;
    import java.util.Map;

    public class OffloadableReadOnlyEntryProcessor implements EntryProcessor<String, Employee>,
            Offloadable, ReadOnly {

        @Override
        public Object process(Map.Entry<String, Employee> entry) {
            // heavy logic
            return null;
        }

        @Override
        public EntryBackupProcessor<String, Employee> getBackupProcessor() {
            // ReadOnly EntryProcessor has to return null, since it's just a read-only operation that will not be
            // executed on the backup
            return null;
        }

        @Override
        public String getExecutorName() {
            return OFFLOADABLE_EXECUTOR;
        }
    }