A newer version of Hazelcast Platform is available.

View latest

Threading

For information on threading in Thread-Per-Core (TPC) environments, see Thread-Per-Core (TPC).

I/O Threading

Hazelcast uses a pool of threads for I/O. A single thread does not perform all the I/O. Instead, multiple threads perform the I/O. On each cluster member, the I/O threading is split up in three types of I/O threads:

  • I/O thread for the accept requests

  • I/O threads to read data from other members/clients

  • I/O threads to write data to other members/clients

You can configure the number of I/O threads using the hazelcast.io.thread.count system property. Its default value is 3, which is applied to each member. If the default is used, in total there are 7 I/O threads: one accept I/O thread, three read I/O threads and three write I/O threads. Each I/O thread has its own Selector instance and waits on the Selector.select if there is nothing to do.

You can also specify counts for input and output threads separately. There are hazelcast.io.input.thread.count and hazelcast.io.output.thread.count properties for this purpose. See the System Properties appendix for information about these properties and how to set them.

Hazelcast periodically scans utilization of each I/O thread and can decide to migrate a connection to a new thread if the existing thread is servicing a disproportionate number of I/O events. You can customize the scanning interval by configuring the hazelcast.io.balancer.interval.seconds system property; its default interval is 20 seconds. You can disable the balancing process by setting this property to a negative value.

In case of the read I/O thread, when sufficient bytes for a packet have been received, the Packet object is created. This Packet object is then sent to the system where it is de-multiplexed. If the Packet header signals that it is an operation/response, the Packet is handed over to the operation service (see the Operation Threading section). If the Packet is an event, it is handed over to the event service (see the Event Threading section).

Event Threading

Hazelcast uses a shared event system to deal with components that rely on events, such as topic, collections, listeners and Near Cache.

Each cluster member has an array of event threads and each thread has its own work queue. When an event is produced, either locally or remotely, an event thread is selected (depending on if there is a message ordering) and the event is placed in the work queue for that event thread.

You can set the following properties to alter the system’s behavior:

  • hazelcast.event.thread.count: Number of event-threads in this array. Its default value is 5.

  • hazelcast.event.queue.capacity: Capacity of the work queue. Its default value is 1000000.

  • hazelcast.event.queue.timeout.millis: Timeout for placing an item on the work queue in milliseconds. Its default value is 250 milliseconds.

If you process a lot of events and have many cores, changing the value of hazelcast.event.thread.count property to a higher value is good practice. This way, more events can be processed in parallel.

Multiple components share the same event queues. If there are two topics, say A and B, for certain messages they may share the same queue(s) and hence the same event thread. If there are a lot of pending messages produced by A, then B needs to wait. Also, when processing a message from A takes a lot of time and the event thread is used for that, B suffers from this. That is why it is better to offload processing to a dedicated thread (pool) so that systems are better isolated.

If the events are produced at a higher rate than they are consumed, the queue grows in size. To prevent overloading the system and running into an OutOfMemoryException, the queue is given a capacity of one million items. When the maximum capacity is reached, the items are dropped. This means that the event system is a 'best effort' system. There is no guarantee that you are going to get an event. Topic A might have a lot of pending messages and therefore B cannot receive messages because the queue has no capacity and messages for B are dropped.

IExecutor Threading

Executor threading is straight forward. When a task is received to be executed on Executor E, then E will have its own ThreadPoolExecutor instance and the work is placed in the work queue of this executor. Thus, Executors are fully isolated, but still share the same underlying hardware - most importantly the CPUs.

You can configure the IExecutor using the ExecutorConfig (programmatic configuration) or using <executor> (declarative configuration). See also the Configuring Executor Service section.

Operation Threading

The following are the operation types:

  • operations that are aware of a certain partition; for example., IMap.get(key)

  • operations that are not partition aware; for example, IExecutorService.executeOnMember(command, member)

Each of these operation types has a different threading model explained in the following sections.

Partition-aware Operations

To execute partition-aware operations, an array of operation threads is created. The default value of this array’s size is the number of cores and it has a minimum value of 2. This value can be changed using the hazelcast.operation.thread.count property.

Each operation thread has its own work queue and it consumes messages from this work queue. If a partition-aware operation needs to be scheduled, the right thread is found using the formula below.

threadIndex = partitionId % partition thread-count

After the threadIndex is determined, the operation is put in the work queue of that operation thread. This means the following:

  • A single operation thread executes operations for multiple partitions; if there are 271 partitions and 10 partition threads, then roughly every operation thread executes operations for 27 partitions.

  • Each partition belongs to only one operation thread. All operations for a partition are always handled by exactly the same operation thread.

  • Concurrency control is not needed to deal with partition-aware operations because once a partition-aware operation is put in the work queue of a partition-aware operation thread, only one thread is able to touch that partition.

