Java Executor Service

One of the coolest features of Java is the Executor framework, which allows you to asynchronously execute your tasks (logical units of work), such as database queries, complex calculations and image rendering.

The default implementation of this framework (ThreadPoolExecutor) is designed to run within a single JVM (cluster member). In distributed systems, this is often not enough, since you may want a task submitted in one JVM to be processed in another. Hazelcast offers IExecutorService for use in distributed environments. It implements java.util.concurrent.ExecutorService to serve applications requiring computational and data processing power.

If you want to process batch or real-time streaming data, you may want to use a data pipeline.

With IExecutorService, you can execute tasks asynchronously and do other useful work in the meantime. If a task takes longer than expected, you can cancel its execution. Tasks should be Serializable since they are distributed across members.

In the Java Executor framework, you can implement tasks in two ways: Callable or Runnable.

  • Callable: If you need to return a value and submit it to Executor, implement the task as java.util.concurrent.Callable.

  • Runnable: If you do not need to return a value, implement the task as java.lang.Runnable.
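The difference between the two task types can be sketched with a plain JDK executor. This is a local, single-JVM illustration only: the class name CallableVsRunnable is hypothetical, and lambdas are used for brevity even though they are not Serializable, so tasks sent to a Hazelcast cluster would be written as Serializable classes instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class CallableVsRunnable {

    // Runs one Callable and one Runnable on a local single-thread pool and
    // returns {value from the Callable, number of times the Runnable ran}.
    static int[] runBoth() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger sideEffect = new AtomicInteger();
        try {
            // Callable: its return value travels back through the Future.
            Callable<Integer> sum = () -> 2 + 3;
            Future<Integer> sumFuture = executor.submit(sum);

            // Runnable: no return value; the Future only signals completion.
            Runnable increment = sideEffect::incrementAndGet;
            Future<?> done = executor.submit(increment);

            int value = sumFuture.get();
            done.get(); // returns null once the Runnable has finished
            return new int[] { value, sideEffect.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = runBoth();
        System.out.println("Callable returned " + result[0]
                + ", Runnable ran " + result[1] + " time(s)");
    }
}
```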

Note that the distributed executor service (IExecutorService) is intended to run processing where the data is hosted: on the server members. In general, you cannot run a Java Runnable or Callable on the clients, as the clients may not be Java applications. Also, the clients do not host any data, so they would have to fetch the data they need from the servers. If you want something to run on all or some of the clients connected to your cluster, you can implement this using the publish/subscribe mechanism: send a payload with the necessary execution parameters to an ITopic, and have the listening clients act on the message.

Implementing a Callable Task

In Hazelcast, when you implement a task as java.util.concurrent.Callable (a task that returns a value), you implement Callable and Serializable.

Below is an example of a Callable task. SumTask prints the map keys owned by the local member and returns the sum of their values.

public class SumTask
        implements Callable<Integer>, Serializable, HazelcastInstanceAware {

    private transient HazelcastInstance hazelcastInstance;

    public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
        this.hazelcastInstance = hazelcastInstance;
    }

    public Integer call() throws Exception {
        IMap<String, Integer> map = hazelcastInstance.getMap( "map" );
        int result = 0;
        for ( String key : map.localKeySet() ) {
            System.out.println( "Calculating for key: " + key );
            result += map.get( key );
        }
        System.out.println( "Local Result: " + result );
        return result;
    }
}
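Because SumTask reads only localKeySet(), each execution yields a partial sum for one member, and a cluster-wide total means adding those partial results together. The sketch below illustrates that idea in a single JVM with the JDK executor. The class PartialSums and the list of per-member maps are hypothetical stand-ins for cluster members and their locally owned entries, not Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartialSums {

    // Sums the values of one simulated member's local entries
    // (a stand-in for what SumTask.call() does on that member).
    static Callable<Integer> localSum(Map<String, Integer> localEntries) {
        return () -> {
            int result = 0;
            for (int value : localEntries.values()) {
                result += value;
            }
            return result;
        };
    }

    // Runs one local sum per simulated member and adds the partial results.
    static int total(List<Map<String, Integer>> entriesPerMember) {
        ExecutorService executor =
                Executors.newFixedThreadPool(Math.max(1, entriesPerMember.size()));
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Map<String, Integer> entries : entriesPerMember) {
                tasks.add(localSum(entries));
            }
            int total = 0;
            for (Future<Integer> partial : executor.invokeAll(tasks)) {
                total += partial.get();
            }
            return total;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

In a real cluster, the partial results would come from the futures returned by a method such as submitToAllMembers rather than from a local pool.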

Another example is the Echo callable below. Its call() method returns the local member and the input passed in. hazelcastInstance.getCluster().getLocalMember() returns the local member, and toString() returns the member’s address (IP + port) as a String, which shows which member actually executed the code. Of course, the call() method can do and return anything you like.

public class Echo implements Callable<String>, Serializable, HazelcastInstanceAware {
    String input = null;

    private transient HazelcastInstance hazelcastInstance;

    public Echo() {
    }

    public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
        this.hazelcastInstance = hazelcastInstance;
    }

    public Echo(String input) {
        this.input = input;
    }

    public String call() {
        return hazelcastInstance.getCluster().getLocalMember().toString() + ":" + input;
    }
}

Executing a Callable Task

To execute a callable task:

  • retrieve the Executor from HazelcastInstance

  • submit a task which returns a Future

  • after submitting the task, you do not have to wait for the execution to complete; you can process other things

  • when ready, use the Future object to retrieve the result as shown in the code example below.

Below, the Echo task is executed.

public class MasterMember {

    public static void main( String[] args ) throws Exception {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IExecutorService executorService = instance.getExecutorService( "executorService" );
        Future<String> future = executorService.submit( new Echo( "myinput") );
        //while it is executing, do some useful stuff
        //when ready, get the result of your execution
        String result = future.get();
    }
}

Please note that the Echo callable in the above example also implements the Serializable interface, since it may be sent to another member to be processed.

When a task is deserialized on the executing member, it needs access to the HazelcastInstance. To get it, the task should implement the HazelcastInstanceAware interface. See the HazelcastInstanceAware Interface section for more information.
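The reason the instance has to be supplied after deserialization can be seen in a small sketch, assuming plain Java serialization. The classes TransientFieldDemo and Task are hypothetical; the transient context field plays the role of the transient hazelcastInstance field in Echo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {

    // Hypothetical task: 'input' is serialized, 'context' is transient,
    // like the hazelcastInstance field in Echo.
    static class Task implements Serializable {
        private static final long serialVersionUID = 1L;
        final String input;
        transient Object context;

        Task(String input, Object context) {
            this.input = input;
            this.context = context;
        }
    }

    // Serializes the task to bytes and reads it back, which is roughly what
    // happens when a task travels from one JVM to another.
    static Task roundTrip(Task task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Task) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Task copy = roundTrip(new Task("myinput", new Object()));
        System.out.println(copy.input);           // the serialized field survives
        System.out.println(copy.context == null); // the transient field does not
    }
}
```

The copy keeps its input but arrives without its context, so something on the receiving side has to set that field again before the task runs.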

Implementing a Runnable Task

In Hazelcast, when you implement a task as java.lang.Runnable (a task that does not return a value), you implement Runnable and Serializable.

Below is an example Runnable. It is a task that waits for some time and echoes a message.

public class EchoTask implements Runnable, Serializable {

    private final String msg;

    public EchoTask( String msg ) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            Thread.sleep( 5000 );
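        } catch ( InterruptedException e ) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }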
        System.out.println( "echo:" + msg );
    }
}

Executing a Runnable Task

To execute the runnable task:

  • retrieve the Executor from HazelcastInstance

  • submit the tasks to the Executor.

Now let’s write a class that submits and executes these echo messages. Executor is retrieved from HazelcastInstance and 1000 echo tasks are submitted.

public class RunnableMasterMember {

    public static void main( String[] args ) throws Exception {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hazelcastInstance.getExecutorService( "exec" );
        for ( int k = 1; k <= 1000; k++ ) {
            Thread.sleep( 1000 );
            System.out.println( "Producing echo task: " + k );
            executor.execute( new EchoTask( String.valueOf( k ) ) );
        }
        System.out.println( "RunnableMasterMember finished!" );
    }
}

Scaling The Executor Service

You can scale the Executor service both vertically (scale up) and horizontally (scale out).

To scale up, improve the processing capacity of the cluster member (JVM). You can do this by increasing the pool-size property described in Configuring Executor Service (that is, increasing the thread count). However, be aware of your member’s capacity: if it cannot handle the additional load caused by the higher thread count, consider improving the member’s resources (CPU, memory, etc.). As an example, set pool-size to 5 and run RunnableMasterMember above. You will see that each EchoTask runs as soon as it is produced.
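The effect of the thread count can be sketched locally with a JDK fixed-size pool. This is not Hazelcast code: PoolSizeDemo and peakConcurrency are hypothetical names, and the pool size argument plays the role that pool-size plays for a member.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizeDemo {

    // Submits 'tasks' short sleeping tasks to a pool of 'poolSize' threads and
    // returns the highest number of tasks observed running at the same time.
    static int peakConcurrency(int poolSize, int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(200); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task to finish
            }
            return peak.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

With a pool of one thread the tasks run strictly one after another, so the peak is 1. A larger pool lets several tasks overlap, which is why queued echo tasks stop piling up once the pool is large enough.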

To scale out, add more members instead of increasing only one member’s capacity. In practice, you may want to expand your cluster by adding more physical or virtual machines. For example, for the EchoTask example in the Runnable section, you can start another Hazelcast instance. That instance automatically joins the executions started in RunnableMasterMember and starts processing tasks.

Executing Code in the Cluster

The distributed executor service is a distributed implementation of java.util.concurrent.ExecutorService. It allows you to execute your code in the cluster. In this section, the code examples are based on the Echo class above (please note that the Echo class is Serializable). The code examples show how Hazelcast can execute your code (Runnable, Callable):

  • echoOnTheMember: On a specific cluster member you choose with the IExecutorService submitToMember method.

  • echoOnTheMemberOwningTheKey: On the member owning the key you choose with the IExecutorService submitToKeyOwner method.

  • echoOnSomewhere: On the member Hazelcast picks with the IExecutorService submit method.

  • echoOnMembers: On all or a subset of the cluster members with the IExecutorService submitToMembers method.

public void echoOnTheMember( String input, Member member ) throws Exception {
    Callable<String> task = new Echo( input );
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    IExecutorService executorService =
      hazelcastInstance.getExecutorService( "default" );

    Future<String> future = executorService.submitToMember( task, member );
    String echoResult = future.get();
}

public void echoOnTheMemberOwningTheKey( String input, Object key ) throws Exception {
    Callable<String> task = new Echo( input );
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    IExecutorService executorService =
      hazelcastInstance.getExecutorService( "default" );

    Future<String> future = executorService.submitToKeyOwner( task, key );
    String echoResult = future.get();
}

public void echoOnSomewhere( String input ) throws Exception {
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    IExecutorService executorService =
      hazelcastInstance.getExecutorService( "default" );

    Future<String> future = executorService.submit( new Echo( input ) );
    String echoResult = future.get();
}

public void echoOnMembers( String input, Set<Member> members ) throws Exception {
    HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
    IExecutorService executorService =
      hazelcastInstance.getExecutorService( "default" );

    Map<Member, Future<String>> futures = executorService
      .submitToMembers( new Echo( input ), members );

    for ( Future<String> future : futures.values() ) {
        String echoResult = future.get();
        // ...
    }
}

You can obtain the set of cluster members by calling HazelcastInstance.getCluster().getMembers().

Canceling an Executing Task

A task that you execute in a cluster might take longer than expected. If you cannot stop or cancel that task, it keeps consuming your resources.

To cancel a task, you can use the standard Java executor framework’s cancel() API. This framework encourages us to code and design for cancellations, an often neglected part of software development.

Example Task to Cancel

The Fibonacci callable class below calculates the Fibonacci number for a given number. In the calculate method, we check if the current thread is interrupted so that the code can respond to cancellations once the execution is started.

public class FibonacciCallable implements Callable<Long>, Serializable {

    int input = 0;

    public FibonacciCallable( int input ) {
        this.input = input;
    }

    public Long call() {
        return calculate( input );
    }

    private long calculate( int n ) {
        // Stop early if the task was cancelled with future.cancel( true )
        if ( Thread.currentThread().isInterrupted() ) {
            return 0;
        }
        if ( n <= 1 ) {
            return n;
        } else {
            return calculate( n - 1 ) + calculate( n - 2 );
        }
    }
}

Example Method to Execute and Cancel the Task

The fib() method below submits the Fibonacci calculation task above for number 'n' and waits a maximum of 3 seconds for the result. If the execution does not complete in three seconds, the future.get() method throws a TimeoutException and upon catching it, we cancel the execution, saving some CPU cycles.

    long fib( int n ) throws Exception {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IExecutorService es = hazelcastInstance.getExecutorService("es");
        Future<Long> future = es.submit( new FibonacciCallable( n ) );
        try {
            // Wait at most 3 seconds for the result
            long result = future.get( 3, TimeUnit.SECONDS );
            System.out.println(result);
            return result;
        } catch ( TimeoutException e ) {
            // Took too long: interrupt the running task
            future.cancel( true );
        }
        // Reached only when the task timed out and was cancelled
        return -1;
    }

fib(20) probably takes less than 3 seconds. However, fib(50) takes much longer. (This is not an example of how to write better Fibonacci calculation code, but of how to cancel a running execution that takes too long.) The method future.cancel(false) can only cancel an execution before it starts running, whereas future.cancel(true) can interrupt a running execution, provided that your code is able to handle the interruption. If you want to cancel an already running task, your task must be designed to handle interruptions. If the calculate(int n) method did not have the Thread.currentThread().isInterrupted() check, you would not be able to cancel the execution after it has started.
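The cooperative nature of cancel(true) can be sketched with a local JDK executor. CancelDemo is a hypothetical class, and the sketch shows only the standard Future behavior in a single JVM; it makes no claim about how a cancellation is delivered across members.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {

    // Starts a task that loops until interrupted, cancels it with cancel(true),
    // and reports whether the task observed the interruption and stopped.
    static boolean cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                // Cooperative check, like the one in calculate(int n).
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.yield();
                }
                stopped.countDown();
            });
            started.await();     // make sure the task is already running
            future.cancel(true); // interrupts the worker thread
            return future.isCancelled() && stopped.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Without the isInterrupted() check in the loop, the task would keep running after cancel(true) even though the Future already reports it as cancelled.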

Callback When Task Completes

You can use the ExecutionCallback offered by Hazelcast to be notified asynchronously when the execution is done. To be notified when your task completes without an error, implement the onResponse method. To be notified when your task completes with an error, implement the onFailure method.

Example Task to Callback

Let’s use the Fibonacci series to explain this. The example code below is the calculation that is executed. Note that it is Callable and Serializable.

public class Fibonacci2 implements Callable<Long>, Serializable {

    private final int input;

    public Fibonacci2(int input) {
        this.input = input;
    }

    public Long call() {
        return calculate(input);
    }

    private long calculate(int n) {
        if (Thread.currentThread().isInterrupted()) {
            System.out.println("FibonacciCallable is interrupted");
            throw new RuntimeException("FibonacciCallable is interrupted");
        }
        if (n <= 1) {
            return n;
        } else {
            return calculate(n - 1) + calculate(n - 2);
        }
    }
}

Example Method to Callback the Task

The example code below submits the Fibonacci calculation together with an ExecutionCallback and prints the result asynchronously. ExecutionCallback has the methods onResponse and onFailure. In this example, onResponse is called upon a valid response and prints the calculation result, whereas onFailure is called upon a failure and prints the stack trace.

public class MasterMemberCallback {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hz.getExecutorService("executor");

        ExecutionCallback<Long> executionCallback = new ExecutionCallback<Long>() {
            public void onFailure(Throwable t) {
                t.printStackTrace();
            }

            public void onResponse(Long response) {
                System.out.println("Result: " + response);
            }
        };

        executor.submit(new Fibonacci2(10), executionCallback);
        System.out.println("Fibonacci task submitted");
    }
}
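To see both callback paths without a cluster, the pattern can be sketched with the JDK alone. The Callback interface and the submit helper below are hypothetical local stand-ins that mirror the two methods of ExecutionCallback; they are not Hazelcast’s implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallbackDemo {

    // Hypothetical local stand-in mirroring the two methods of ExecutionCallback.
    interface Callback<T> {
        void onResponse(T response);
        void onFailure(Throwable t);
    }

    // Runs the task on the executor and reports the outcome through the callback.
    static <T> void submit(ExecutorService executor, Callable<T> task, Callback<T> callback) {
        executor.execute(() -> {
            T result;
            try {
                result = task.call();
            } catch (Throwable t) {
                callback.onFailure(t);
                return;
            }
            callback.onResponse(result);
        });
    }

    // Submits one task and returns a description of which callback method fired.
    static String outcomeOf(Callable<Long> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<String> outcome = new AtomicReference<>("no callback");
        CountDownLatch done = new CountDownLatch(1);
        try {
            submit(executor, task, new Callback<Long>() {
                public void onResponse(Long response) {
                    outcome.set("Result: " + response);
                    done.countDown();
                }

                public void onFailure(Throwable t) {
                    outcome.set("Failed: " + t.getMessage());
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
            return outcome.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return outcome.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

A task that returns normally reaches onResponse, while a task that throws reaches onFailure with the thrown exception; the submitting thread is free to continue in both cases.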

Selecting Members for Task Execution

As previously mentioned, it is possible to indicate where in the Hazelcast cluster the Runnable or Callable is executed. Usually you execute these in the cluster based on the location of a key or a set of keys, or you allow Hazelcast to select a member.

If you want more control over where your code runs, use the MemberSelector interface. For example, you may want certain tasks to run only on certain members, or you may wish to implement some form of custom load balancing regime. The MemberSelector is an interface that you can implement and then provide to the IExecutorService when you submit or execute.

The select(Member) method is called for every available member in the cluster. Implement this method to decide if the member is going to be used or not.

In a simple example shown below, we select the cluster members based on the presence of an attribute.

public class MyMemberSelector implements MemberSelector {
    public boolean select(Member member) {
        // Member attributes are Strings; a missing attribute parses to false
        return Boolean.parseBoolean(member.getAttribute("my.special.executor"));
    }
}

You can use MemberSelector instances provided by the com.hazelcast.cluster.memberselector.MemberSelectors class. For example, you can select a lite member for running a task using com.hazelcast.cluster.memberselector.MemberSelectors#LITE_MEMBER_SELECTOR.

Configuring Executor Service

The following are example configurations for executor service.

Declarative Configuration:

XML:

<hazelcast>
    ...
    <executor-service name="exec">
        <pool-size>1</pool-size>
        <queue-capacity>10</queue-capacity>
        <statistics-enabled>true</statistics-enabled>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </executor-service>
    ...
</hazelcast>

YAML:

hazelcast:
  ...
  executor-service:
    exec:
      pool-size: 1
      queue-capacity: 10
      statistics-enabled: true
      split-brain-protection-ref: splitbrainprotection-name

Programmatic Configuration:

        Config config = new Config();
        ExecutorConfig executorConfig = config.getExecutorConfig("exec");
        executorConfig.setPoolSize( 1 ).setQueueCapacity( 10 )
                .setStatisticsEnabled( true )
                .setSplitBrainProtectionName( "splitbrainprotection-name" );

Executor service configuration has the following elements:

  • pool-size: The number of executor threads per member for the Executor. By default, the Executor is configured to have 16 threads in the pool. You can change that with this element.

  • queue-capacity: Executor’s task queue capacity; the number of tasks this queue can hold.

  • statistics-enabled: Specifies whether statistics gathering is enabled for your Executor Service. If set to false, you cannot collect statistics in your implementation (using getLocalExecutorStats()), and Hazelcast Management Center will not show them. Its default value is true.

  • split-brain-protection-ref: Name of the split-brain protection configuration that you want this Executor Service to use. See the Split-Brain Protection for IExecutorService section.
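How pool-size and queue-capacity interact can be sketched with a plain JDK ThreadPoolExecutor backed by a bounded queue. This is an illustration under stated assumptions, not Hazelcast’s implementation: QueueCapacityDemo and acceptedTasks are hypothetical names, and the rejection shown is the JDK pool’s behavior when every worker is busy and the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueCapacityDemo {

    // Submits 'attempts' blocking tasks to a local pool with the given pool size
    // and queue capacity; returns how many were accepted rather than rejected.
    static int acceptedTasks(int poolSize, int queueCapacity, int attempts) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    executor.execute(() -> {
                        try {
                            release.await(); // keep the worker busy
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Every worker is busy and the queue is full.
                }
            }
            return accepted;
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
        }
    }
}
```

With the values from the example configuration (pool size 1, queue capacity 10), this local pool accepts one running task plus ten queued tasks, so 11 of 20 attempts succeed.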

Split-Brain Protection for IExecutorService

IExecutorService can be configured to check for a minimum number of available members before applying its operations (see the Split-Brain Protection section). This check prevents executor operations from succeeding on all parts of a cluster during a network partition.

The following is a list of methods, grouped by the operations, that support split-brain protection checks:

  • WRITE, READ_WRITE:

    • execute

    • executeOnAllMembers

    • executeOnKeyOwner

    • executeOnMember

    • executeOnMembers

    • shutdown

    • shutdownNow

    • submit

    • submitToAllMembers

    • submitToKeyOwner

    • submitToMember

    • submitToMembers

Configuring Split-Brain Protection

Split-brain protection for Executor Service can be configured programmatically using the method setSplitBrainProtectionName(), or declaratively using the element split-brain-protection-ref. The following is an example declarative configuration:

XML:

<hazelcast>
    ...
    <executor-service name="default">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </executor-service>
    ...
</hazelcast>

YAML:

hazelcast:
  ...
  executor-service:
    default:
      split-brain-protection-ref: splitbrainprotection-name

The value of split-brain-protection-ref should be the split-brain protection configuration name which you configured under the split-brain-protection element as explained in the Split-Brain Protection section.