Best Practices

Basic Performance Recommendations

  • 8 cores per Hazelcast server instance

  • Minimum of 8 GB RAM per Hazelcast member (if not using the High-Density Memory Store)

  • Dedicated NIC per Hazelcast member

  • Linux—any distribution

  • All members should run within the same subnet

  • All members should be attached to the same network switch

Homogeneous Hardware Resources

One of the most effective optimization strategies for Hazelcast is to ensure that Hazelcast services are allocated their own dedicated machine resources. Using dedicated, properly sized hardware (or virtual hardware) ensures that Hazelcast members have ample CPU, memory, and network resources without competing with other processes or services.

Hazelcast distributes load evenly across all its members and assumes that the resources available to each of its members are homogeneous. In a cluster with a mix of more and less powerful machines, the weaker members cause bottlenecks, leaving the stronger members underutilized. For predictable performance, it is best to use equivalent hardware for all Hazelcast members.

Using Single Member per Machine

A Hazelcast member assumes it is alone on a machine, so we recommend not running multiple Hazelcast members on a machine. Having multiple members on a single machine most likely gives worse performance than running a single member, since there will be more context switching, less batching, etc. So unless it is proven that running multiple members per machine gives better performance or behavior in your particular setup, it is best to run a single member per machine.

Using Operation Threads Efficiently

By default, Hazelcast uses the machine’s core count to determine the number of operation threads. Creating more operation threads than this core count is highly unlikely to lead to improved performance, since there will be more context switching, more thread notification, etc.

Especially if you have a system that does simple operations like put and get, it is better to use a lower thread count than the number of cores. The reason reducing the thread count improves performance is that the operations executed on the operation threads normally execute very fast, and thread parking and unparking can cause a very significant amount of overhead. If there are fewer threads, each thread needs to do more work, blocks less and therefore needs to be notified less.
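As a minimal sketch of the advice above, the following picks an operation thread count below the core count and passes it to Hazelcast as a JVM system property. The property name hazelcast.operation.thread.count is not named in this section and is an assumption here, so check the System Properties appendix for your version. The class name and the "half the cores" heuristic are purely illustrative, and any value should be confirmed by benchmarking.

```java
public class OperationThreadSizing {

    // Assumed property name; verify it against the System Properties appendix.
    public static final String OPERATION_THREAD_COUNT = "hazelcast.operation.thread.count";

    // Illustrative heuristic only: half the cores, but never fewer than one thread.
    public static int suggestedThreadCount(int availableCores) {
        return Math.max(1, availableCores / 2);
    }

    // Sets the system property; this must run before the member is started in this JVM.
    public static int apply(int availableCores) {
        int threads = suggestedThreadCount(availableCores);
        System.setProperty(OPERATION_THREAD_COUNT, Integer.toString(threads));
        return threads;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("operation threads: " + apply(cores));
    }
}
```

Treat the chosen value as a starting point for a benchmark rather than a recommendation.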

Avoiding Random Changes

Tweaking can be very rewarding because significant performance improvements are possible. By default, Hazelcast tries to behave at its best for all situations, but this doesn’t always lead to the best performance. So if you know what you are doing and what to look for, it can be very rewarding to tweak. However, tweaking should always be accompanied by proper testing to see whether there is actually an improvement. Tweaking without proper benchmarking is likely to lead to confusion and could cause all kinds of problems. In case of doubt, we recommend not tweaking.

Creating the Right Benchmark Environment

When benchmarking, it is important that the benchmark reflects your production environment. Sometimes, with a calculated guess, a smaller representative environment can be set up; but if you want to use the benchmark statistics to infer how your production system is going to behave, you need to get as close to your production setup as possible. Otherwise, you risk spotting issues too late or focusing on things that are not relevant.

AWS Deployments

When you deploy Hazelcast clusters on AWS EC2 instances, consider placing the cluster members in the same Cluster Placement Group. This helps reduce the latency among members drastically. Additionally, when the cluster members are placed in the same VPC, consider using private IPs instead of public ones to increase the throughput.

For the best performance of your Hazelcast on AWS EC2:

  • Select the newest Linux AMIs.

  • Select the HVM based instances.

  • Select at least a system with 8 vCPUs, e.g., c4.2xlarge. For an overview of all types of EC2 instances, please check this web page.

  • Consider setting a placement group.

Pipelining

With pipelining, you can send multiple requests in parallel using a single thread and therefore increase throughput. As an example, suppose that the round trip time for a request/response is 1 millisecond. If synchronous requests are used, e.g., IMap.get(), then the maximum throughput of these requests from a single thread is 1/0.001 = 1000 operations/second. One way to solve this problem is to introduce multithreading to make the requests in parallel. For the same example, if we use 2 threads, then the maximum throughput doubles from 1000 operations/second to 2000 operations/second.

However, introducing threads for the sake of executing requests isn’t always convenient and doesn’t always lead to optimal performance; this is where the pipelining can be used. Instead of using multiple threads to have concurrent invocations, you can use asynchronous method calls such as IMap.getAsync(). If you use 2 asynchronous calls from a single thread, then the maximum throughput is 2*(1/0.001) = 2000 operations/second. Therefore, to benefit from the pipelining, asynchronous calls need to be made from a single thread. The pipelining is a convenience implementation to provide back pressure, i.e., controlling the number of inflight operations, and it provides a convenient way to wait for all the results.

Pipelining<String> pipelining = new Pipelining<String>(10);
for (long k = 0; k < 100; k++) {
    int key = random.nextInt(keyDomain);
    pipelining.add(map.getAsync(key));
}
// wait for completion
List<String> results = pipelining.results();

In the above example, we make 100 asynchronous map.getAsync() calls, but the maximum number of inflight calls is 10.
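To make the idea of a bounded number of inflight calls concrete, here is a small stand-alone model built only on the JDK. It is not Hazelcast's Pipelining class: the name BoundedPipeline is hypothetical, and unlike the real API it takes a Supplier so that the permit is acquired before the asynchronous call is started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical model of a pipeline with a fixed depth; use from a single thread.
public class BoundedPipeline<T> {
    private final Semaphore permits;
    private final List<CompletableFuture<T>> futures = new ArrayList<>();

    public BoundedPipeline(int depth) {
        if (depth < 1) {
            throw new IllegalArgumentException("depth must be positive");
        }
        this.permits = new Semaphore(depth);
    }

    // Blocks the caller while 'depth' calls are already in flight, then starts the call.
    public void add(Supplier<CompletableFuture<T>> call) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (RuntimeException e) {
            permits.release(); // the call never started, so give the permit back
            throw e;
        }
        // Free the permit as soon as the call completes, successfully or not.
        future.whenComplete((value, error) -> permits.release());
        futures.add(future);
    }

    // Waits for every call and returns the results in submission order.
    public List<T> results() {
        List<T> out = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            out.add(future.join());
        }
        return out;
    }
}
```

Because every result stays referenced until the pipeline itself is dropped, this model also shows why a pipeline should be replaced after a bounded number of calls.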

By increasing the depth of the pipelining, throughput can be increased. The pipelining has its own back pressure; you do not need to enable the back pressure on the client or member to have this feature on the pipelining. However, if you have many pipelines, you may still need to enable the client/member back pressure because it is possible to overwhelm the system with requests in that situation. See the Back Pressure section to learn how to enable it on the client or member.
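The relationship between depth and throughput can be written out as simple arithmetic. The sketch below assumes an idealized setting in which every request takes exactly the same round trip time and the members are never the bottleneck, so it gives an upper bound rather than a prediction. The class and method names are illustrative.

```java
public class PipelineThroughput {

    // Upper bound on operations/second for one thread that keeps 'depth' requests in flight.
    public static long maxOpsPerSecond(int depth, long roundTripMicros) {
        if (depth < 1 || roundTripMicros < 1) {
            throw new IllegalArgumentException("depth and round trip time must be positive");
        }
        return depth * 1_000_000L / roundTripMicros;
    }

    public static void main(String[] args) {
        // 1 ms round trip, as in the example in the text.
        System.out.println(maxOpsPerSecond(1, 1000));  // synchronous calls
        System.out.println(maxOpsPerSecond(2, 1000));  // two calls in flight
        System.out.println(maxOpsPerSecond(10, 1000)); // pipeline depth of 10
    }
}
```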

You can use the pipelining both on the clients and members. You do not need a special configuration; it works out-of-the-box.

The pipelining can be used for any asynchronous call. You can use it for IMap asynchronous get/put methods as well as for ICache, IAtomicLong, etc. It cannot be used as a transaction mechanism though. So you cannot do some calls and throw away the pipeline and expect that none of the requests are executed. If you want to use an atomic behavior, see the Transactions chapter. The pipelining is just a performance optimization, not a mechanism for atomic behavior.

The pipelines are cheap and should be replaced frequently because they accumulate results. It is fine to have a few hundred or even a few thousand calls being processed with the pipelining. However, all the responses to all requests are stored in the pipeline as long as the pipeline is referenced. So if you want to process a huge number of requests, wait for the pipelining results every few hundred or few thousand calls and then create a new instance.

Note that the pipelines are not thread-safe. They must be used by a single thread.

Back Pressure

Hazelcast uses operations to make remote calls. For example, a map.get is one operation, and a map.put is one operation for the primary and one operation for each of the backups, i.e., map.put is executed for the primary and also for each backup. In most cases, there is a natural balance between the number of threads performing operations and the number of operations being executed. However, the following may upset this balance, cause operations to pile up and eventually lead to an OutOfMemoryError (OOME):

  • Asynchronous calls: With async calls, the system may be flooded with the requests.

  • Asynchronous backups: The asynchronous backups may be piling up.

To prevent the system from crashing, Hazelcast provides back pressure. Back pressure works by limiting the number of concurrent operation invocations and periodically making an async backup sync.

Member Side

Back pressure is disabled by default and you can enable it using the following system property:

hazelcast.backpressure.enabled

To control the number of concurrent invocations, you can configure the number of invocations allowed per partition using the following system property:

hazelcast.backpressure.max.concurrent.invocations.per.partition

The default value of this system property is 100. With the default configuration, a system is allowed to have (271 + 1) * 100 = 27200 concurrent invocations (271 partitions + 1 for generic operations).

Back pressure is only applied to normal operations. System operations like heart beats and repartitioning operations are not influenced by back pressure. 27200 invocations might seem like a lot, but keep in mind that executing a task on IExecutor or acquiring a lock also requires an operation.

If the maximum number of invocations has been reached, Hazelcast automatically applies an exponential backoff policy. This gives the system some time to deal with the load. Using the following system property, you can configure the maximum time to wait before a HazelcastOverloadException is thrown:

hazelcast.backpressure.backoff.timeout.millis

This system property’s default value is 60000 milliseconds.

The Health Monitor keeps an eye on the usage of the invocations. If it sees a member has consumed 70% or more of the invocations, it starts to log health messages.
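The arithmetic behind the invocation limit and the 70% health threshold can be checked with a few lines of code. This is only a calculation helper with illustrative names, not part of Hazelcast. It assumes the limit is (partitions + 1) times the per-partition value, as stated above.

```java
public class InvocationBudget {

    // Total concurrent invocations allowed: one slot group per partition plus one for generic operations.
    public static int maxConcurrentInvocations(int partitionCount, int maxPerPartition) {
        return (partitionCount + 1) * maxPerPartition;
    }

    // Number of in-use invocations at which usage reaches the given percentage of the limit.
    public static int usageThreshold(int maxInvocations, int percent) {
        return (int) ((long) maxInvocations * percent / 100);
    }

    public static void main(String[] args) {
        int limit = maxConcurrentInvocations(271, 100);
        System.out.println("limit = " + limit);
        System.out.println("health messages from = " + usageThreshold(limit, 70));
    }
}
```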

Apart from controlling the number of invocations, you also need to control the number of pending async backups. This is done by periodically making these backups sync instead of async. This forces all pending backups to get drained. For this, Hazelcast tracks the number of asynchronous backups for each partition. At every Nth call, one synchronization is forced. This N is controlled through the following property:

hazelcast.backpressure.syncwindow

This system property’s default value is 100. It means, out of 100 asynchronous backups, Hazelcast makes 1 of them a synchronous one. A randomization is added, so the sync window with default configuration is between 75 and 125 invocations.
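One way to picture the randomized sync window is as a draw from the range 75% to 125% of the configured value. The sketch below is a model under that assumption, with a uniform distribution chosen for simplicity. It does not reproduce Hazelcast's internal code, and the names are hypothetical.

```java
import java.util.Random;

public class SyncWindowModel {

    // Returns how many async backups to allow before the next one is forced to be sync,
    // drawn uniformly from [0.75 * syncWindow, 1.25 * syncWindow] (an assumed distribution).
    public static int nextSyncDelay(int syncWindow, Random random) {
        int low = syncWindow * 3 / 4;
        int high = syncWindow * 5 / 4;
        return low + random.nextInt(high - low + 1);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(nextSyncDelay(100, random));
        }
    }
}
```

The randomization keeps partitions from forcing their sync backups at the same moment.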

Client Side

To prevent the system on the client side from overloading, you can apply a constraint on the number of concurrent invocations. You can use the following system property on the client side for this purpose:

hazelcast.client.max.concurrent.invocations

This property defines the maximum allowed number of concurrent invocations. When it is not explicitly set, it defaults to Integer.MAX_VALUE, which effectively means unlimited. When you set it and the number of concurrent invocations exceeds this value, Hazelcast throws HazelcastOverloadException when a new invocation comes in.

Please note that the back off timeout and controlling the number of pending async backups (sync window) are not supported on the client side.

See the System Properties appendix to learn how to configure the system properties.
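As a hedged illustration, the properties named in this section can be supplied as JVM system properties before the member or client is started. The sketch assumes that setting them with System.setProperty (or the equivalent -D flags) is acceptable in your deployment. The values for the member are the defaults quoted above, and the helper class itself is hypothetical.

```java
public class BackPressureProperties {

    // Member side: enable back pressure and spell out the defaults quoted in the text.
    public static void configureMember() {
        System.setProperty("hazelcast.backpressure.enabled", "true");
        System.setProperty("hazelcast.backpressure.max.concurrent.invocations.per.partition", "100");
        System.setProperty("hazelcast.backpressure.backoff.timeout.millis", "60000");
        System.setProperty("hazelcast.backpressure.syncwindow", "100");
    }

    // Client side: only the invocation limit applies; the limit passed in is the caller's choice.
    public static void configureClient(int maxConcurrentInvocations) {
        System.setProperty("hazelcast.client.max.concurrent.invocations",
                Integer.toString(maxConcurrentInvocations));
    }

    public static void main(String[] args) {
        configureMember();
        configureClient(1000); // illustrative value, not a recommendation
        System.out.println(System.getProperty("hazelcast.backpressure.enabled"));
    }
}
```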

Near Cache

Access to small-to-medium, read-mostly data sets may be sped up by creating a Near Cache. This cache maintains copies of distributed data in local memory for very fast access.

Benefits:

  • Avoids the network and deserialization costs of retrieving frequently-used data remotely

  • Eventually consistent

  • Can persist keys on a filesystem and reload them on restart. This means you can have your Near Cache ready right after application start

  • Can use deserialized objects as Near Cache keys to speed up lookups

Costs:

  • Increased memory consumption in the local JVM

  • High invalidation rates may outweigh the benefits of locality of reference

  • Strong consistency is not maintained; you may read stale data

Map or Cache entries in Hazelcast are partitioned across the cluster members. Hazelcast clients do not have local data at all. Suppose you read the key k a number of times from a Hazelcast client or k is owned by another member in your cluster. Then each map.get(k) or cache.get(k) will be a remote operation, which creates a lot of network trips. If you have a data structure that is mostly read, then you should consider creating a local Near Cache, so that reads are sped up and less network traffic is created.

These benefits do not come for free. See the following trade-offs:

  • Members with a Near Cache have to hold the extra cached data, which increases memory consumption.

  • If invalidation is enabled and entries are updated frequently, then invalidations will be costly.

  • Near Cache breaks the strong consistency guarantees; you might be reading stale data.

Near Cache is highly recommended for data structures that are mostly read.
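The trade-off between fewer remote reads and possible stale data can be seen in a toy model. The class below is not Hazelcast code: NearCacheModel is a hypothetical name, the remote data structure is stood in for by a plain function, and the model is single-threaded with no eviction or expiration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy read-through cache placed in front of a "remote" lookup function.
public class NearCacheModel<K, V> {
    private final Map<K, V> local = new HashMap<>();
    private final Function<K, V> remoteGet;
    private int remoteCalls;

    public NearCacheModel(Function<K, V> remoteGet) {
        this.remoteGet = remoteGet;
    }

    // Serves from local memory when possible; otherwise performs one remote call and caches it.
    public V get(K key) {
        V cached = local.get(key);
        if (cached != null) {
            return cached;
        }
        remoteCalls++;
        V value = remoteGet.apply(key);
        if (value != null) {
            local.put(key, value);
        }
        return value;
    }

    // Drops the local copy so that the next read goes to the remote side again.
    public void invalidate(K key) {
        local.remove(key);
    }

    public int remoteCalls() {
        return remoteCalls;
    }
}
```

Reading the same key repeatedly costs a single remote call in this model, but an update on the remote side stays invisible until the local copy is invalidated.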

In a client/server system you must enable the Near Cache separately on the client, without the need to configure it on the server. Please note that Near Cache configuration is specific to the server or client itself: a data structure on a server may not have Near Cache configured while the same data structure on a client may have Near Cache configured. They also can have different Near Cache configurations.

If you are using Near Cache, you should take into account that your hits to the keys in the Near Cache are not reflected as hits to the original keys on the primary members. This affects, for example, IMap’s maximum idle seconds or time-to-live seconds expiration. Therefore, even though there is a hit on a key in Near Cache, your original key on the primary member may expire.

Near Cache works only when you access data via map.get(k) or cache.get(k) methods. Data returned using a predicate is not stored in the Near Cache.

Hazelcast Data Structures with Near Cache Support

The following matrix shows the Hazelcast data structures with Near Cache support. Please have a look at the next section for a detailed explanation of cache-local-entries, local-update-policy, preloader and serialize-keys.

Data structure          | Near Cache Support | cache-local-entries | local-update-policy | preloader | serialize-keys
IMap member             | yes                | yes                 | no                  | no        | yes
IMap client             | yes                | no                  | no                  | yes       | yes
JCache member           | no                 | no                  | no                  | no        | no
JCache client           | yes                | no                  | yes                 | yes       | yes
ReplicatedMap member    | no                 | no                  | no                  | no        | no
ReplicatedMap client    | yes                | no                  | no                  | no        | no
TransactionalMap member | limited            | no                  | no                  | no        | no
TransactionalMap client | no                 | no                  | no                  | no        | no

Even though lite members do not store any data for Hazelcast data structures, you can enable Near Cache on lite members for faster reads.

Configuring Near Cache

The following shows the configuration for the Hazelcast Near Cache.

Please keep in mind that, if you want to use Near Cache on a Hazelcast member, you configure it on the member; if you want to use it on a Hazelcast client, you configure it on the client.

Declarative Configuration:

XML:

<hazelcast>
    ...
    <near-cache name="myDataStructure">
        <in-memory-format>BINARY</in-memory-format>
        <invalidate-on-change>true</invalidate-on-change>
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>60</max-idle-seconds>
        <eviction eviction-policy="LFU"
            max-size-policy= "ENTRY_COUNT"
            size="1000"/>
        <cache-local-entries>false</cache-local-entries>
        <local-update-policy>INVALIDATE</local-update-policy>
        <preloader enabled="true"
             directory="nearcache-example"
             store-initial-delay-seconds="0"
             store-interval-seconds="0"/>
    </near-cache>
    ...
</hazelcast>

YAML:

hazelcast:
    near-cache:
      myDataStructure:
        in-memory-format: BINARY
        invalidate-on-change: true
        time-to-live-seconds: 0
        max-idle-seconds: 60
        eviction:
          size: 1000
          max-size-policy: ENTRY_COUNT
          eviction-policy: LFU
        cache-local-entries: false
        local-update-policy: INVALIDATE
        preloader:
          enabled: true
          directory: nearcache-example
          store-initial-delay-seconds: 0
          store-interval-seconds: 0

The element <near-cache> has an optional attribute name whose default value is default.

Programmatic Configuration:

        EvictionConfig evictionConfig = new EvictionConfig()
                .setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
                .setEvictionPolicy(EvictionPolicy.LRU)
                .setSize( 1 );

        NearCachePreloaderConfig preloaderConfig = new NearCachePreloaderConfig()
                .setEnabled(true)
                .setDirectory("nearcache-example")
                .setStoreInitialDelaySeconds( 1 )
                .setStoreIntervalSeconds( 2 );

        NearCacheConfig nearCacheConfig = new NearCacheConfig()
                .setName("myDataStructure")
                .setInMemoryFormat(InMemoryFormat.BINARY)
                .setSerializeKeys(true)
                .setInvalidateOnChange(false)
                .setTimeToLiveSeconds( 1 )
                .setMaxIdleSeconds( 5 )
                .setEvictionConfig(evictionConfig)
                .setCacheLocalEntries(true)
                .setLocalUpdatePolicy(NearCacheConfig.LocalUpdatePolicy.CACHE_ON_UPDATE)
                .setPreloaderConfig(preloaderConfig);

The class NearCacheConfig is used for all supported Hazelcast data structures on members and clients.

The following are the descriptions of all configuration elements and attributes:

  • in-memory-format: Specifies in which format data is stored in your Near Cache. Note that a map’s in-memory format can be different from that of its Near Cache. Available values are as follows:

    • BINARY: Data is stored in serialized binary format (default value).

    • OBJECT: Data is stored in deserialized form.

    • NATIVE: Data is stored in the Near Cache that uses Hazelcast’s High-Density Memory Store feature. This option is available only in Hazelcast Enterprise. Note that a map and its Near Cache can independently use High-Density Memory Store. For example, while your map does not use High-Density Memory Store, its Near Cache can use it.

  • invalidate-on-change: Specifies whether the cached entries are evicted when the entries are updated or removed. Its default value is true.

  • time-to-live-seconds: Maximum number of seconds for each entry to stay in the Near Cache. Entries that are older than this period are automatically evicted from the Near Cache. time-to-live-seconds applies regardless of the eviction policy used. Accepts any integer between 0 and Integer.MAX_VALUE; 0 means infinite. Its default value is 0.

  • max-idle-seconds: Maximum number of seconds each entry can stay in the Near Cache untouched (not read). Entries that are not read for longer than this period are removed from the Near Cache. Accepts any integer between 0 and Integer.MAX_VALUE; 0 means Integer.MAX_VALUE. Its default value is 0.

  • eviction: Specifies the eviction behavior when you use High-Density Memory Store for your Near Cache. It has the following attributes:

    • eviction-policy: Eviction policy configuration. Available values are as follows:

      • LRU: Least Recently Used (default value).

      • LFU: Least Frequently Used.

      • NONE: No items are evicted and the size attribute is ignored. You can still combine it with time-to-live-seconds and max-idle-seconds to evict items from the Near Cache.

      • RANDOM: A random item is evicted.

    • max-size-policy: Maximum size policy for eviction of the Near Cache. Available values are as follows:

      • ENTRY_COUNT: Maximum size based on the entry count in the Near Cache (default value).

      • USED_NATIVE_MEMORY_SIZE: Maximum used native memory size of the specified Near Cache in MB to trigger the eviction. If the used native memory size exceeds this threshold, the eviction is triggered. Available only for NATIVE in-memory format. This is supported only by Hazelcast Enterprise.

      • USED_NATIVE_MEMORY_PERCENTAGE: Maximum used native memory percentage of the specified Near Cache to trigger the eviction. If the native memory usage percentage (relative to maximum native memory size) exceeds this threshold, the eviction is triggered. Available only for NATIVE in-memory format. This is supported only by Hazelcast Enterprise.

      • FREE_NATIVE_MEMORY_SIZE: Minimum free native memory size of the specified Near Cache in MB to trigger the eviction. If free native memory size goes below this threshold, eviction is triggered. Available only for NATIVE in-memory format. This is supported only by Hazelcast Enterprise.

      • FREE_NATIVE_MEMORY_PERCENTAGE: Minimum free native memory percentage of the specified Near Cache to trigger eviction. If free native memory percentage (relative to maximum native memory size) goes below this threshold, eviction is triggered. Available only for NATIVE in-memory format. This is supported only by Hazelcast Enterprise.

    • size: Maximum size of the Near Cache, interpreted according to max-size-policy. When this size is reached, entries are evicted from the Near Cache based on the configured policy. Accepts any integer between 1 and Integer.MAX_VALUE. The default value depends on the data structure:

      • IMap: Its default value is Integer.MAX_VALUE for on-heap maps and 10000 for the NATIVE in-memory format.

      • JCache: Its default value is 10000.

  • cache-local-entries: Specifies whether the local entries are cached. It can be useful when the in-memory format of the Near Cache is different from that of the map. By default, it is disabled. It is available only on Hazelcast members, not on Hazelcast clients (which have no local entries).

  • local-update-policy: Specifies the update policy of the local Near Cache. It is available on JCache clients. Available values are as follows:

    • INVALIDATE: Removes the Near Cache entry on mutation. After the mutative call to the member completes but before the operation returns to the caller, the Near Cache entry is removed. Until the mutative operation completes, the readers still continue to read the old value. But as soon as the update completes the Near Cache entry is removed. Any threads reading the key after this point will have a Near Cache miss and call through to the member, obtaining the new entry. This setting provides read-your-writes consistency. This is the default setting.

    • CACHE_ON_UPDATE: Updates the Near Cache entry on mutation. After the mutative call to the member completes but before the put returns to the caller, the Near Cache entry is updated. So a remove will remove it and one of the put methods will update it to the new value. Until the update/remove operation completes, the entry’s old value can still be read from the Near Cache. But before the call completes the Near Cache entry is updated. Any threads reading the key after this point read the new entry. If the mutative operation was a remove, the key will no longer exist in the cache, both the Near Cache and the original copy in the member. The member initiates an invalidate event to any other Near Caches, however the caller Near Cache is not invalidated as it already has the new value. This setting also provides read-your-writes consistency.

  • preloader: Specifies whether the Near Cache should store and preload its keys for faster repopulation after a Hazelcast client restart. It is available only on IMap and JCache clients. It has the following attributes:

    • enabled: Specifies whether the preloader for this Near Cache is enabled or not, true or false.

    • directory: Specifies the parent directory for the preloader of this Near Cache. The filenames for the preloader storage are generated from the Near Cache name. You can additionally specify the parent directory to have multiple clients on the same machine with the same Near Cache names.

    • store-initial-delay-seconds: Specifies the delay in seconds until the keys of this Near Cache are stored for the first time. Its default value is 600 seconds.

    • store-interval-seconds: Specifies the interval in seconds in which the keys of this Near Cache are stored. Its default value is 600 seconds.

Near Cache Configuration Examples

This section shows some configuration examples for different Hazelcast data structures.

Near Cache Example for IMap

The following are configuration examples for IMap Near Caches for Hazelcast members and clients.

XML:

<hazelcast>
    ...
    <map name="mostlyReadMap">
        <in-memory-format>BINARY</in-memory-format>
        <near-cache>
            <in-memory-format>OBJECT</in-memory-format>
            <invalidate-on-change>false</invalidate-on-change>
            <time-to-live-seconds>600</time-to-live-seconds>
            <eviction eviction-policy="NONE" max-size-policy="ENTRY_COUNT" size="5000"/>
            <cache-local-entries>true</cache-local-entries>
        </near-cache>
    </map>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    mostlyReadMap:
      in-memory-format: BINARY
      near-cache:
        in-memory-format: OBJECT
        invalidate-on-change: false
        time-to-live-seconds: 600
        eviction:
          eviction-policy: NONE
          max-size-policy: ENTRY_COUNT
          size: 5000
        cache-local-entries: true

Java:

EvictionConfig evictionConfig = new EvictionConfig()
  .setEvictionPolicy(EvictionPolicy.NONE)
  .setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
  .setSize(5000);

NearCacheConfig nearCacheConfig = new NearCacheConfig()
  .setInMemoryFormat(InMemoryFormat.OBJECT)
  .setInvalidateOnChange(false)
  .setTimeToLiveSeconds(600)
  .setEvictionConfig(evictionConfig)
  .setCacheLocalEntries(true);

Config config = new Config();
config.getMapConfig("mostlyReadMap")
  .setInMemoryFormat(InMemoryFormat.BINARY)
  .setNearCacheConfig(nearCacheConfig);

The Near Cache configuration for maps on members is a child of the map configuration, so you do not have to define the map name in the Near Cache configuration.

XML:

<hazelcast-client>
    ...
    <near-cache name="mostlyReadMap">
        <in-memory-format>OBJECT</in-memory-format>
        <invalidate-on-change>true</invalidate-on-change>
        <eviction eviction-policy="LRU" max-size-policy="ENTRY_COUNT" size="50000"/>
    </near-cache>
    ...
</hazelcast-client>

YAML:

hazelcast-client:
  near-cache:
    mostlyReadMap:
      in-memory-format: OBJECT
      invalidate-on-change: true
      eviction:
        eviction-policy: LRU
        max-size-policy: ENTRY_COUNT
        size: 50000

Java:

EvictionConfig evictionConfig = new EvictionConfig()
  .setEvictionPolicy(EvictionPolicy.LRU)
  .setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
  .setSize(50000);

NearCacheConfig nearCacheConfig = new NearCacheConfig()
  .setName("mostlyReadMap")
  .setInMemoryFormat(InMemoryFormat.OBJECT)
  .setInvalidateOnChange(true)
  .setEvictionConfig(evictionConfig);

ClientConfig clientConfig = new ClientConfig()
  .addNearCacheConfig(nearCacheConfig);

The Near Cache on the client side must have the same name as the data structure on the member for which this Near Cache is being created. You can use wildcards, so in this example mostlyRead* would also match the map mostlyReadMap.

A Near Cache can have its own in-memory-format which is independent of the in-memory-format of the data structure.

Near Cache Example for JCache Clients

The following is a configuration example for a JCache Near Cache for a Hazelcast client.

XML:

<hazelcast-client>
    ...
    <near-cache name="mostlyReadCache">
        <in-memory-format>OBJECT</in-memory-format>
        <invalidate-on-change>true</invalidate-on-change>
        <eviction eviction-policy="LRU" max-size-policy="ENTRY_COUNT" size="30000"/>
        <local-update-policy>CACHE_ON_UPDATE</local-update-policy>
    </near-cache>
    ...
</hazelcast-client>

YAML:

hazelcast-client:
  near-cache:
    mostlyReadCache:
      in-memory-format: OBJECT
      invalidate-on-change: true
      eviction:
        eviction-policy: LRU
        max-size-policy: ENTRY_COUNT
        size: 30000
      local-update-policy: CACHE_ON_UPDATE

Java:

EvictionConfig evictionConfig = new EvictionConfig()
  .setEvictionPolicy(EvictionPolicy.LRU)
  .setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
  .setSize(30000);

NearCacheConfig nearCacheConfig = new NearCacheConfig()
  .setName("mostlyReadCache")
  .setInMemoryFormat(InMemoryFormat.OBJECT)
  .setInvalidateOnChange(true)
  .setEvictionConfig(evictionConfig)
  .setLocalUpdatePolicy(LocalUpdatePolicy.CACHE_ON_UPDATE);

ClientConfig clientConfig = new ClientConfig()
  .addNearCacheConfig(nearCacheConfig);

Example for Near Cache with High-Density Memory Store

Hazelcast Enterprise Feature

The following is a configuration example for an IMap High-Density Near Cache for a Hazelcast member.

XML:

<hazelcast>
    ...
    <map name="mostlyReadMapWithHighDensityNearCache">
        <in-memory-format>OBJECT</in-memory-format>
        <near-cache>
            <in-memory-format>NATIVE</in-memory-format>
            <eviction eviction-policy="LFU" max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" size="90"/>
        </near-cache>
    </map>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    mostlyReadMapWithHighDensityNearCache:
      in-memory-format: OBJECT
      near-cache:
        in-memory-format: NATIVE
        eviction:
          eviction-policy: LFU
          max-size-policy: USED_NATIVE_MEMORY_PERCENTAGE
          size: 90

Java:

EvictionConfig evictionConfig = new EvictionConfig()
  .setEvictionPolicy(EvictionPolicy.LFU)
  .setMaxSizePolicy(MaxSizePolicy.USED_NATIVE_MEMORY_PERCENTAGE)
  .setSize(90);

NearCacheConfig nearCacheConfig = new NearCacheConfig()
  .setInMemoryFormat(InMemoryFormat.NATIVE)
  .setEvictionConfig(evictionConfig);

Config config = new Config();
config.getMapConfig("mostlyReadMapWithHighDensityNearCache")
  .setInMemoryFormat(InMemoryFormat.OBJECT)
  .setNearCacheConfig(nearCacheConfig);

Keep in mind that you should have already enabled the High-Density Memory Store usage for your cluster. See the Configuring High-Density Memory Store section.

Note that a map and its Near Cache can independently use High-Density Memory Store. For example, if your map does not use High-Density Memory Store, its Near Cache can still use it.

Near Cache Eviction

In the scope of Near Cache, eviction means evicting (clearing) the entries selected according to the given eviction-policy when the specified max-size-policy has been reached.

The max-size-policy defines the state when the Near Cache is full and determines whether the eviction should be triggered. The size is either interpreted as entry count, memory size or percentage, depending on the chosen policy.

Once the eviction is triggered the configured eviction-policy determines which, if any, entries must be evicted.
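The interplay of an ENTRY_COUNT size limit and an LRU policy can be modeled with the JDK's LinkedHashMap. This is a sketch of the concept only, under the assumption that one entry is evicted as soon as the count exceeds the limit. Hazelcast's own eviction may sample entries differently, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Model of max-size-policy ENTRY_COUNT combined with eviction-policy LRU.
public class LruEntryCountCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    public LruEntryCountCache(int maxEntries) {
        super(16, 0.75f, true); // true = iterate in access order, least recently used first
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after each insertion; returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

With a limit of two entries, putting "a" and "b", reading "a" and then putting "c" evicts "b", because "a" was used more recently.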

Note that the policies mentioned are configured under the near-cache configuration block, as seen in the above configuration examples.

Near Cache Expiration

Expiration means the eviction of expired records. A record is expired if either of the following holds:

  • It has not been touched (accessed/read) for max-idle-seconds.

  • time-to-live-seconds have passed since it was put into the Near Cache.

The actual expiration is performed in the following cases:

  • When a record is accessed: it is checked if the record is expired or not. If it is expired, it is evicted and null is returned as the value to the caller.

  • In the background: there is an expiration task that periodically (currently every 5 seconds) scans records and evicts the expired records.

Note that max-idle-seconds and time-to-live-seconds are configured under the near-cache configuration block, as seen in the above configuration examples.
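The two expiration conditions can be expressed as a small check on a record's timestamps. The sketch below is a model with hypothetical names. It treats 0 as "never expires" for both settings and counts a record as expired once the full period has elapsed. The exact boundary behavior in Hazelcast is not stated here and is an assumption.

```java
// Model of a Near Cache record that tracks creation and last access times.
public class NearCacheRecordModel {
    private final long creationTimeMillis;
    private long lastAccessTimeMillis;

    public NearCacheRecordModel(long nowMillis) {
        this.creationTimeMillis = nowMillis;
        this.lastAccessTimeMillis = nowMillis;
    }

    // Records a read of this entry.
    public void touch(long nowMillis) {
        lastAccessTimeMillis = nowMillis;
    }

    // True if the record outlived its time-to-live or stayed idle too long; 0 disables a check.
    public boolean isExpired(long nowMillis, int timeToLiveSeconds, int maxIdleSeconds) {
        boolean ttlExpired = timeToLiveSeconds > 0
                && nowMillis - creationTimeMillis >= timeToLiveSeconds * 1000L;
        boolean idleExpired = maxIdleSeconds > 0
                && nowMillis - lastAccessTimeMillis >= maxIdleSeconds * 1000L;
        return ttlExpired || idleExpired;
    }
}
```

Both the access-time check and the periodic background scan described above would call a test like isExpired and evict the record when it returns true.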

Near Cache Invalidation

Invalidation is the process of removing an entry from the Near Cache when its value is updated or it is removed from the original data structure (to prevent stale reads). Near Cache invalidation happens asynchronously at the cluster level, but synchronously at the current member. This means that the Near Cache is invalidated within the whole cluster after the modifying operation is finished, but updated from the current member before the modifying operation is done. A modifying operation can be an EntryProcessor, an explicit update or remove as well as an expiration or eviction. Generally, whenever the state of an entry changes in the record store by updating its value or removing it, the invalidation event is sent for that entry.

Invalidations can be sent from members to client Near Caches or to member Near Caches, either individually or in batches. Default behavior is sending in batches. If there are lots of mutating operations such as put/remove on data structures, it is advised that you configure batch invalidations. This reduces the network traffic and keeps the eventing system less busy, but may increase the delay of individual invalidations.

You can use the following system properties to configure the Near Cache invalidation:

  • hazelcast.map.invalidation.batch.enabled: Enable or disable batching. Its default value is true. When it is set to false, all invalidations are sent immediately.

  • hazelcast.map.invalidation.batch.size: Maximum number of invalidations in a batch. Its default value is 100.

  • hazelcast.map.invalidation.batchfrequency.seconds: If the collected invalidations do not reach the configured batch size, a background process sends them periodically. Its default value is 10 seconds.

If there are a lot of clients or many mutating operations, batching should remain enabled and the batch size should be configured with the hazelcast.map.invalidation.batch.size system property to a suitable value.
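To make the trade-off between batch size and batch frequency concrete, here is a minimal sketch of size- and time-triggered batching. It is not Hazelcast's implementation; InvalidationBatcherSketch, its sender callback, and the use of plain strings as keys are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of size- and time-triggered batching.
// Not Hazelcast's implementation; names are illustrative only.
final class InvalidationBatcherSketch {

    private final int batchSize;
    private final Consumer<List<String>> sender;
    private final List<String> pending = new ArrayList<>();

    InvalidationBatcherSketch(int batchSize, Consumer<List<String>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    // Called for every mutated key; sends as soon as the batch is full.
    synchronized void add(String invalidatedKey) {
        pending.add(invalidatedKey);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Called periodically by a background task to send a partially
    // filled batch, so that no invalidation waits indefinitely.
    synchronized void flush() {
        if (!pending.isEmpty()) {
            sender.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

A larger batch size means fewer network messages under heavy mutation, while the periodic flush bounds how long an individual invalidation can be delayed.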

Near Cache Consistency

Eventual Consistency

Near Caches are invalidated by invalidation events. Invalidation events can be lost because the eventing system works in a fire-and-forget fashion. If an event is lost, reads from the Near Cache can be stale indefinitely.

To solve this problem, Hazelcast provides eventually consistent behavior for IMap/JCache Near Caches by detecting invalidation losses. After detection of an invalidation loss, stale data is made unreachable and Near Cache’s get calls to that data are directed to the underlying IMap/JCache to fetch the fresh data.

You can configure eventual consistency with the system properties below (same properties are valid for both member and client side Near Caches):

  • hazelcast.invalidation.max.tolerated.miss.count: Default value is 10. If missed invalidation count is bigger than this value, relevant cached data is made unreachable.

  • hazelcast.invalidation.reconciliation.interval.seconds: Default value is 60 seconds. This is the interval of a periodic task that scans cluster members to compare the generated invalidation events with the ones received by the Near Cache.
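One way to picture loss detection is to count gaps in a monotonically increasing sequence number attached to each invalidation event. The sketch below assumes such a sequence exists and is purely illustrative; the class and method names are hypothetical and it does not reproduce Hazelcast's internal bookkeeping.

```java
// Hypothetical sketch: detecting lost invalidation events from gaps in
// a monotonically increasing sequence number. Illustrative only.
final class InvalidationMissDetectorSketch {

    private final long maxToleratedMissCount;
    private long lastSequence;
    private long missedCount;

    InvalidationMissDetectorSketch(long maxToleratedMissCount) {
        this.maxToleratedMissCount = maxToleratedMissCount;
    }

    // Records a received event; skipped sequence numbers count as misses.
    void onInvalidation(long sequence) {
        if (sequence > lastSequence) {
            missedCount += sequence - lastSequence - 1;
            lastSequence = sequence;
        }
    }

    // True when cached data should no longer be served from the Near Cache.
    boolean isStale() {
        return missedCount > maxToleratedMissCount;
    }

    // Called after the stale entries have been made unreachable.
    void reset() {
        missedCount = 0;
    }
}
```

With a tolerated miss count of 10, receiving sequences 1, 2 and then 13 records ten misses and is still tolerated; one more gap marks the cached data as stale.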

Locally Initiated Changes

For local invalidations, when a record is updated/removed, future reads will see this update/remove to provide read-your-writes consistency. To achieve this consistency, Near Cache configuration provides the following update policies:

  • INVALIDATE

  • CACHE_ON_UPDATE

If you choose INVALIDATE, the entry is removed from the Near Cache after the update/remove occurs in the underlying data structure and before the operation (get) returns to the caller. Until the update/remove operation completes, the entry’s old value can still be read from the Near Cache.

If you choose CACHE_ON_UPDATE, the entry is updated after the update/remove occurs in the underlying data structure and before the operation (put/get) returns to the caller. If it is an update operation, it removes the entry and the new value is placed. Until the update/remove operation completes, the entry’s old value can still be read from the Near Cache. Any threads reading the key after this point read the new entry. If the mutative operation was a remove, the key will no longer exist in the Near Cache and the original copy in the member.

Near Cache Preloader

The Near Cache preloader is a functionality to store the keys from a Near Cache to provide a fast re-population of the previous hot data set after a Hazelcast Client has been restarted. It is available on IMap and JCache clients.

The Near Cache preloader stores the keys (not the values) of Near Cache entries in regular intervals. You can define the initial delay via store-initial-delay-seconds, e.g., if you know that your hot data set needs some time to build up. You can configure the interval via store-interval-seconds which determines how often the key-set is stored. The persistence does not run continuously. Whenever the storage is scheduled, it is performed on the actual keys in the Near Cache.

The Near Cache preloader is triggered on the first initialization of the data structure on the client, e.g., client.getMap("myNearCacheMap"). This schedules the preloader, which works in the background, so your application is not blocked. The storage is enabled after the loading is completed.

The configuration parameter directory is optional. If you omit it, the base directory becomes the user working directory (normally where the JVM was started or configured with the system property user.dir). The storage filenames are always created from the Near Cache name. So even if you use a wildcard name in the Near Cache Configuration, the preloader filenames are unique.

If you run multiple Hazelcast clients with enabled Near Cache preloader on the same machine, you have to configure a unique storage filename for each client or run them from different user directories. If two clients write into the same file, only the first client succeeds; the subsequent clients throw an exception as soon as the Near Cache preloader is triggered.

Data Affinity

Data affinity ensures that related entries exist on the same member. If related data is on the same member, operations can be executed without the cost of extra network calls and extra wire data. This feature is provided by using the same partition keys for related data.

PartitionAware

Co-location of related data and computation

Hazelcast has a standard way of finding out which member owns/manages each key object. The following operations are routed to the same member, since all of them operate on the same key "key1".

HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
Map mapA = hazelcastInstance.getMap( "mapA" );
Map mapB = hazelcastInstance.getMap( "mapB" );
Map mapC = hazelcastInstance.getMap( "mapC" );

// since map names are different, operation will be manipulating
// different entries, but the operation will take place on the
// same member since the keys ("key1") are the same
mapA.put( "key1", value );
mapB.get( "key1" );
mapC.remove( "key1" );

// lock operation will still execute on the same member
// of the cluster since the key ("key1") is same
hazelcastInstance.getLock( "key1" ).lock();

// distributed execution will execute the 'runnable' on the
// same member since "key1" is passed as the key
// ("default" is a placeholder executor name)
hazelcastInstance.getExecutorService( "default" ).executeOnKeyOwner( runnable, "key1" );

When the keys are the same, entries are stored on the same member. But we sometimes want to have related entries stored on the same member, such as a customer and his/her order entries. We would have a customers map with customerId as the key and an orders map with orderId as the key. Since customerId and orderId are different keys, a customer and his/her orders may fall into different members in your cluster. So how can we have them stored on the same member? We create an affinity between the customer and the orders. If we make them part of the same partition, then these entries will be co-located. We achieve this by making the OrderKey class PartitionAware.

final class OrderKey implements PartitionAware, Serializable {

    private final long orderId;
    private final long customerId;

    OrderKey(long orderId, long customerId) {
        this.orderId = orderId;
        this.customerId = customerId;
    }

    @Override
    public Object getPartitionKey() {
        return customerId;
    }

    @Override
    public String toString() {
        return "OrderKey{"
                + "orderId=" + orderId
                + ", customerId=" + customerId
                + '}';
    }
}

Notice that OrderKey implements PartitionAware and that getPartitionKey() returns the customerId. Together, these ensure that the Customer entry and its Orders are stored on the same member.

HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
Map mapCustomers = hazelcastInstance.getMap( "customers" );
Map mapOrders = hazelcastInstance.getMap( "orders" );

// create the customer entry with customer id = 1
mapCustomers.put( 1, customer );

// now create the orders for this customer
mapOrders.put( new OrderKey( 21, 1 ), order );
mapOrders.put( new OrderKey( 22, 1 ), order );
mapOrders.put( new OrderKey( 23, 1 ), order );

Assume that you have a customers map where customerId is the key and the customer object is the value. You want to remove one of the customer orders and return the number of remaining orders. Here is how you would normally do it.

public static int removeOrder( long customerId, long orderId ) throws Exception {
    IMap<Long, Customer> mapCustomers = hazelcastInstance.getMap( "customers" );
    IMap<OrderKey, Order> mapOrders = hazelcastInstance.getMap( "orders" );

    mapCustomers.lock( customerId );
    try {
        mapOrders.remove( new OrderKey(orderId, customerId) );
        Set<OrderKey> orders = mapOrders.keySet(Predicates.equal( "customerId", customerId ));
        return orders.size();
    } finally {
        // release the lock even if remove or keySet throws
        mapCustomers.unlock( customerId );
    }
}

There are a couple of things you should consider.

  • There are four distributed operations there: lock, remove, keySet, unlock. Can you reduce the number of distributed operations?

  • The customer object may not be that big, but can you avoid passing that object through the wire? Think about a scenario where you store the order count in the customer object for fast access: you would need to do a get and a put, and as a result, the customer object is passed through the wire twice.

Instead, why not move the computation over to the member (JVM) where your customer data resides? Here is how you can do this with the distributed executor service.

  1. Send a PartitionAware Callable task.

  2. Callable does the deletion of the order right there and returns with the remaining order count.

  3. Upon completion of the Callable task, return the result (remaining order count). You do not have to wait until the task is completed; since distributed executions are asynchronous, you can do other things in the meantime.

Here is the example code.

HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

    public int removeOrder(long customerId, long orderId) throws Exception {
        IExecutorService executorService = hazelcastInstance.getExecutorService("ExecutorService");

        OrderDeletionTask task = new OrderDeletionTask(customerId, orderId);
        Future<Integer> future = executorService.submit(task);
        int remainingOrders = future.get();

        return remainingOrders;
    }

    public static class OrderDeletionTask
            implements Callable<Integer>, PartitionAware, Serializable, HazelcastInstanceAware {

        private long orderId;
        private long customerId;
        private HazelcastInstance hazelcastInstance;

        public OrderDeletionTask() {
        }

        public OrderDeletionTask(long customerId, long orderId) {
            this.customerId = customerId;
            this.orderId = orderId;
        }

        @Override
        public Integer call() {
            IMap<Long, Customer> customerMap = hazelcastInstance.getMap("customers");
            IMap<OrderKey, Order> orderMap = hazelcastInstance.getMap("orders");

            customerMap.lock(customerId);
            try {
                Predicate predicate = Predicates.equal("customerId", customerId);
                Set<OrderKey> orderKeys = orderMap.localKeySet(predicate);
                int orderCount = orderKeys.size();
                for (OrderKey key : orderKeys) {
                    if (key.orderId == orderId) {
                        orderCount--;
                        orderMap.delete(key);
                    }
                }
                return orderCount;
            } finally {
                // always release the lock, even if the query or delete fails
                customerMap.unlock(customerId);
            }
        }

        @Override
        public Object getPartitionKey() {
            return customerId;
        }

        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            this.hazelcastInstance = hazelcastInstance;
        }
    }

The following are the benefits of doing the same operation with distributed ExecutorService based on the key:

  • only one distributed execution (executorService.submit(task)), instead of four

  • less data is sent over the wire

  • shorter lock duration, i.e., higher concurrency, for the Customer entry since the lock/update/unlock cycle is done locally (local to the customer data)

PartitioningStrategy

Another way of storing the related data on the same location is using/implementing the PartitioningStrategy interface. Normally (if no partitioning strategy is defined), Hazelcast finds the partition of a key first by converting the object to binary and then by hashing this binary. If a partitioning strategy is defined, Hazelcast passes the key to the strategy and the strategy returns an object; the partition is then calculated by hashing that object.

Hazelcast offers the following out-of-the-box partitioning strategies:

  • DefaultPartitioningStrategy: Default strategy. It checks whether the key implements PartitionAware. If it does, the object returned by getPartitionKey() is converted to binary and then hashed to find the partition of the key; otherwise, the key itself is used.

  • StringPartitioningStrategy: Works only for string keys. It uses the string after the @ character as the partition key. For example, if you have two keys ordergroup1@region1 and customergroup1@region1, both ordergroup1 and customergroup1 fall into the partition where region1 is located.

  • StringAndPartitionAwarePartitioningStrategy: Works as the combination of the above two strategies. If the key implements PartitionAware, it works like the DefaultPartitioningStrategy. If it is a string key, it works like the StringPartitioningStrategy.
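The idea behind these strategies can be sketched in two steps: derive a partition key from the key, then hash the partition key into a partition ID. The sketch below is an approximation under stated assumptions: Hazelcast hashes the serialized form of the partition key with its own hash function, whereas this example uses Arrays.hashCode over UTF-8 bytes as a stand-in. The class PartitionSketch and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of deriving a partition ID from a partition key.
// Arrays.hashCode over UTF-8 bytes stands in for Hazelcast's own hashing
// of the serialized key, so the resulting IDs differ from real ones.
final class PartitionSketch {

    // For "ordergroup1@region1" the partition key is "region1";
    // keys without '@' are their own partition key.
    static String stringPartitionKey(String key) {
        int at = key.indexOf('@');
        return at < 0 ? key : key.substring(at + 1);
    }

    // Maps the partition key to a partition ID in [0, partitionCount).
    static int partitionId(String partitionKey, int partitionCount) {
        int hash = Arrays.hashCode(partitionKey.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, partitionCount);
    }
}
```

Because ordergroup1@region1 and customergroup1@region1 both yield the partition key region1, they hash to the same partition ID and therefore end up on the same member.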

The following are example configuration snippets. Note that these strategy configurations are per map.

Declarative Configuration:

XML:

<hazelcast>
    ...
    <map name="name-of-the-map">
        <partition-strategy>
             com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
        </partition-strategy>
    </map>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    name-of-the-map:
      partition-strategy: com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy

Programmatic Configuration:

Config config = new Config();
MapConfig mapConfig = config.getMapConfig("name-of-the-map");
PartitioningStrategyConfig psConfig = mapConfig.getPartitioningStrategyConfig();
psConfig.setPartitioningStrategyClass( "com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy" );

// OR pass an instance of your own PartitioningStrategy implementation
psConfig.setPartitioningStrategy(new YourCustomPartitioningStrategy());
...

You can also define your own partitioning strategy by implementing the PartitioningStrategy interface. To enable your implementation, add the full class name to your Hazelcast configuration using either the declarative or programmatic approach, as exemplified above.

The examples above show how to define a partitioning strategy per map. Note that all the members of your cluster must have the same partitioning strategy configurations.

You can also set a global strategy that is applied to all the data structures in your cluster. This can be done by defining the hazelcast.partitioning.strategy.class system property. An example of configuring this property declaratively is shown below:

XML:

<hazelcast>
    ...
    <properties>
        <property name="hazelcast.partitioning.strategy.class">
            com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
        </property>
    </properties>
    ...
</hazelcast>

YAML:

hazelcast:
  properties:
    hazelcast.partitioning.strategy.class: com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy

You can specify the aforementioned out-of-the-box strategies or your custom partitioning strategy.

You can also use other system property configuring options as explained in the Configuring with System Properties section.

The per map and global (cluster) partitioning strategies are supported on the member side. Hazelcast Java clients only support the global strategy and it is configured via the same system property used in the members (hazelcast.partitioning.strategy.class).

CPU Thread Affinity

Hazelcast lets you configure CPU thread affinity, where certain threads have affinity for particular CPUs. This gives you much better control over latency and can improve throughput.

The following affinity configurations are available for a member:

-Dhazelcast.io.input.thread.affinity=1-3
-Dhazelcast.io.output.thread.affinity=3-5
-Dhazelcast.operation.thread.affinity=7-10,13
-Dhazelcast.operation.response.thread.affinity=15,16

The following affinity configurations are available for a client:

-Dhazelcast.client.io.input.thread.affinity=1-4
-Dhazelcast.client.io.output.thread.affinity=5-8
-Dhazelcast.client.response.thread.affinity=7-9

You can set the CPU thread affinity properties shown above only on the command line.

Let’s have a look at how we define the values for the above configuration properties:

  • Individual CPUs, e.g., 1,2,3: This means there are going to be three threads. The first thread runs on CPU 1, the second thread on CPU 2, and so on.

  • CPU ranges, e.g., 1-3: Shortcut syntax for 1,2,3.

  • Group, e.g., [1-3]: This configures three threads and each of these threads can run on CPU 1, 2 and 3.

  • Group with thread count, e.g., [1-3]:2: This configures two threads and each of these two threads can run on CPU 1, 2 and 3.

You can also combine those, e.g., 1,2,[5-7],[10,12,16]:2.
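To check your reading of the syntax, the following sketch expands an affinity value into one CPU set per thread according to the four rules above. It is a hypothetical helper written for this illustration, not the parser Hazelcast uses, and it assumes well-formed input without validation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: expands an affinity value into one CPU set per
// thread. Assumes well-formed input; not Hazelcast's own parser.
final class AffinitySyntaxSketch {

    // Returns one CPU set per thread, in configuration order.
    static List<Set<Integer>> parse(String spec) {
        List<Set<Integer>> threads = new ArrayList<>();
        int i = 0;
        while (i < spec.length()) {
            if (spec.charAt(i) == '[') {
                int close = spec.indexOf(']', i);
                Set<Integer> group = cpus(spec.substring(i + 1, close));
                int end = nextComma(spec, close);
                // "[...]:n" sets the thread count; otherwise one thread per CPU
                int count = end > close + 1
                        ? Integer.parseInt(spec.substring(close + 2, end))
                        : group.size();
                for (int t = 0; t < count; t++) {
                    threads.add(group);
                }
                i = end + 1;
            } else {
                int end = nextComma(spec, i);
                // a bare CPU or range pins one thread to each listed CPU
                for (int cpu : cpus(spec.substring(i, end))) {
                    Set<Integer> single = new TreeSet<>();
                    single.add(cpu);
                    threads.add(single);
                }
                i = end + 1;
            }
        }
        return threads;
    }

    private static int nextComma(String spec, int from) {
        int comma = spec.indexOf(',', from);
        return comma < 0 ? spec.length() : comma;
    }

    // Expands a list such as "1,3-5" into the CPUs 1, 3, 4 and 5.
    private static Set<Integer> cpus(String list) {
        Set<Integer> result = new TreeSet<>();
        for (String part : list.split(",")) {
            String[] bounds = part.trim().split("-");
            int low = Integer.parseInt(bounds[0]);
            int high = Integer.parseInt(bounds[bounds.length - 1]);
            for (int cpu = low; cpu <= high; cpu++) {
                result.add(cpu);
            }
        }
        return result;
    }
}
```

For the combined value 1,2,[5-7],[10,12,16]:2 this yields seven threads: one pinned to CPU 1, one pinned to CPU 2, three that may run on CPUs 5 to 7, and two that may run on CPUs 10, 12 and 16.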

Note that the syntax for CPU thread affinity shown above not only determines the mapping of CPUs to threads, it also determines the thread count. If you use CPU thread affinity, e.g., hazelcast.io.input.thread.affinity, then hazelcast.io.input.thread.count is ignored. See Threading Model for more information about specifying explicit thread counts.

If you don’t configure affinity for a category of threads, it means they can run on any CPU.

Let’s look at an example. Assuming you have the numactl utility, run the following command on your machine to see the mapping between the NUMA nodes and threads:

numactl --hardware

An example output is shown below:

available: 2 nodes (0-1)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 20 21 22 23 24 25 26 27 28 29
node 0 size: 393090 MB
node 0 free: 372729 MB
node 1 cpus: 10 11 12 13 14 15 16 17 18 19 30 31 32 33 34 35 36 37 38 39
node 1 size: 393216 MB
node 1 free: 343296 MB
node distances:
node   0   1
  0:  10  21
  1:  21  10

If you want to configure 20 threads on NUMA node 0 and 20 threads on NUMA node 1, and confine the threads to these NUMA nodes, you can use the following configuration:

-Dhazelcast.operation.thread.affinity=[0-9,20-29],[10-19,30-39]

See here for information about NUMA nodes.

Threading Model

Your application server has its own threads. Hazelcast does not use these; it manages its own threads.

I/O Threading

Hazelcast uses a pool of threads for I/O. A single thread does not perform all the I/O. Instead, multiple threads perform the I/O. On each cluster member, the I/O threading is split up in 3 types of I/O threads:

  • I/O thread for the accept requests

  • I/O threads to read data from other members/clients

  • I/O threads to write data to other members/clients

You can configure the number of I/O threads using the hazelcast.io.thread.count system property. Its default value is 3 per member. If 3 is used, in total there are 7 I/O threads: 1 accept I/O thread, 3 read I/O threads and 3 write I/O threads. Each I/O thread has its own Selector instance and waits on the Selector.select if there is nothing to do.

You can also specify counts for input and output threads separately. There are hazelcast.io.input.thread.count and hazelcast.io.output.thread.count properties for this purpose. See the System Properties appendix for information about these properties and how to set them.

Hazelcast periodically scans utilization of each I/O thread and can decide to migrate a connection to a new thread if the existing thread is servicing a disproportionate number of I/O events. You can customize the scanning interval by configuring the hazelcast.io.balancer.interval.seconds system property; its default interval is 20 seconds. You can disable the balancing process by setting this property to a negative value.

In case of the read I/O thread, when sufficient bytes for a packet have been received, the Packet object is created. This Packet object is then sent to the system where it is de-multiplexed. If the Packet header signals that it is an operation/response, the Packet is handed over to the operation service (see the Operation Threading section). If the Packet is an event, it is handed over to the event service (see the Event Threading section).

Event Threading

Hazelcast uses a shared event system to deal with components that rely on events, such as topic, collections, listeners and Near Cache.

Each cluster member has an array of event threads and each thread has its own work queue. When an event is produced, either locally or remotely, an event thread is selected (depending on whether message ordering is required) and the event is placed in the work queue for that event thread.

You can set the following properties to alter the system’s behavior:

  • hazelcast.event.thread.count: Number of event-threads in this array. Its default value is 5.

  • hazelcast.event.queue.capacity: Capacity of the work queue. Its default value is 1000000.

  • hazelcast.event.queue.timeout.millis: Timeout for placing an item on the work queue in milliseconds. Its default value is 250 milliseconds.

If you process a lot of events and have many cores, changing the value of hazelcast.event.thread.count property to a higher value is a good practice. This way, more events can be processed in parallel.

Multiple components share the same event queues. If there are 2 topics, say A and B, for certain messages they may share the same queue(s) and hence the same event thread. If there are a lot of pending messages produced by A, then B needs to wait. Also, when processing a message from A takes a lot of time and the event thread is used for that, B suffers from this. That is why it is better to offload processing to a dedicated thread (pool) so that systems are better isolated.
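The offloading advice can be sketched as a callback that returns immediately and leaves the slow work to its own thread pool. The class below is a hypothetical illustration that uses only the JDK; it does not implement any Hazelcast listener interface, and the onMessage method merely stands in for whatever callback your listener receives on the event thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: a listener callback that returns immediately and
// leaves the slow work to its own thread pool. Not a Hazelcast interface.
final class OffloadingListenerSketch {

    private final ExecutorService workers;
    private final Consumer<String> slowHandler;

    OffloadingListenerSketch(int workerThreads, Consumer<String> slowHandler) {
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.slowHandler = slowHandler;
    }

    // Invoked on the shared event thread: only hands the message over.
    void onMessage(String message) {
        workers.execute(() -> slowHandler.accept(message));
    }

    // Stops accepting work and waits for queued messages to be handled.
    boolean shutdownAndAwait(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the event thread only enqueues the message, a slow handler for topic A no longer delays events for topic B that share the same event thread. Note that handing work to a pool with more than one thread gives up the ordering that a single event thread provides.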

If the events are produced at a higher rate than they are consumed, the queue grows in size. To prevent overloading the system and running into an OutOfMemoryError, the queue is given a capacity of 1 million items. When the maximum capacity is reached, the items are dropped. This means that the event system is a 'best effort' system. There is no guarantee that you are going to get an event. Topic A might have a lot of pending messages and therefore B cannot receive messages because the queue has no capacity and messages for B are dropped.
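The 'best effort' behavior follows directly from combining a bounded queue with a timed offer. The sketch below shows that combination with the JDK's ArrayBlockingQueue; it is an illustration only, and the class name and the dropped-event counter are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a best-effort, bounded event queue.
final class BestEffortEventQueueSketch {

    private final BlockingQueue<String> queue;
    private final long offerTimeoutMillis;
    private long droppedCount;

    BestEffortEventQueueSketch(int capacity, long offerTimeoutMillis) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.offerTimeoutMillis = offerTimeoutMillis;
    }

    // Returns false when the queue stayed full for the whole timeout
    // (or the caller was interrupted) and the event was dropped.
    boolean publish(String event) {
        boolean accepted;
        try {
            accepted = queue.offer(event, offerTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            accepted = false;
        }
        if (!accepted) {
            droppedCount++;
        }
        return accepted;
    }

    // Takes the next event, or returns null if the queue is empty.
    String poll() {
        return queue.poll();
    }

    long droppedCount() {
        return droppedCount;
    }
}
```

With a capacity of two, two pending events from topic A are enough to make an event for topic B time out and be dropped, which mirrors the scenario described above on a small scale.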

IExecutor Threading

Executor threading is straightforward. When a task is received to be executed on Executor E, then E will have its own ThreadPoolExecutor instance and the work is placed in the work queue of this executor. Thus, Executors are fully isolated, but still share the same underlying hardware, most importantly the CPUs.

You can configure the IExecutor using the ExecutorConfig (programmatic configuration) or using <executor> (declarative configuration). See also the Configuring Executor Service section.

Operation Threading

The following are the operation types:

  • operations that are aware of a certain partition, e.g., IMap.get(key)

  • operations that are not partition aware, e.g., IExecutorService.executeOnMember(command, member)

Each of these operation types has a different threading model explained in the following sections.

Partition-aware Operations

To execute partition-aware operations, an array of operation threads is created. The default value of this array’s size is the number of cores and it has a minimum value of 2. This value can be changed using the hazelcast.operation.thread.count property.

Each operation thread has its own work queue and it consumes messages from this work queue. If a partition-aware operation needs to be scheduled, the right thread is found using the formula below.

threadIndex = partitionId % partition thread-count

After the threadIndex is determined, the operation is put in the work queue of that operation thread. This means the following:

  • A single operation thread executes operations for multiple partitions; if there are 271 partitions and 10 partition threads, then roughly every operation thread executes operations for 27 partitions.

  • Each partition belongs to only 1 operation thread. All operations for a partition are always handled by exactly the same operation thread.

  • Concurrency control is not needed to deal with partition-aware operations because once a partition-aware operation is put in the work queue of a partition-aware operation thread, only 1 thread is able to touch that partition.
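The formula and its consequences can be checked with a few lines of code. The sketch below applies threadIndex = partitionId % partition thread-count to every partition ID and counts how many partitions each thread serves; the class and method names are hypothetical, and the sketch treats all partitions as if they were served by a single member.

```java
// Sketch of the thread-selection formula described above.
// Class and method names are illustrative only.
final class OperationThreadMappingSketch {

    // threadIndex = partitionId % partition thread-count
    static int threadIndex(int partitionId, int partitionThreadCount) {
        return partitionId % partitionThreadCount;
    }

    // Counts how many partitions each operation thread serves.
    static int[] partitionsPerThread(int partitionCount, int partitionThreadCount) {
        int[] counts = new int[partitionThreadCount];
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            counts[threadIndex(partitionId, partitionThreadCount)]++;
        }
        return counts;
    }
}
```

For 271 partitions and 10 threads, the thread with index 0 serves 28 partitions and every other thread serves 27. Partitions 1 and 11 both map to index 1, so a slow operation on one of them delays operations on the other.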

Because of this threading strategy, there are two forms of false sharing you need to be aware of:

  • False sharing of the partition - two completely independent data structures share the same partition. For example, if there is a map employees and a map orders, the method employees.get("peter") running on partition 25 may be blocked by the method orders.get(1234) also running on partition 25. If independent data structures share the same partition, a slow operation on one data structure can slow down the other data structures.

  • False sharing of the partition-aware operation thread - each operation thread is responsible for executing operations on a number of partitions. For example, thread 1 could be responsible for partitions 0, 10, 20, etc. and thread 2 could be responsible for partitions 1, 11, 21, etc. If an operation for partition 1 takes a lot of time, it blocks the execution of an operation for partition 11 because both of them are mapped to the same operation thread.

You need to be careful with long running operations because you could starve the other operations of a thread. As a general rule, the partition thread should be released as soon as possible because operations are not designed as long running operations. That is why, for example, it is very dangerous to execute a long running operation using AtomicReference.alter() or an IMap.executeOnKey(), because these operations block other operations from being executed.

Currently, there is no support for work stealing. Operations for different partitions that map to the same thread may need to wait until the operation on one of those partitions is finished, even though there are other free partition-aware operation threads available.

Example:

Take a cluster with three members. Two members have 90 primary partitions and one member has 91 primary partitions. Let’s say you have one CPU and four cores per CPU. By default, four operation threads will be allocated to serve 90 or 91 partitions.

Non-Partition-aware Operations

To execute operations that are not partition-aware, e.g., IExecutorService.executeOnMember(command, member), generic operation threads are used. When the Hazelcast instance is started, an array of operation threads is created. The size of this array has a default value of the number of cores divided by two with a minimum value of 2. It can be changed using the hazelcast.operation.generic.thread.count property.

A non-partition-aware operation thread does not execute an operation for a specific partition. Only partition-aware operation threads execute partition-aware operations.

Unlike the partition-aware operation threads, all the generic operation threads share the same work queue: genericWorkQueue.

If a non-partition-aware operation needs to be executed, it is placed in that work queue and any generic operation thread can execute it. The big advantage is that you automatically have work balancing since any generic operation thread is allowed to pick up work from this queue.

The disadvantage is that this shared queue can be a point of contention. You may not see this contention in production since performance is dominated by I/O and the system does not run many non-partition-aware operations.

Priority Operations

In some cases, the system needs to run operations with a higher priority, e.g., an important system operation. To support priority operations, Hazelcast has the following features:

  • For partition-aware operations: Each partition thread has its own work queue and it also has a priority work queue. The partition thread always checks the priority queue before it processes work from its normal work queue.

  • For non-partition-aware operations: Next to the genericWorkQueue, there is also a genericPriorityWorkQueue. When a priority operation needs to be run, it is put in the genericPriorityWorkQueue. Like the partition-aware operation threads, a generic operation thread first checks the genericPriorityWorkQueue for work.

Since a worker thread blocks on the normal work queue (either partition specific or generic), a priority operation may not be picked up because it is not put in the queue where it is blocking. Hazelcast always sends a 'kick the worker' operation that only triggers the worker to wake up and check the priority queue.
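The "check the priority queue first" rule can be sketched with two plain queues. This single-threaded illustration leaves out the blocking and the wake-up ('kick the worker') mechanics described above; the class and its methods are hypothetical and not part of Hazelcast.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded sketch of a worker that always serves
// its priority queue before its normal work queue.
final class PriorityAwareWorkerSketch {

    private final Queue<Runnable> priorityQueue = new ArrayDeque<>();
    private final Queue<Runnable> normalQueue = new ArrayDeque<>();

    void submit(Runnable task) {
        normalQueue.add(task);
    }

    void submitPriority(Runnable task) {
        priorityQueue.add(task);
    }

    // Runs one task, priority first; returns false if both queues are empty.
    boolean runNext() {
        Runnable task = priorityQueue.poll();
        if (task == null) {
            task = normalQueue.poll();
        }
        if (task == null) {
            return false;
        }
        task.run();
        return true;
    }
}
```

A priority task submitted after two normal tasks still runs first, because runNext consults the priority queue before the normal queue on every iteration.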

Operation-response and Invocation-future

When an Operation is invoked, a Future is returned. See the example code below.

GetOperation operation = new GetOperation( mapName, key );
Future future = operationService.invoke( operation );
future.get();

The calling side blocks for a reply. In this case, GetOperation is set in the work queue for the partition of key, where it eventually is executed. Upon execution, a response is returned and placed on the genericWorkQueue where it is executed by a "generic operation thread". This thread signals the future and notifies the blocked thread that a response is available. Hazelcast has a plan of exposing this future to the outside world, and we will provide the ability to register a completion listener so you can perform asynchronous calls.

Local Calls

When a local partition-aware call is done, an operation is made and handed over to the work queue of the correct partition operation thread, and a future is returned. When the calling thread calls get on that future, it acquires a lock and waits for the result to become available. When a response is calculated, the future is looked up and the waiting thread is notified.

In the future, this will be optimized to reduce the number of expensive system calls, such as lock.acquire()/notify(), and the expensive interaction with the operation queue. Probably, we will add support for a caller-runs mode, so that an operation is directly run on the calling thread.

SlowOperationDetector

The SlowOperationDetector monitors the operation threads and collects information about all slow operations. An Operation is a task executed by a generic or partition thread (see Operation Threading). An operation is considered as slow when it takes more computation time than the configured threshold.

The SlowOperationDetector stores the fully qualified classname of the operation and its stacktrace as well as operation details, start time and duration of each slow invocation. All collected data is available in the Management Center.

The SlowOperationDetector is configured via the following system properties.

  • hazelcast.slow.operation.detector.enabled

  • hazelcast.slow.operation.detector.log.purge.interval.seconds

  • hazelcast.slow.operation.detector.log.retention.seconds

  • hazelcast.slow.operation.detector.stacktrace.logging.enabled

  • hazelcast.slow.operation.detector.threshold.millis

See the System Properties appendix for explanations of these properties.
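As a rough illustration, the properties listed above can be set as JVM system properties before the Hazelcast member starts, either with -D flags on the command line or programmatically. The sketch below uses only System.setProperty; the class name is hypothetical and the values are illustrative, not recommendations or defaults.

```java
// Illustrative values only; choose thresholds that suit your workload.
final class SlowOperationDetectorProperties {

    // Sets the SlowOperationDetector properties as JVM system properties.
    static void apply() {
        System.setProperty("hazelcast.slow.operation.detector.enabled", "true");
        System.setProperty("hazelcast.slow.operation.detector.threshold.millis", "5000");
        System.setProperty("hazelcast.slow.operation.detector.log.retention.seconds", "1800");
        System.setProperty("hazelcast.slow.operation.detector.log.purge.interval.seconds", "300");
        System.setProperty("hazelcast.slow.operation.detector.stacktrace.logging.enabled", "true");
    }

    public static void main(String[] args) {
        apply();
        // Start the Hazelcast member after this point so that it reads the values set above.
        System.out.println(System.getProperty("hazelcast.slow.operation.detector.threshold.millis"));
    }
}
```

The command-line equivalent for a single property would be, for example, -Dhazelcast.slow.operation.detector.threshold.millis=5000.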

Logging of Slow Operations

The detected slow operations are logged as warnings in the Hazelcast log files:

WARN 2015-05-07 11:05:30,890 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  Hint: You can enable the logging of stacktraces with the following config
  property: hazelcast.slow.operation.detector.stacktrace.logging.enabled
WARN 2015-05-07 11:05:30,891 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  (2 invocations)
WARN 2015-05-07 11:05:30,892 SlowOperationDetector: [127.0.0.1]:5701
  Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
  (3 invocations)

Stacktraces are always reported to the Management Center but, to keep the log size small, they are not printed to the log files by default. If logging of stacktraces is enabled, the full stacktrace is printed every 100 invocations. All other invocations print a shortened version.

Purging of Slow Operation Logs

Since a Hazelcast cluster can run for a very long time, Hazelcast purges the slow operation logs periodically to prevent an OutOfMemoryError. You can configure the purge interval and the retention time for each invocation.

The purging removes each invocation whose retention time is exceeded. When all invocations are purged from a slow operation log, the log is deleted.
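The purge rule described above can be modeled in a few lines. This is a hypothetical sketch of the rule only, not Hazelcast's implementation: the class and method names are invented for illustration, and each slow operation log is reduced to a list of invocation start times keyed by operation class name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the purge rule; not Hazelcast's implementation.
final class SlowOperationLogPurgeSketch {
    // One slow operation log per operation class name, holding invocation start times in milliseconds.
    private final Map<String, List<Long>> logs = new HashMap<>();

    void record(String operationClass, long startTimeMillis) {
        logs.computeIfAbsent(operationClass, k -> new ArrayList<>()).add(startTimeMillis);
    }

    // Removes every invocation whose retention time is exceeded, then deletes logs left empty.
    void purge(long nowMillis, long retentionMillis) {
        Iterator<List<Long>> it = logs.values().iterator();
        while (it.hasNext()) {
            List<Long> invocations = it.next();
            invocations.removeIf(start -> nowMillis - start > retentionMillis);
            if (invocations.isEmpty()) {
                it.remove();
            }
        }
    }

    int logCount() {
        return logs.size();
    }

    int invocationCount(String operationClass) {
        List<Long> invocations = logs.get(operationClass);
        return invocations == null ? 0 : invocations.size();
    }

    public static void main(String[] args) {
        SlowOperationLogPurgeSketch sketch = new SlowOperationLogPurgeSketch();
        sketch.record("PutOperation", 1_000L);
        sketch.record("PutOperation", 5_000L);
        sketch.purge(6_000L, 2_000L);
        System.out.println("remaining logs: " + sketch.logCount());
    }
}
```

In this model, a purge run corresponds to one tick of the configured purge interval, and retentionMillis corresponds to the configured retention time.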

Caching Deserialized Values

In some cases, you may want to avoid deserializing values in your Hazelcast map that have already been deserialized, so that your query operations run faster. You can do this with the cache-deserialized-values element in your declarative Hazelcast configuration, as shown below.

XML:

<hazelcast>
    ...
    <map name="myMap">
        <in-memory-format>BINARY</in-memory-format>
        <cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
        <backup-count>1</backup-count>
    </map>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    myMap:
      in-memory-format: BINARY
      cache-deserialized-values: INDEX-ONLY
      backup-count: 1

The cache-deserialized-values element controls the caching of deserialized values. Note that caching makes the query evaluation faster, but it consumes more memory. This element has the following values:

  • NEVER: Deserialized values are never cached.

  • INDEX-ONLY: Deserialized values are cached only when they are inserted into an index.

  • ALWAYS: Deserialized values are always cached.

If you are using Portable or Compact serialization, or your map’s in-memory format is OBJECT or NATIVE, the cache-deserialized-values element has no effect.