Because of this threading strategy, there are two forms of false sharing you need to be aware of:

  • False sharing of the partition - two completely independent data structures share the same partition. For example, if there is a map employees and a map orders, the method employees.get("peter") running on partition 25 may be blocked by the method orders.get(1234) also running on partition 25. If independent data structures share the same partition, a slow operation on one data structure can slow down the other data structures.

  • False sharing of the partition-aware operation thread - each operation thread is responsible for executing operations on a number of partitions. For example, thread 1 could be responsible for partitions 0, 10, 20 and so on, and thread-2 could be responsible for partitions 1, 11, 21 and so on. If an operation for partition 1 takes a lot of time, it blocks the execution of an operation for partition 11 because both of them are mapped to the same operation thread.

You need to be careful with long-running operations because you could starve operations of a thread. As a general rule, the partition thread should be released as soon as possible because operations are not designed as long-running operations. That is why, for example, it is very dangerous to execute a long-running operation using AtomicReference.alter() or an IMap.executeOnKey(), because these operations block other operations to be executed.

Currently, there is no support for work stealing. Different partitions that map to the same thread may need to wait until one of the partitions is finished, even though there are other free partition-aware operation threads available.

Example:

Take a cluster with three members. Two members have 90 primary partitions and one member has 91 primary partitions. Let’s say you have one CPU and four cores per CPU. By default, four operation threads will be allocated to serve 90 or 91 partitions.

Non-Partition-aware Operations

To execute operations that are not partition-aware - for example, IExecutorService.executeOnMember(command, member) - generic operation threads are used. When the Hazelcast instance is started, an array of operation threads is created. The size of this array has a default value of the number of cores divided by two with a minimum value of 2. It can be changed using the hazelcast.operation.generic.thread.count property.

A non-partition-aware operation thread does not execute an operation for a specific partition. Only partition-aware operation threads execute partition-aware operations.

Unlike the partition-aware operation threads, all the generic operation threads share the same work queue: genericWorkQueue.

If a non-partition-aware operation needs to be executed, it is placed in that work queue and any generic operation thread can execute it. The big advantage is that you automatically have work balancing since any generic operation thread is allowed to pick up work from this queue.

The disadvantage is that this shared queue can be a point of contention. You may not see this contention in production since performance is dominated by I/O and the system does not run many non-partition-aware operations.

Priority Operations

In some cases, the system needs to run operations with a higher priority; for example, an important system operation. To support priority operations, Hazelcast has the following features:

  • For partition-aware operations: Each partition thread has its own work queue and it also has a priority work queue. The partition thread always checks the priority queue before it processes work from its normal work queue.

  • For non-partition-aware operations: Next to the genericWorkQueue, there is also a genericPriorityWorkQueue. When a priority operation needs to be run, it is put in the genericPriorityWorkQueue. Like the partition-aware operation threads, a generic operation thread first checks the genericPriorityWorkQueue for work.

Since a worker thread blocks the normal work queue (either partition specific or generic), a priority operation may not be picked up because it is not put in the queue where it is blocked. Hazelcast always sends a 'kick the worker' operation that only triggers the worker to wake up and check the priority queue.

Operation-response and Invocation-future

When an Operation is invoked, a Future is returned. See the example code below.

GetOperation operation = new GetOperation( mapName, key );
Future future = operationService.invoke( operation );
future.get();

The calling side blocks the thread for a reply. In this case, GetOperation is set in the work queue for the partition of key, where it eventually is executed. Upon execution, a response is returned and placed on the genericWorkQueue where it is executed by a "generic operation thread". This thread signals the future and notifies the blocked thread that a response is available. Hazelcast has a plan of exposing this future to the outside world, and we will provide the ability to register a completion listener so you can perform asynchronous calls.

Local Calls

When a local partition-aware call is done, an operation is made and handed over to the work queue of the correct partition operation thread, and a future is returned. When the calling thread calls get on that future, it acquires a lock and waits for the result to become available. When a response is calculated, the future is looked up and the waiting thread is notified.

In the future, this will be optimized to reduce the amount of expensive systems calls, such as lock.acquire()/notify() and the expensive interaction with the operation-queue. Probably, we will add support for a caller-runs mode, so that an operation is directly run on the calling thread.

CPU Thread Affinity

Hazelcast offers configuring CPU threads so that you have a lot better control on the latency and a better throughput. This configuration provides you with the CPU thread affinity, where certain threads can have affinity for particular CPUs.

