Scheduled Executor Service

Hazelcast’s scheduled executor service (IScheduledExecutorService) is a data structure which implements java.util.concurrent.ScheduledExecutorService, partially. By partially, we mean the behavior difference in scheduling a task at a fixed rate (scheduleAtFixedRate()). Hazelcast’s behavior guarantees that a task is not executed by multiple threads concurrently: a scheduled execution is skipped, instead of postponing, if another thread is still running the same task.

On top of the Vanilla Scheduling API, IScheduledExecutorService allows additional methods such as the following:

  • scheduleOnMember: On a specific cluster member.

  • scheduleOnKeyOwner: On the partition owning that key.

  • scheduleOnAllMembers: On all cluster members.

  • scheduleOnMembers: On all given members.

See the IScheduledExecutorService Javadoc for its API details.

There are two different modes of durability for the service:

  1. Upon partition specific scheduling, the future task is stored both in the primary partition and also in its N backups, N being the <durability> property in the configuration. More specifically, there are always one or more backups to take ownership of the task in the event of a lost member. If a member is lost, the task is re-scheduled on the backup (new primary) member, which might induce further delays on the subsequent executions of the task. For example, if we schedule a task to run in 10 seconds from now, schedule(new ExampleTask(), 10, TimeUnit.SECONDS); and after 5 seconds the owner member goes down (before the execution takes place), then the backup owner re-schedules the task in 10 seconds from now. Therefore, from the user’s perspective waiting on the result, this will be available in 10 + 5 = 15 seconds rather than 10 seconds as it is anticipated originally. If atFixedRate was used, then only the initial delay is affected in the above scenario, all subsequent executions should adhere to the given period parameter.

  2. Upon member specific scheduling, the future task is only stored in the member itself, which means that in the event of a lost member, the task is lost as well.

To accomplish the described durability, all tasks provide a unique identity/name before the scheduling takes place. The name allows the service to reach the scheduled task even after the caller (client or member) goes down and also allows to prevent duplicate tasks. The name of the task can be user-defined if it needs to be, by implementing the com.hazelcast.scheduledexecutor.NamedTask interface (plain wrapper util is available here: com.hazelcast.scheduledexecutor.TaskUtils.named(java.lang.String, java.lang.Runnable)). If the task does not provide a name in its implementation, the service provides a random UUID for it, internally.

Upon scheduling, the service returns an IScheduledFuture, which on top of the java.util.concurrent.ScheduledFuture functionality, provides an API to get the resource handler of the task ScheduledTaskHandler and also the runtime statistics of the task.

Futures associated with a scheduled task, in order to be aware of lost partitions and/or members, act as listeners on the local member/client. Therefore, they are always strongly referenced, on the member/client side. In order to clean up their resources, once completed, you can use the method dispose(). This method also cancels further executions of the task if scheduled at a fixed rate. See the IScheduledFuture Javadoc for its API details.

The task handler is a descriptor class holding information for the scheduled future, which is used to pinpoint the actual task in the cluster. It contains the name of the task, the owner (member or partition) and the scheduler name.

The handler is always available after scheduling and can be stored in a plain string format com.hazelcast.scheduledexecutor.ScheduledTaskHandler.toUrn() and re-constructed back from that String com.hazelcast.scheduledexecutor.ScheduledTaskHandler.of(). If the handler is lost, you can still find a task under a given scheduler by using the Scheduler’s com.hazelcast.scheduledexecutor.IScheduledExecutorService.getAllScheduledFutures().

Last but not least, similar to executor service, the scheduled executor service allows Stateful tasks to be scheduled. Stateful tasks, are tasks that require any kind of state during their runtime, which must also be durable along with the task in the event of a lost partition.

Stateful tasks can be created by implementing the com.hazelcast.scheduledexecutor.StatefulTask interface, providing implementation details for saving the state and loading it back. If a partition is lost, then the re-scheduled task loads the previously saved state before its execution.

As with the tasks, Objects stored in the state Map need to be Hazelcast serializable.

Configuring Scheduled Executor Service

This section presents example configurations for scheduled executor service along with the descriptions of its configuration elements and attributes.

Declarative Configuration:

  • XML

  • YAML

<hazelcast>
    ...
    <scheduled-executor-service name="myScheduledExecSvc">
        <statistics-enabled>true</statistics-enabled>
        <pool-size>16</pool-size>
        <durability>1</durability>
        <capacity>100</capacity>
        <capacity-policy>PER_NODE</capacity-policy>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
        <merge-policy batch-size="100">PutIfAbsentMergePolicy</merge-policy>
    </scheduled-executor-service>
    ...
</hazelcast>
hazelcast:
  ...
  scheduled-executor-service:
    myScheduledExecSvc:
      statistics-enabled: true
      pool-size: 16
      durability: 1
      capacity: 100
      capacity-policy: PER_NODE
      split-brain-protection-ref: splitbrainprotection-name
      merge-policy:
        batch-size: 100
        class-name: PutIfAbsentMergePolicy

