Entry Processor

An entry processor is a function that executes your code on a map entry in an atomic way.

An entry processor is a good option if you perform bulk processing on an IMap. Typically, such processing loops over the keys: it calls IMap.get(key), mutates the value, and finally puts the entry back in the map using IMap.put(key, value). If you perform this process from a client or from a member that does not own the keys, you effectively perform two network hops for each update: the first to retrieve the data and the second to write the mutated value.

If you are doing the process described above, you should consider using entry processors. An entry processor reads and updates the entry on the member where the data resides. This eliminates the costly network hops described above.

The entry processor is meant to process a single entry per call. Processing multiple entries and data structures in an entry processor is not supported as it may result in deadlocks. To process multiple entries, use a data pipeline with the entry processor sink.

Performing Fast In-Memory Map Operations

An entry processor enables fast in-memory operations on your map without you having to worry about locks or concurrency issues. You can apply it to a single map entry or to all map entries. Entry processors support choosing target entries using predicates. You do not need to lock entries explicitly thanks to the isolated threading model: Hazelcast runs the entry processor for all entries on a partition thread, so it does not interleave with other mutations.

Hazelcast sends the entry processor to each cluster member and these members apply it to map entries. Therefore, if you add more members, your processing completes faster.

Using Indexes

Entry processors can be used with predicates. Predicates help to process a subset of data by selecting eligible entries. This selection can happen either by scanning all entries of the map or by using indexes. To accelerate the entry selection step, consider adding indexes. If indexes exist, the entry processor uses them automatically.

Using OBJECT In-Memory Format

If entry processing is the major operation for a map and if the map consists of complex objects, you should use OBJECT as the in-memory-format to minimize serialization cost. By default, the entry value is stored as a byte array (BINARY format). When it is stored as an object (OBJECT format), then the entry processor is applied directly on the object. In that case, no serialization or deserialization is performed. However, if there is a defined event listener, a new entry value will be serialized when passing to the event publisher service.

When in-memory-format is OBJECT, the old value of the updated entry will be null.

Processing Entries

The IMap interface provides the following methods for entry processing:

  • executeOnKey processes an entry mapped by a key, blocking until the processing is complete and the result is returned.

  • executeOnKeys processes entries mapped by a collection of keys, blocking until the processing is complete and the results are returned.

  • submitToKey processes an entry mapped by a key and provides a way to register a callback to receive notifications about the result of the entry processing.

  • executeOnEntries processes all entries in a map, blocking until the processing is complete and the results are returned.

  • executeOnEntries also processes all entries in a map matching the provided predicate, blocking until the processing is complete and the results are returned.

When using the executeOnEntries method, if the number of entries is high and you do not need the results, then returning null with the process() method is a good practice. This method is offered by the EntryProcessor interface. By returning null, results of the processing are not collected and thus out of memory errors are eliminated.
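As a minimal sketch of this practice, the processor below updates an entry and returns null, so there is no per-entry result to collect. To keep the sketch compilable without Hazelcast on the classpath, it declares a local stand-in interface with the same process() signature as EntryProcessor; in a real application you would implement com.hazelcast.map.EntryProcessor instead. The class name and the upper-casing logic are illustrative assumptions.

```java
import java.util.Locale;
import java.util.Map;

// Local stand-in for com.hazelcast.map.EntryProcessor (an assumption made so
// that the sketch compiles on its own); use the Hazelcast interface in real code.
interface EntryProcessor<K, V, R> {
    R process(Map.Entry<K, V> entry);
}

// Hypothetical processor: normalizes the value and produces no result.
class UpperCasingEntryProcessor implements EntryProcessor<String, String, Void> {
    @Override
    public Void process(Map.Entry<String, String> entry) {
        String value = entry.getValue();
        if (value != null) {
            // setValue() is called explicitly so that the change is stored.
            entry.setValue(value.toUpperCase(Locale.ROOT));
        }
        // Returning null: nothing needs to be collected for this entry.
        return null;
    }
}
```

With the real interface, an instance of such a processor would be passed to executeOnEntries; the returned map can then be ignored.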

If you do not need to read or modify the entry in any way but would like to execute a task on the member owning the entry with that key (i.e. the member is the partition owner for that key), you can also use executeOnKeyOwner provided by IExecutorService. You need to make sure that the runnable can be serialized (using any of the available serialization techniques in Hazelcast). The runnable will not receive the map entry key or value and is not running on the same thread as operations reading the map data so operations such as map.get() or map.put() will not be blocked.

You can also use entry processors to remove entries from your map simply by setting the value(s) of a single entry or multiple entries to null. See the following example code snippet:

class EntryDeletingProcessor implements EntryProcessor<String, MyData, Boolean> {

    public Boolean process(Map.Entry<String, MyData> entry) {
        entry.setValue(null);
        return true;
    }
}

Related to the above, because IMap's executeOnEntries() method accepts predicates, you can also remove only the entries that match a predicate that you provide.

Entry processors run via operation threads that are dedicated to specific partitions. Therefore, with long running entry processor executions, other partition operations such as map.put(key) on some partitions can be blocked while partition operations on other partitions might run concurrently. With this in mind, it is a good practice to make your entry processor executions as quick as possible.

Respecting Locks on Single Keys

The entry processor respects locks ONLY when it is executed on a single key. As explained in the section above, IMap provides the following methods to process a single key:

<R> R executeOnKey(K key, EntryProcessor<K, V, R> entryProcessor);
<R> CompletionStage<R> submitToKey(K key, EntryProcessor<K, V, R> entryProcessor);

Therefore, if you execute an entry processor on a single key using one of these methods and that key is locked, the execution waits until the lock on that key is released.

Processing Backup Entries

If your code modifies the data, then you will most likely need to modify backup entries as well. This should be done to prevent divergence of map values between copies of data in the cluster (the primary and backup replicas). In most cases, this is simple. By implementing the EntryProcessor interface and providing only the process() method, the same entry processor will be applied on all copies of the map entry.

If, however, you would like to run a custom processor on backup entries, you may provide the processor by overriding the EntryProcessor#getBackupProcessor method. The method should return an instance of an EntryProcessor which will be run on backup entries exclusively. As such, it may carry some state that was derived from running the entry processor on primary replicas.

You may also return null from the EntryProcessor#getBackupProcessor method. This signifies that there is nothing to be done on the backup replicas which is most convenient when you are using the entry processor to read and not modify entries.

It is possible that an entry processor sees that a key exists while its backup processor does not find it, because the backup of a previous operation (for example, a previous put operation) has not been sent yet. In those situations, Hazelcast eventually synchronizes the owner and backup partitions internally, so you do not lose any data. When coding a backup entry processor, you should take that case into account; otherwise, a NullPointerException can be thrown since Map.Entry.getValue() may return null.
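The following sketch shows one way a backup processor could guard against a missing value. It is only an illustration: the class name is hypothetical, skipping the entry is an assumed policy, and a local stand-in interface with the same process() signature replaces com.hazelcast.map.EntryProcessor so that the code compiles on its own.

```java
import java.util.Map;

// Local stand-in mirroring the process() signature of
// com.hazelcast.map.EntryProcessor, declared so the sketch compiles alone.
interface EntryProcessor<K, V, R> {
    R process(Map.Entry<K, V> entry);
}

// Hypothetical backup processor that tolerates an entry without a value.
class NullSafeIncrementBackupProcessor
        implements EntryProcessor<Integer, Integer, Integer> {
    @Override
    public Integer process(Map.Entry<Integer, Integer> entry) {
        Integer value = entry.getValue();
        if (value == null) {
            // The backup replica has not received this entry yet: skip it
            // instead of failing with a NullPointerException.
            return null;
        }
        entry.setValue(value + 1);
        return value + 1;
    }
}
```

A primary processor could return an instance of such a class from getBackupProcessor().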

Creating an Entry Processor

The class IncrementingEntryProcessor creates an entry processor to process the map entries. It implements the EntryProcessor interface. The process() method will be called for both primary and backup entries.

public class IncrementingEntryProcessor implements EntryProcessor<Integer, Integer, Integer> {
    public Integer process( Map.Entry<Integer, Integer> entry ) {
        Integer value = entry.getValue();
        entry.setValue( value + 1 );
        return value + 1;
    }

    @Override
    public EntryProcessor<Integer, Integer, Integer> getBackupProcessor() {
        return IncrementingEntryProcessor.this;
    }
}

An example usage is shown below:

IMap<Integer, Integer> map = hazelcastInstance.getMap( "myMap" );
for ( int i = 0; i < 100; i++ ) {
    map.put( i, i );
}
Map<Integer, Integer> res = map.executeOnEntries( new IncrementingEntryProcessor() );

You should explicitly call the setValue method of Map.Entry when modifying data in the entry processor. Otherwise, the entry processor is treated as read-only.

An entry processor instance is not thread-safe because it can be used by multiple partition threads. If you store partition-specific state between invocations, keep it in a thread-local.
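As a hedged illustration of the thread-local advice, the sketch below keeps a per-thread counter so that threads sharing one processor instance never touch the same mutable state. The class name and the counting logic are hypothetical, and a local stand-in interface replaces com.hazelcast.map.EntryProcessor so that the code compiles without Hazelcast. The ThreadLocal is a static field, on the assumption that the processor itself is serialized and should not carry it as instance state.

```java
import java.util.Map;

// Local stand-in for com.hazelcast.map.EntryProcessor (assumption for a
// self-contained sketch).
interface EntryProcessor<K, V, R> {
    R process(Map.Entry<K, V> entry);
}

// Hypothetical read-only processor that counts the entries seen per thread.
class CountingEntryProcessor implements EntryProcessor<String, Integer, Integer> {
    // One counter per calling thread; static so it is not part of the
    // processor's serialized state.
    private static final ThreadLocal<int[]> PROCESSED_BY_THIS_THREAD =
            ThreadLocal.withInitial(() -> new int[1]);

    @Override
    public Integer process(Map.Entry<String, Integer> entry) {
        int[] counter = PROCESSED_BY_THIS_THREAD.get();
        counter[0]++;
        // Returns how many entries the current thread has processed so far.
        return counter[0];
    }
}
```

Each thread observes only its own count, regardless of how many other threads call process() on the same instance.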

Entry Processor Performance Optimizations

By default the entry processor executes on a partition thread. A partition thread is responsible for handling one or more partitions. The design of entry processor assumes users have fast user code execution of the process() method. In the pathological case where the code is very heavy and executes in multi-milliseconds, this may create a bottleneck.

Hazelcast has a slow operation detector that logs a warning when user code runs too long. It is controlled by the following system properties:

  • hazelcast.slow.operation.detector.enabled (default: true)

  • hazelcast.slow.operation.detector.threshold.millis (default: 10000)

The defaults catch extremely slow operations but you should set this much lower, say to 1ms, at development time to catch entry processors that could be problematic in production. These are good candidates for our optimizations.
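One possible way to apply such development-time values is to set them as JVM system properties before the member starts, as sketched below. The helper class and its method name are hypothetical; the property names are the ones listed above, and the value of 1 ms is the example threshold mentioned in the text, not a recommendation for production.

```java
// Hypothetical helper holding development-time detector settings.
class SlowOperationDetectorSettings {
    static final String ENABLED =
            "hazelcast.slow.operation.detector.enabled";
    static final String THRESHOLD_MILLIS =
            "hazelcast.slow.operation.detector.threshold.millis";

    // Call this before the Hazelcast instance is created, assuming the
    // properties are read when the member starts.
    static void applyDevelopmentDefaults() {
        System.setProperty(ENABLED, "true");
        System.setProperty(THRESHOLD_MILLIS, "1");
    }
}
```

The same values can be passed on the command line with -D options instead of being set in code.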

We have two optimizations:

  • Offloadable which moves execution off the partition thread to an executor thread

  • ReadOnly which means we can avoid taking a lock on the key

These are enabled very simply by implementing these interfaces in your EntryProcessor.

These optimizations apply to the following IMap methods only:

  • executeOnKey(Object, EntryProcessor)

  • submitToKey(Object, EntryProcessor)

  • submitToKey(Object, EntryProcessor, ExecutionCallback)

Offloadable Entry Processor

If an entry processor implements the Offloadable interface, the process() method is executed in the executor specified by the Offloadable's getExecutorName() method.

Offloading unblocks the partition thread allowing the user to profit from much higher throughput. The key is locked for the time span of the processing in order to not generate a write conflict.

In this case the threading looks as follows:

  1. partition thread (fetch entry & lock key)

  2. execution thread (process(entry) method)

  3. partition thread (set new value & unlock key, or just unlock key if the entry has not been modified)
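The three-step handoff above can be modeled with plain Java executors. The sketch below is not Hazelcast code and does not use its API: a single-threaded executor stands in for the partition thread, a small pool stands in for the executor named by getExecutorName(), and key locking is not modeled at all. All class and method names are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.UnaryOperator;

// Stdlib-only model of the offloaded execution flow; not Hazelcast code.
class OffloadModel {
    private static ThreadFactory daemon(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // do not keep the JVM alive
            return thread;
        };
    }

    // Stands in for the partition thread that owns the data.
    private final ExecutorService partitionThread =
            Executors.newSingleThreadExecutor(daemon("partition-thread"));
    // Stands in for the executor that runs the offloaded process() call.
    private final ExecutorService offloadExecutor =
            Executors.newFixedThreadPool(4, daemon("offload-executor"));
    private final Map<String, Integer> store = new ConcurrentHashMap<>();

    void put(String key, int value) { store.put(key, value); }

    Integer get(String key) { return store.get(key); }

    // Assumes the key exists and that process returns a non-null value.
    CompletableFuture<Integer> execute(String key, UnaryOperator<Integer> process) {
        return CompletableFuture
                // 1. partition thread: fetch the entry (a real member also locks the key)
                .supplyAsync(() -> store.get(key), partitionThread)
                // 2. executor thread: run the potentially slow user code
                .thenApplyAsync(process, offloadExecutor)
                // 3. partition thread: store the new value (a real member also unlocks the key)
                .thenApplyAsync(newValue -> {
                    store.put(key, newValue);
                    return newValue;
                }, partitionThread);
    }

    void shutdown() {
        partitionThread.shutdown();
        offloadExecutor.shutdown();
    }
}
```

Because step 2 runs on the pool, the single "partition" thread stays free to serve other requests while the slow code executes, which is the throughput benefit described above.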

The getExecutorName() method may also return one of two constants defined in the Offloadable interface:

  • NO_OFFLOADING: Processing is not offloaded if the method getExecutorName() returns this constant; it is executed as if it does not implement the Offloadable interface.

  • OFFLOADABLE_EXECUTOR: Processing is offloaded to the default ExecutionService.OFFLOADABLE_EXECUTOR.

Note that if no executor is configured with the name returned by getExecutorName(), the default executor service is used. Here is the configuration for the "default" executor:

  • XML

<hazelcast>
    ...
    <executor-service name="default">
        <pool-size>16</pool-size>
        <queue-capacity>0</queue-capacity>
    </executor-service>
    ...
</hazelcast>

  • YAML

hazelcast:
  ...
  executor-service:
    default:
      pool-size: 16
      queue-capacity: 0
An example configuration of an executor named "OffloadedInventoryEntryProcessor", to which an Offloadable entry processor can be offloaded, is as follows:

  • XML

<hazelcast>
    ...
    <executor-service name="OffloadedInventoryEntryProcessor">
        <pool-size>30</pool-size>
        <queue-capacity>0</queue-capacity>
    </executor-service>
    ...
</hazelcast>

  • YAML

hazelcast:
  ...
  executor-service:
    OffloadedInventoryEntryProcessor:
      pool-size: 30
      queue-capacity: 0

Remember to set the pool-size (count of executor threads per member) according to your execution needs. See the Configuring Executor Service section for the configuration details.

ReadOnly Entry Processor

By default, an entry processor does not run if the key is locked. It waits until the key has been unlocked. This applies to the executeOnKey and submitToKey methods mentioned before.

If the entry processor implements the ReadOnly interface without implementing the Offloadable interface, the processing is not offloaded to an external executor. However, the entry processor does not observe if the key of the processed entry is locked, nor tries to acquire the lock since the entry processor will not do any modifications.

If the entry processor implements ReadOnly and modifies the entry, an UnsupportedOperationException is thrown.

ReadOnly and Offloadable Entry Processor

If the entry processor implements both ReadOnly and Offloadable interfaces, we observe the combination of both optimizations described above.

The process() method is executed in the executor specified by the Offloadable's getExecutorName() method. Also, the entry processor does not observe if the key of the processed entry is locked, nor tries to acquire the lock since the entry processor will not do any modifications.

In this case the threading looks as follows:

  1. partition thread (fetch entry)

  2. execution thread (process(entry))

In this case, EntryProcessor.getBackupProcessor() has to return null; otherwise, an IllegalArgumentException is thrown.

If the entry processor implements ReadOnly and modifies the entry, an UnsupportedOperationException is thrown.

Putting it all together:

public class OffloadableReadOnlyEntryProcessor implements EntryProcessor<String, Employee, Object>,
        Offloadable, ReadOnly {

    @Override
    public Object process(Map.Entry<String, Employee> entry) {
        // heavy logic
        return null;
    }

    @Override
    public EntryProcessor<String, Employee, Object> getBackupProcessor() {
        // ReadOnly EntryProcessor has to return null, since it's just a read-only operation that will not be
        // executed on the backup
        return null;
    }

    @Override
    public String getExecutorName() {
        return OFFLOADABLE_EXECUTOR;
    }
}