The following affinity configurations are available for a member:

-Dhazelcast.io.input.thread.affinity=1-3
-Dhazelcast.io.output.thread.affinity=3-5
-Dhazelcast.operation.thread.affinity=7-10,13
-Dhazelcast.operation.response.thread.affinity=15,16

The following affinity configurations are available for a client:

-Dhazelcast.client.io.input.thread.affinity=1-4
-Dhazelcast.client.io.output.thread.affinity=5-8
-Dhazelcast.client.response.thread.affinity=7-9

You can set the CPU thread affinity properties shown above only on the command line.

Let’s have a look at how we define the values for the above configuration properties:

  • Individual CPUs, for example 1,2,3: This means there are going to be three threads. The first thread runs on CPU 1, the second thread on CPU 2, and so on.

  • CPU ranges, for example 1-3: Shortcut syntax for 1,2,3.

  • Group, for example [1-3]: This configures three threads and each of these threads can run on CPU 1, 2 and 3.

  • Group with thread count, for example [1-3]:2: This configures two threads and each of these two threads can run on CPU 1, 2 and 3.

You can also combine those, for example 1,2,[5-7],[10,12,16]:2.

Note that, the syntax for CPU thread affinity shown above not only determines the mapping of CPUs to threads, it also determines the thread count. If you use CPU thread affinity - for example, hazelcast.io.input.thread.affinity - then hazelcast.io.input.thread.count is ignored. See Threading Model for more information about specifying explicit thread counts.

If you don’t configure affinity for a category of threads, it means they can run on any CPU.

Let’s look at an example. Assuming you have the numactl utility, run the following command on your machine to see the mapping between the NUMA nodes and threads:

numactl --hardware

An example output is shown below:

available: 2 nodes (0-1)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 20 21 22 23 24 25 26 27 28 29
node 0 size: 393090 MB
node 0 free: 372729 MB
node 1 cpus: 10 11 12 13 14 15 16 17 18 19 30 31 32 33 34 35 36 37 38 39
node 1 size: 393216 MB
node 1 free: 343296 MB
node distances:
node   0   1
  0:  10  21
  1:  21  10

If you want to configure 20 threads on NUMA node 0 and 20 threads on NUMA node 1, and confine the threads to these NUMA nodes, you can use the following configuration:

-Dhazelcast.operation.thread.affinity=[0-9,20-29],[10-19,30-39]

Refer to Non-uniform memory access for information about NUMA nodes.

SlowOperationDetector

The SlowOperationDetector monitors the operation threads and collects information about all slow operations. An Operation is a task executed by a generic or partition thread (see Operation Threading). An operation is considered as slow when it takes more computation time than the configured threshold.

The SlowOperationDetector stores the fully qualified classname of the operation and its stacktrace as well as operation details, start time and duration of each slow invocation. All collected data is available in the Management Center.

The SlowOperationDetector is configured using the following system properties.

  • hazelcast.slow.operation.detector.enabled

  • hazelcast.slow.operation.detector.log.purge.interval.seconds

  • hazelcast.slow.operation.detector.log.retention.seconds

  • hazelcast.slow.operation.detector.stacktrace.logging.enabled

  • hazelcast.slow.operation.detector.threshold.millis

See the System Properties appendix for explanations of these properties.

Logging of Slow Operations

The detected slow operations are logged as warnings in the Hazelcast log files:

WARN 2015-05-07 11:05:30,890 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  Hint: You can enable the logging of stacktraces with the following config
  property: hazelcast.slow.operation.detector.stacktrace.logging.enabled
WARN 2015-05-07 11:05:30,891 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  (2 invocations)
WARN 2015-05-07 11:05:30,892 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  (3 invocations)

Stacktraces are always reported to the Management Center, but by default they are not printed to keep the log size small. If logging of stacktraces is enabled, the full stacktrace is printed every 100 invocations. All other invocations print a shortened version.

Purging of Slow Operation Logs

Since a Hazelcast cluster can run for a very long time, Hazelcast purges the slow operation logs periodically to prevent an OOME. You can configure the purge interval and the retention time for each invocation.

The purging removes each invocation whose retention time is exceeded. When all invocations are purged from a slow operation log, the log is deleted.

Setting Response Thread

You can set the response thread for internal operations both on the members and clients. By setting the backoff mode on and depending on the use case, you can get a 5-10% performance improvement. However, this increases the CPU utilization. To enable backoff mode please set the following property for Hazelcast cluster members:

-Dhazelcast.operation.responsequeue.idlestrategy=backoff

For Hazelcast clients, use the following property to enable backoff:

-Dhazelcast.client.responsequeue.idlestrategy=backoff