Programmatic Configuration:

        Config config = new Config();
        config.getScheduledExecutorConfig( "myScheduledExecSvc" )
              .setPoolSize ( 16 )
              .setCapacityPolicy( ScheduledExecutorConfig.CapacityPolicy.PER_PARTITION )
              .setCapacity( 100 )
              .setDurability( 1 )
              .setMergePolicyConfig( new MergePolicyConfig("com.hazelcast.spi.merge.PassThroughMergePolicy", 110) )
              .setSplitBrainProtectionName( "splitbrainprotectionname" );

        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance(config);
        IScheduledExecutorService myScheduledExecSvc = hazelcast.getScheduledExecutorService("myScheduledExecSvc");

The following are the descriptions of each configuration element and attribute:

  • name: Name of the scheduled executor.

  • statistics-enabled: Specifies whether the statistics gathering is enabled. If set to false, you cannot collect statistics.

  • pool-size: Number of executor threads per member for the executor. The default value is 16.

  • capacity: Maximum number of tasks that a scheduler can have per partition or per member. Attempt to schedule more results in RejectedExecutionException. To free up the capacity, tasks should get disposed. If set to 0, it means there is no capacity limitation.

  • capacity-policy: The active policy for the capacity setting; defines how the defined capacity value is applied - either per member or per partition. Available options are PER_PARTITION and PER_NODE.

    • PER_NODE: Capacity policy that counts tasks for each Hazelcast member. It rejects new tasks when the capacity value is reached.

    • PER_PARTITION: Capacity policy that counts tasks for each partition. It rejects new tasks when the capacity value is reached. Storage size depends on the partition count in a Hazelcast member. This policy option should not be used often - avoid using it with a small cluster: if the cluster is small, it hosts more partitions, and therefore tasks, than that of a larger cluster. This policy has no effect when scheduling is done using the OnMember APIs, e.g., IScheduledExecutorService#scheduleOnMember(Runnable, Member, long, TimeUnit).

  • durability: Durability of the executor.

  • split-brain-protection-ref: Name of the split-brain protection configuration that you want this Scheduled Executor Service to use. See Split-Brain Protection for IScheduled Executor Service.

  • merge-policy: The default policy is PutIfAbsentMergePolicy with a batch size of 100. This is the policy used when merging entries from sub-clusters (after split-brain recovery). See Merge Policies.

Examples

Scheduling a callable that computes the cluster size in 10 seconds from now:

static class DelayedClusterSizeTask implements Callable<Integer>, HazelcastInstanceAware, Serializable {

    private transient HazelcastInstance instance;

    @Override
    public Integer call()
            throws Exception {
        return instance.getCluster().getMembers().size();
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.instance = hazelcastInstance;
    }
}

HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
IScheduledExecutorService executorService = hazelcast.getScheduledExecutorService("myScheduler");
IScheduledFuture<Integer> future = executorService.schedule(
        new DelayedClusterSizeTask(), 10, TimeUnit.SECONDS);

int membersCount = future.get(); // Block until we get the result
ScheduledTaskStatistics stats = future.getStats();
future.dispose(); // Always dispose futures that are not in use any more, to release resources
long totalTaskRuns = stats.getTotalRuns(); // = 1

Split-Brain Protection for IScheduled Executor Service

IScheduledExecutorService can be configured to check for a minimum number of available members before applying its operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.

The following is a list of methods, grouped by the operations, that support split-brain protection checks:

  • WRITE, READ_WRITE:

    • schedule

    • scheduleAtFixedRate

    • scheduleOnAllMembers

    • scheduleOnAllMembersAtFixedRate

    • scheduleOnKeyOwner

    • scheduleOnKeyOwnerAtFixedRate

    • scheduleOnMember

    • scheduleOnMemberAtFixedRate

    • scheduleOnMembers

    • scheduleOnMembersAtFixedRate

    • shutdown

  • READ, READ_WRITE:

    • getAllScheduledFutures

Configuring Split-Brain Protection

Split-brain protection for Scheduled Executor Service can be configured programmatically using the method setSplitBrainProtectionName(), or declaratively using the element split-brain-protection-ref. Following is an example declarative configuration:

  • XML

  • YAML

<hazelcast>
    ...
    <scheduled-executor-service name="myScheduledExecSvc">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </scheduled-executor-service>
    ...
</hazelcast>
hazelcast:
  ...
  scheduled-executor-service:
    myScheduledExecSvc:
      split-brain-protection-ref: splitbrainprotection-name

The value of split-brain-protection-ref should be the split-brain protection configuration name which you configured under the split-brain-protection element as explained in the Split-Brain Protection section.