Best Practices
Basic Performance Recommendations
- 8 cores per Hazelcast server instance
- Minimum of 8 GB RAM per Hazelcast member (if not using the High-Density Memory Store)
- Dedicated NIC per Hazelcast member
- Linux—any distribution
- All members should run within the same subnet
- All members should be attached to the same network switch
Homogeneous Hardware Resources
One of the most effective optimization strategies for Hazelcast is to ensure that Hazelcast services are allocated their own dedicated machine resources. Using dedicated, properly sized hardware (or virtual hardware) ensures that Hazelcast members have ample CPU, memory, and network resources without competing with other processes or services.
Hazelcast distributes load evenly across all its members and assumes that the resources available to each of its members are homogeneous. In a cluster with a mix of more and less powerful machines, the weaker members cause bottlenecks, leaving the stronger members underutilized. For predictable performance, it is best to use equivalent hardware for all Hazelcast members.
Using Single Member per Machine
A Hazelcast member assumes it is alone on a machine, so we recommend not running multiple Hazelcast members on a machine. Having multiple members on a single machine most likely gives a worse performance compared to running a single member, since there will be more context switching, less batching, etc. So unless it is proven that running multiple members per machine does give a better performance/behavior in your particular setup, it is best to run a single member per machine.
Using Operation Threads Efficiently
By default, Hazelcast uses the machine's core count to determine the number of operation threads. Creating more operation threads than the core count is highly unlikely to improve performance, since it results in more context switching, more thread notification, etc.
Especially if you have a system that does simple operations like put and get, it is better to use a thread count lower than the number of cores. The performance gain from reducing the thread count comes from the fact that operations executed on the operation threads normally complete very quickly, so thread parking and unparking can add significant overhead. With fewer threads, each thread does more work, blocks less and therefore needs to be notified less often.
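As an illustration only, the thread count can be set through the hazelcast.operation.thread.count property; the value 4 below is an arbitrary example and should be validated with your own benchmarks, not treated as a recommendation.
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class OperationThreadCountExample {
    public static void main(String[] args) {
        Config config = new Config();
        // Example value only: benchmark before lowering the operation thread count
        // below the core count for simple put/get dominated workloads.
        config.setProperty("hazelcast.operation.thread.count", "4");
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}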
Avoiding Random Changes
Tweaking can be very rewarding because significant performance improvements are possible. By default, Hazelcast tries to behave at its best for all situations, but this doesn't always lead to the best performance. So if you know what you are doing and what to look for, tweaking can be very rewarding. However, tweaking should be done with proper testing to see whether there is actually an improvement. Tweaking without proper benchmarking is likely to lead to confusion and could cause all kinds of problems. In case of doubt, we recommend not tweaking.
Creating the Right Benchmark Environment
When benchmarking, it is important that the benchmark reflects your production environment. Sometimes a representative smaller environment can be set up with a calculated guess; but if you want to use the benchmark statistics to infer how your production system is going to behave, you need to get as close to your production setup as possible. Otherwise, you risk spotting issues too late or focusing on things that are not relevant.
AWS Deployments
When you deploy Hazelcast clusters on AWS EC2 instances, consider placing the cluster members in the same cluster placement group; this drastically reduces the latency among members. Additionally, consider using private IPs instead of public ones to increase throughput when the cluster members are placed in the same VPC.
For the best performance of your Hazelcast on AWS EC2:
- Select the newest Linux AMIs.
- Select the HVM based instances.
- Select at least a system with 8 vCPUs, e.g., c4.2xlarge. For an overview of all types of EC2 instances, see the Amazon EC2 instance types page.
- Consider setting a placement group.
Pipelining
With pipelining, you can send multiple requests in parallel using a single thread and therefore increase throughput. As an example, suppose that the round trip time for a request/response is 1 millisecond. If synchronous requests are used, e.g., IMap.get(), then the maximum throughput from a single thread is 1/0.001 = 1000 operations/second. One way to solve this problem is to introduce multithreading to make the requests in parallel. For the same example, if we used 2 threads, then the maximum throughput would double from 1000 operations/second to 2000 operations/second.
However, introducing threads for the sake of executing requests isn't always convenient and doesn't always lead to optimal performance; this is where pipelining can be used. Instead of using multiple threads to have concurrent invocations, you can use asynchronous method calls such as IMap.getAsync(). If you used 2 asynchronous calls from a single thread, then the maximum throughput would be 2*(1/0.001) = 2000 operations/second. Therefore, to benefit from pipelining, asynchronous calls need to be made from a single thread. Pipelining is a convenience implementation to provide back pressure, i.e., controlling the number of inflight operations, and it provides a convenient way to wait for all the results.
Pipelining<String> pipelining = new Pipelining<String>(10);
for (long k = 0; k < 100; k++) {
    int key = random.nextInt(keyDomain);
    pipelining.add(map.getAsync(key));
}
// wait for completion
List<String> results = pipelining.results();
In the above example, we make 100 asynchronous map.getAsync()
calls, but the maximum number of inflight calls is 10.
By increasing the depth of the pipelining, throughput can be increased. Pipelining has its own back pressure; you do not need to enable back pressure on the client or member to benefit from it. However, if you have many pipelines, you may still need to enable the client/member back pressure because it is possible to overwhelm the system with requests in that situation. See the Back Pressure section to learn how to enable it on the client or member.
You can use the pipelining both on the clients and members. You do not need a special configuration, it works out-of-the-box.
The pipelining can be used for any asynchronous call. You can use it for IMap asynchronous get/put methods as well as for ICache, IAtomicLong, etc. It cannot be used as a transaction mechanism though. So you cannot do some calls and throw away the pipeline and expect that none of the requests are executed. If you want to use an atomic behavior, see the Transactions chapter. The pipelining is just a performance optimization, not a mechanism for atomic behavior.
The pipelines are cheap and should frequently be replaced because they accumulate results. It is fine to have a few hundred or even a few thousand calls being processed with the pipelining. However, all the responses to all requests are stored in the pipeline as long as the pipeline is referenced. So if you want to process a huge number of requests, then every few hundred or few thousand calls wait for the pipelining results and just create a new instance.
Note that the pipelines are not thread-safe. They must be used by a single thread.
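For completeness, here is a minimal, self-contained sketch of the example above. It assumes Hazelcast 4.x package names, a map named "example", and arbitrary illustration values for the key domain and call count.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Pipelining;
import com.hazelcast.map.IMap;
import java.util.List;
import java.util.Random;

public class PipeliningExample {
    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<Integer, String> map = hz.getMap("example");
        Random random = new Random();
        int keyDomain = 1000;

        // At most 10 getAsync() calls are in flight at any moment.
        Pipelining<String> pipelining = new Pipelining<>(10);
        for (int k = 0; k < 100; k++) {
            int key = random.nextInt(keyDomain);
            // add() blocks while the pipeline is at its configured depth.
            pipelining.add(map.getAsync(key));
        }
        // Blocks until all calls complete; results are returned in add() order.
        List<String> results = pipelining.results();
        System.out.println("Fetched " + results.size() + " values");
        hz.shutdown();
    }
}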
Back Pressure
Hazelcast uses operations to make remote calls. For example, a map.get is one operation, and a map.put is one operation for the primary and one operation for each of the backups, i.e., map.put is executed for the primary and also for each backup.
In most cases, there is a natural balance between the number of threads performing operations and the number of operations being executed. However, the following can disturb this balance, causing operations to pile up and eventually lead to an OutOfMemoryException (OOME):
- Asynchronous calls: With async calls, the system may be flooded with requests.
- Asynchronous backups: The asynchronous backups may be piling up.
To prevent the system from crashing, Hazelcast provides back pressure. Back pressure works by limiting the number of concurrent operation invocations and periodically making an async backup sync.
Member Side
Back pressure is disabled by default and you can enable it using the following system property:
hazelcast.backpressure.enabled
To control the number of concurrent invocations, you can configure the number of invocations allowed per partition using the following system property:
hazelcast.backpressure.max.concurrent.invocations.per.partition
The default value of this system property is 100. With the default configuration, a system is allowed to have (271 + 1) * 100 = 27200 concurrent invocations (271 partitions + 1 for generic operations).
Back pressure is only applied to normal operations. System operations like heartbeats and repartitioning operations are not influenced by back pressure. 27200 invocations might seem like a lot, but keep in mind that executing a task on IExecutor or acquiring a lock also requires an operation.
If the maximum number of invocations has been reached, Hazelcast automatically applies an exponential backoff policy. This
gives the system some time to deal with the load.
Using the following system property, you can configure the maximum time to wait before a HazelcastOverloadException
is thrown:
hazelcast.backpressure.backoff.timeout.millis
This system property’s default value is 60000 milliseconds.
The Health Monitor keeps an eye on the usage of the invocations. If it sees a member has consumed 70% or more of the invocations, it starts to log health messages.
Apart from controlling the number of invocations, you also need to control the number of pending async backups. This is done by periodically making these backups sync instead of async. This forces all pending backups to get drained. For this, Hazelcast tracks the number of asynchronous backups for each partition. At every Nth call, one synchronization is forced. This N is controlled through the following property:
hazelcast.backpressure.syncwindow
This system property’s default value is 100. It means, out of 100 asynchronous backups, Hazelcast makes 1 of them a synchronous one. A randomization is added, so the sync window with default configuration is between 75 and 125 invocations.
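As a sketch, these member-side properties can also be set programmatically on the Config object; the values below simply restate the defaults described above and are for illustration only.
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class BackPressureExample {
    public static void main(String[] args) {
        Config config = new Config();
        // Enable back pressure and restate the default tuning values.
        config.setProperty("hazelcast.backpressure.enabled", "true");
        config.setProperty("hazelcast.backpressure.max.concurrent.invocations.per.partition", "100");
        config.setProperty("hazelcast.backpressure.backoff.timeout.millis", "60000");
        config.setProperty("hazelcast.backpressure.syncwindow", "100");
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}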
Client Side
To prevent the system on the client side from overloading, you can apply a constraint on the number of concurrent invocations. You can use the following system property on the client side for this purpose:
hazelcast.client.max.concurrent.invocations
This property defines the maximum allowed number of concurrent invocations.
When it is not explicitly set, its default value is Integer.MAX_VALUE, which means infinite. When you set it and the number of concurrent invocations exceeds this value, Hazelcast throws HazelcastOverloadException when a new invocation comes in.
Please note that the backoff timeout and controlling the number of pending async backups (sync window) are not supported on the client side.
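A minimal client-side sketch, assuming a limit of 1000 concurrent invocations (an arbitrary example value):
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;

public class ClientBackPressureExample {
    public static void main(String[] args) {
        ClientConfig clientConfig = new ClientConfig();
        // Once 1000 invocations are in flight, new invocations fail fast
        // with HazelcastOverloadException.
        clientConfig.setProperty("hazelcast.client.max.concurrent.invocations", "1000");
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
    }
}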
See the System Properties appendix to learn how to configure the system properties.
Near Cache
Access to small-to-medium, read-mostly data sets may be sped up by creating a Near Cache. This cache maintains copies of distributed data in local memory for very fast access.
Benefits:
- Avoids the network and deserialization costs of retrieving frequently-used data remotely
- Eventually consistent
- Can persist keys on a filesystem and reload them on restart. This means you can have your Near Cache ready right after application start
- Can use deserialized objects as Near Cache keys to speed up lookups
Costs:
- Increased memory consumption in the local JVM
- High invalidation rates may outweigh the benefits of locality of reference
- Strong consistency is not maintained; you may read stale data
Map or Cache entries in Hazelcast are partitioned across the cluster members.
Hazelcast clients do not have local data at all. Suppose you read the key k a number of times from a Hazelcast client, or k is owned by another member in your cluster. Then each map.get(k) or cache.get(k) will be a remote operation, which creates a lot of network trips. If you have a data structure that is mostly read, then you should consider creating a local Near Cache, so that reads are sped up and less network traffic is created.
These benefits do not come for free. See the following trade-offs:
- Members with a Near Cache have to hold the extra cached data, which increases memory consumption.
- If invalidation is enabled and entries are updated frequently, then invalidations will be costly.
- Near Cache breaks the strong consistency guarantees; you might be reading stale data.
Near Cache is highly recommended for data structures that are mostly read.
In a client/server system you must enable the Near Cache separately on the client, without the need to configure it on the server. Please note that Near Cache configuration is specific to the server or client itself: a data structure on a server may not have Near Cache configured while the same data structure on a client may have Near Cache configured. They also can have different Near Cache configurations.
If you are using Near Cache, you should take into account that your hits to the keys in the Near Cache are not reflected as hits to the original keys on the primary members. This has for example an impact on IMap’s maximum idle seconds or time-to-live seconds expiration. Therefore, even though there is a hit on a key in Near Cache, your original key on the primary member may expire.
Near Cache works only when you access data via map.get(k) or cache.get(k) methods.
Data returned using a predicate is not stored in the Near Cache.
Hazelcast Data Structures with Near Cache Support
The following matrix shows the Hazelcast data structures with Near Cache support. Please have a look at the next section for a detailed explanation of cache-local-entries, local-update-policy, preloader and serialize-keys.
Data structure | Near Cache Support | cache-local-entries | local-update-policy | preloader | serialize-keys
---|---|---|---|---|---
IMap member | yes | yes | no | no | yes
IMap client | yes | no | no | yes | yes
JCache member | no | no | no | no | no
JCache client | yes | no | yes | yes | yes
ReplicatedMap member | no | no | no | no | no
ReplicatedMap client | yes | no | no | no | no
TransactionalMap member | limited | no | no | no | no
TransactionalMap client | no | no | no | no | no
Even though lite members do not store any data for Hazelcast data structures, you can enable Near Cache on lite members for faster reads.
Configuring Near Cache
The following shows the configuration for the Hazelcast Near Cache.
Please keep in mind that if you want to use Near Cache on a Hazelcast member, configure it on the member; if you want to use it on a Hazelcast client, configure it on the client.
Declarative Configuration:
<hazelcast>
...
<near-cache name="myDataStructure">
<in-memory-format>BINARY</in-memory-format>
<invalidate-on-change>true</invalidate-on-change>
<time-to-live-seconds>0</time-to-live-seconds>
<max-idle-seconds>60</max-idle-seconds>
<eviction eviction-policy="LFU"
max-size-policy= "ENTRY_COUNT"
size="1000"/>
<cache-local-entries>false</cache-local-entries>
<local-update-policy>INVALIDATE</local-update-policy>
<preloader enabled="true"
directory="nearcache-example"
store-initial-delay-seconds="0"
store-interval-seconds="0"/>
</near-cache>
...
</hazelcast>
hazelcast:
  near-cache:
    myDataStructure:
      in-memory-format: BINARY
      invalidate-on-change: true
      time-to-live-seconds: 0
      max-idle-seconds: 60
      eviction:
        size: 1000
        max-size-policy: ENTRY_COUNT
        eviction-policy: LFU
      cache-local-entries: false
      local-update-policy: INVALIDATE
      preloader:
        enabled: true
        directory: nearcache-example
        store-initial-delay-seconds: 0
        store-interval-seconds: 0
The element <near-cache> has an optional attribute name whose default value is default.
Programmatic Configuration:
EvictionConfig evictionConfig = new EvictionConfig()
.setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setEvictionPolicy(EvictionPolicy.LRU)
.setSize( 1 );
NearCachePreloaderConfig preloaderConfig = new NearCachePreloaderConfig()
.setEnabled(true)
.setDirectory("nearcache-example")
.setStoreInitialDelaySeconds( 1 )
.setStoreIntervalSeconds( 2 );
NearCacheConfig nearCacheConfig = new NearCacheConfig()
.setName("myDataStructure")
.setInMemoryFormat(InMemoryFormat.BINARY)
.setSerializeKeys(true)
.setInvalidateOnChange(false)
.setTimeToLiveSeconds( 1 )
.setMaxIdleSeconds( 5 )
.setEvictionConfig(evictionConfig)
.setCacheLocalEntries(true)
.setLocalUpdatePolicy(NearCacheConfig.LocalUpdatePolicy.CACHE_ON_UPDATE)
.setPreloaderConfig(preloaderConfig);
The class NearCacheConfig is used for all supported Hazelcast data structures on members and clients.
The following are the descriptions of all configuration elements and attributes:
- in-memory-format: Specifies in which format data is stored in your Near Cache. Note that a map's in-memory format can be different from that of its Near Cache. Available values are as follows:
  - BINARY: Data is stored in serialized binary format (default value).
  - OBJECT: Data is stored in deserialized form.
  - NATIVE: Data is stored in the Near Cache that uses Hazelcast's High-Density Memory Store feature. This option is available only in Hazelcast Enterprise. Note that a map and its Near Cache can independently use High-Density Memory Store. For example, while your map does not use High-Density Memory Store, its Near Cache can use it.
- invalidate-on-change: Specifies whether the cached entries are evicted when the entries are updated or removed. Its default value is true.
- time-to-live-seconds: Maximum number of seconds for each entry to stay in the Near Cache. Entries that are older than this period are automatically evicted from the Near Cache. Regardless of the eviction policy used, time-to-live-seconds still applies. Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Its default value is 0.
- max-idle-seconds: Maximum number of seconds each entry can stay in the Near Cache untouched (not read). Entries that are not read for longer than this period are removed from the Near Cache. Any integer between 0 and Integer.MAX_VALUE. 0 means Integer.MAX_VALUE. Its default value is 0.
- eviction: Specifies the eviction behavior when you use High-Density Memory Store for your Near Cache. It has the following attributes:
  - eviction-policy: Eviction policy configuration. Available values are as follows:
    - LRU: Least Recently Used (default value).
    - LFU: Least Frequently Used.
    - NONE: No items are evicted and the property max-size is ignored. You can still combine it with time-to-live-seconds and max-idle-seconds to evict items from the Near Cache.
    - RANDOM: A random item is evicted.
  - max-size-policy: Maximum size policy for eviction of the Near Cache. Available values are as follows:
    - ENTRY_COUNT: Maximum size based on the entry count in the Near Cache (default value).
    - USED_NATIVE_MEMORY_SIZE: Maximum used native memory size of the specified Near Cache in MB to trigger the eviction. If the used native memory size exceeds this threshold, the eviction is triggered. Available only for the NATIVE in-memory format. This is supported only by Hazelcast Enterprise.
    - USED_NATIVE_MEMORY_PERCENTAGE: Maximum used native memory percentage of the specified Near Cache to trigger the eviction. If the native memory usage percentage (relative to the maximum native memory size) exceeds this threshold, the eviction is triggered. Available only for the NATIVE in-memory format. This is supported only by Hazelcast Enterprise.
    - FREE_NATIVE_MEMORY_SIZE: Minimum free native memory size of the specified Near Cache in MB to trigger the eviction. If the free native memory size goes below this threshold, the eviction is triggered. Available only for the NATIVE in-memory format. This is supported only by Hazelcast Enterprise.
    - FREE_NATIVE_MEMORY_PERCENTAGE: Minimum free native memory percentage of the specified Near Cache to trigger the eviction. If the free native memory percentage (relative to the maximum native memory size) goes below this threshold, the eviction is triggered. Available only for the NATIVE in-memory format. This is supported only by Hazelcast Enterprise.
  - size: Maximum size of the Near Cache used for max-size-policy. When this is reached, the Near Cache is evicted based on the policy defined. Any integer between 1 and Integer.MAX_VALUE. This value has different defaults, depending on the data structure:
    - IMap: Its default value is Integer.MAX_VALUE for on-heap maps and 10000 for the NATIVE in-memory format.
    - JCache: Its default value is 10000.
- cache-local-entries: Specifies whether the local entries are cached. It can be useful when the in-memory format of the Near Cache is different from that of the map. By default, it is disabled. It is only available on Hazelcast members, not on Hazelcast clients (which have no local entries).
- local-update-policy: Specifies the update policy of the local Near Cache. It is available on JCache clients. Available values are as follows:
  - INVALIDATE: Removes the Near Cache entry on mutation. After the mutative call to the member completes but before the operation returns to the caller, the Near Cache entry is removed. Until the mutative operation completes, readers continue to read the old value. But as soon as the update completes, the Near Cache entry is removed. Any threads reading the key after this point have a Near Cache miss and call through to the member, obtaining the new entry. This setting provides read-your-writes consistency. This is the default setting.
  - CACHE_ON_UPDATE: Updates the Near Cache entry on mutation. After the mutative call to the member completes but before the put returns to the caller, the Near Cache entry is updated. So a remove removes it and one of the put methods updates it to the new value. Until the update/remove operation completes, the entry's old value can still be read from the Near Cache. But before the call completes, the Near Cache entry is updated. Any threads reading the key after this point read the new entry. If the mutative operation was a remove, the key no longer exists in either the Near Cache or the original copy in the member. The member initiates an invalidation event to any other Near Caches; however, the caller's Near Cache is not invalidated as it already has the new value. This setting also provides read-your-writes consistency.
- preloader: Specifies whether the Near Cache should store and pre-load its keys for a faster re-population after a Hazelcast client restart. It is only available on IMap and JCache clients. It has the following attributes:
  - enabled: Specifies whether the preloader for this Near Cache is enabled or not, true or false.
  - directory: Specifies the parent directory for the preloader of this Near Cache. The filenames for the preloader storage are generated from the Near Cache name. You can additionally specify the parent directory to have multiple clients on the same machine with the same Near Cache names.
  - store-initial-delay-seconds: Specifies the delay in seconds until the keys of this Near Cache are stored for the first time. Its default value is 600 seconds.
  - store-interval-seconds: Specifies the interval in seconds in which the keys of this Near Cache are stored. Its default value is 600 seconds.
Near Cache Configuration Examples
This section shows some configuration examples for different Hazelcast data structures.
Near Cache Example for IMap
The following are configuration examples for IMap Near Caches for Hazelcast members and clients.
<hazelcast>
...
<map name="mostlyReadMap">
<in-memory-format>BINARY</in-memory-format>
<near-cache>
<in-memory-format>OBJECT</in-memory-format>
<invalidate-on-change>false</invalidate-on-change>
<time-to-live-seconds>600</time-to-live-seconds>
<eviction eviction-policy="NONE" max-size-policy="ENTRY_COUNT" size="5000"/>
<cache-local-entries>true</cache-local-entries>
</near-cache>
</map>
...
</hazelcast>
hazelcast:
  map:
    mostlyReadMap:
      in-memory-format: BINARY
      near-cache:
        in-memory-format: OBJECT
        invalidate-on-change: false
        time-to-live-seconds: 600
        eviction:
          eviction-policy: NONE
          max-size-policy: ENTRY_COUNT
          size: 5000
        cache-local-entries: true
EvictionConfig evictionConfig = new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.NONE)
.setMaximumSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setSize(5000);
NearCacheConfig nearCacheConfig = new NearCacheConfig()
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setInvalidateOnChange(false)
.setTimeToLiveSeconds(600)
.setEvictionConfig(evictionConfig);
Config config = new Config();
config.getMapConfig("mostlyReadMap")
.setInMemoryFormat(InMemoryFormat.BINARY)
.setNearCacheConfig(nearCacheConfig);
The Near Cache configuration for maps on members is a child of the map configuration, so you do not have to define the map name in the Near Cache configuration.
<hazelcast-client>
...
<near-cache name="mostlyReadMap">
<in-memory-format>OBJECT</in-memory-format>
<invalidate-on-change>true</invalidate-on-change>
<eviction eviction-policy="LRU" max-size-policy="ENTRY_COUNT" size="50000"/>
</near-cache>
...
</hazelcast-client>
hazelcast-client:
  near-cache:
    mostlyReadMap:
      in-memory-format: OBJECT
      invalidate-on-change: true
      eviction:
        eviction-policy: LRU
        max-size-policy: ENTRY_COUNT
        size: 50000
EvictionConfig evictionConfig = new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LRU)
.setMaximumSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setSize(50000);
NearCacheConfig nearCacheConfig = new NearCacheConfig()
.setName("mostlyReadMap")
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setInvalidateOnChange(true)
.setEvictionConfig(evictionConfig);
ClientConfig clientConfig = new ClientConfig()
.addNearCacheConfig(nearCacheConfig);
The Near Cache on the client side must have the same name as the data structure on the member for which this Near Cache is being created. You can use wildcards, so in this example mostlyRead* would also match the map mostlyReadMap.
A Near Cache can have its own in-memory-format which is independent of the in-memory-format of the data structure.
Near Cache Example for JCache Clients
The following is a configuration example for a JCache Near Cache for a Hazelcast client.
<hazelcast-client>
...
<near-cache name="mostlyReadCache">
<in-memory-format>OBJECT</in-memory-format>
<invalidate-on-change>true</invalidate-on-change>
<eviction eviction-policy="LRU" max-size-policy="ENTRY_COUNT" size="30000"/>
<local-update-policy>CACHE_ON_UPDATE</local-update-policy>
</near-cache>
...
</hazelcast-client>
hazelcast-client:
  near-cache:
    mostlyReadCache:
      in-memory-format: OBJECT
      invalidate-on-change: true
      eviction:
        eviction-policy: LRU
        max-size-policy: ENTRY_COUNT
        size: 30000
      local-update-policy: CACHE_ON_UPDATE
EvictionConfig evictionConfig = new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LRU)
.setMaximumSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setSize(30000);
NearCacheConfig nearCacheConfig = new NearCacheConfig()
.setName("mostlyReadCache")
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setInvalidateOnChange(true)
.setEvictionConfig(evictionConfig)
.setLocalUpdatePolicy(LocalUpdatePolicy.CACHE_ON_UPDATE);
ClientConfig clientConfig = new ClientConfig()
.addNearCacheConfig(nearCacheConfig);
Example for Near Cache with High-Density Memory Store
Hazelcast Enterprise Feature
The following is a configuration example for an IMap High-Density Near Cache for a Hazelcast member.
<hazelcast>
...
<map name="mostlyReadMapWithHighDensityNearCache">
<in-memory-format>OBJECT</in-memory-format>
<near-cache>
<in-memory-format>NATIVE</in-memory-format>
<eviction eviction-policy="LFU" max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" size="90"/>
</near-cache>
</map>
...
</hazelcast>
hazelcast:
  map:
    mostlyReadMapWithHighDensityNearCache:
      in-memory-format: OBJECT
      near-cache:
        in-memory-format: NATIVE
        eviction:
          eviction-policy: LFU
          max-size-policy: USED_NATIVE_MEMORY_PERCENTAGE
          size: 90
EvictionConfig evictionConfig = new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LFU)
.setMaximumSizePolicy(MaxSizePolicy.USED_NATIVE_MEMORY_PERCENTAGE)
.setSize(90);
NearCacheConfig nearCacheConfig = new NearCacheConfig()
.setInMemoryFormat(InMemoryFormat.NATIVE)
.setEvictionConfig(evictionConfig);
Config config = new Config();
config.getMapConfig("mostlyReadMapWithHighDensityNearCache")
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setNearCacheConfig(nearCacheConfig);
Keep in mind that you should have already enabled the High-Density Memory Store usage for your cluster. See the Configuring High-Density Memory Store section.
Note that a map and its Near Cache can independently use High-Density Memory Store. For example, if your map does not use High-Density Memory Store, its Near Cache can still use it.
Near Cache Eviction
In the scope of Near Cache, eviction means evicting (clearing) the entries selected according to
the given eviction-policy
when the specified max-size-policy
has been reached.
The max-size-policy
defines the state when the Near Cache is full and determines whether
the eviction should be triggered. The size
is either interpreted as entry count, memory size or percentage, depending on the chosen policy.
Once the eviction is triggered the configured eviction-policy
determines which, if any, entries must be evicted.
Note that the policies mentioned are configured under the near-cache
configuration block, as seen in the above
configuration examples.
Near Cache Expiration
Expiration means the eviction of expired records. A record is expired:
- if it is not touched (accessed/read) for max-idle-seconds
- if time-to-live-seconds has passed since it was put into the Near Cache
The actual expiration is performed in the following cases:
- When a record is accessed: it is checked whether the record is expired or not. If it is expired, it is evicted and null is returned as the value to the caller.
- In the background: there is an expiration task that periodically (currently every 5 seconds) scans records and evicts the expired ones.
Note that max-idle-seconds and time-to-live-seconds are configured under the near-cache configuration block, as seen in the above configuration examples.
Near Cache Invalidation
Invalidation is the process of removing an entry from the Near Cache when its value is updated or it is removed from the original data structure (to prevent stale reads). Near Cache invalidation happens asynchronously at the cluster level, but synchronously on the current member. This means that the Near Cache is invalidated within the whole cluster after the modifying operation is finished, but it is updated on the current member before the modifying operation is done. A modifying operation can be an EntryProcessor, an explicit update or remove, as well as an expiration or eviction. Generally, whenever the state of an entry changes in the record store by updating its value or removing it, an invalidation event is sent for that entry.
Invalidations can be sent from members to client Near Caches or to member Near Caches, either individually or in batches. Default behavior is sending in batches. If there are lots of mutating operations such as put/remove on data structures, it is advised that you configure batch invalidations. This reduces the network traffic and keeps the eventing system less busy, but may increase the delay of individual invalidations.
You can use the following system properties to configure the Near Cache invalidation:
- hazelcast.map.invalidation.batch.enabled: Enables or disables batching. Its default value is true. When it is set to false, all invalidations are sent immediately.
- hazelcast.map.invalidation.batch.size: Maximum number of invalidations in a batch. Its default value is 100.
- hazelcast.map.invalidation.batchfrequency.seconds: If the collected invalidations do not reach the configured batch size, a background process sends them periodically. Its default value is 10 seconds.
If there are a lot of clients or many mutating operations, batching should remain enabled and
the batch size should be configured with the hazelcast.map.invalidation.batch.size
system property to a suitable value.
Near Cache Consistency
Eventual Consistency
Near Caches are invalidated by invalidation events. Invalidation events can be lost due to the fire-and-forget nature of the eventing system. If an event is lost, reads from the Near Cache can be stale indefinitely.
To solve this problem, Hazelcast provides
eventually consistent behavior for IMap/JCache Near Caches by detecting invalidation losses.
After detection of an invalidation loss, stale data is made unreachable and Near Cache’s get
calls to
that data are directed to the underlying IMap/JCache to fetch the fresh data.
You can configure eventual consistency with the system properties below (same properties are valid for both member and client side Near Caches):
- hazelcast.invalidation.max.tolerated.miss.count: Default value is 10. If the missed invalidation count is bigger than this value, the relevant cached data is made unreachable.
- hazelcast.invalidation.reconciliation.interval.seconds: Default value is 60 seconds. This is a periodic task that scans cluster members periodically to compare the generated invalidation events with the ones received by the Near Cache.
Locally Initiated Changes
For local invalidations, when a record is updated/removed, future reads will see this update/remove to provide read-your-writes consistency. To achieve this consistency, Near Cache configuration provides the following update policies:
- INVALIDATE
- CACHE_ON_UPDATE
If you choose INVALIDATE, the entry is removed from the Near Cache after the update/remove occurs in the underlying data structure and before the operation returns to the caller. Until the update/remove operation completes, the entry's old value can still be read from the Near Cache.
If you choose CACHE_ON_UPDATE, the entry is updated after the update/remove occurs in the underlying data structure and before the operation returns to the caller. If it is an update, the entry is replaced with the new value. Until the update/remove operation completes, the entry's old value can still be read from the Near Cache. Any threads reading the key after this point read the new entry. If the mutative operation was a remove, the key no longer exists in either the Near Cache or the original copy in the member.
Near Cache Preloader
The Near Cache preloader stores the keys from a Near Cache to provide a fast re-population of the previous hot data set after a Hazelcast client has been restarted. It is available on IMap and JCache clients.
The Near Cache preloader stores the keys (not the values) of Near Cache entries in regular intervals.
You can define the initial delay via store-initial-delay-seconds
, e.g., if you know that your hot data set needs
some time to build up. You can configure the interval via store-interval-seconds
which determines how often
the key-set is stored. The persistence does not run continuously. Whenever the storage is scheduled, it is performed on the actual keys in the Near Cache.
The Near Cache preloader is triggered on the first initialization of the data structure on the client, e.g., client.getMap("myNearCacheMap")
.
This schedules the preloader, which works in the background, so your application is not blocked.
The storage is enabled after the loading is completed.
The configuration parameter directory is optional. If you omit it, the base directory becomes the user working directory (normally where the JVM was started, or as configured with the system property user.dir). The storage filenames are always created from the Near Cache name, so even if you use a wildcard name in the Near Cache configuration, the preloader filenames are unique.
If you run multiple Hazelcast clients with the Near Cache preloader enabled on the same machine, you have to configure a unique storage filename for each client or run them from different user directories. If two clients write into the same file, only the first client succeeds; the following clients throw an exception as soon as the Near Cache preloader is triggered.
Data Affinity
Data affinity ensures that related entries exist on the same member. If related data is on the same member, operations can be executed without the cost of extra network calls and extra wire data. This feature is provided by using the same partition keys for related data.
PartitionAware
Co-location of related data and computation
Hazelcast has a standard way of finding out which member owns/manages each key object. The following operations are routed to the same member, since all of them operate on the same key, "key1".
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
Map mapA = hazelcastInstance.getMap( "mapA" );
Map mapB = hazelcastInstance.getMap( "mapB" );
Map mapC = hazelcastInstance.getMap( "mapC" );
// since map names are different, operation will be manipulating
// different entries, but the operation will take place on the
// same member since the keys ("key1") are the same
mapA.put( "key1", value );
mapB.get( "key1" );
mapC.remove( "key1" );
// lock operation will still execute on the same member
// of the cluster since the key ("key1") is same
hazelcastInstance.getLock( "key1" ).lock();
// distributed execution will execute the 'runnable' on the
// same member since "key1" is passed as the key.
hazelcastInstance.getExecutorService().executeOnKeyOwner( runnable, "key1" );
When the keys are the same, entries are stored on the same member.
But we sometimes want to have related entries stored on the same member, such as a customer and his/her order entries. We would have a customers map with customerId as the key and an orders map with orderId as the key. Since customerId and orderId are different keys, a customer and his/her orders may fall into different members of your cluster. So how can we have them stored on the same member? We create an affinity between the customer and orders. If we make them part of the same partition, then these entries will be co-located. We achieve this by making OrderKeys PartitionAware.
final class OrderKey implements PartitionAware, Serializable {

    private final long orderId;
    private final long customerId;

    OrderKey(long orderId, long customerId) {
        this.orderId = orderId;
        this.customerId = customerId;
    }

    @Override
    public Object getPartitionKey() {
        return customerId;
    }

    @Override
    public String toString() {
        return "OrderKey{"
                + "orderId=" + orderId
                + ", customerId=" + customerId
                + '}';
    }
}
Notice that OrderKey implements PartitionAware and that getPartitionKey() returns the customerId. This makes sure that the Customer entry and its Orders are stored on the same member.
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
Map mapCustomers = hazelcastInstance.getMap( "customers" );
Map mapOrders = hazelcastInstance.getMap( "orders" );
// create the customer entry with customer id = 1
mapCustomers.put( 1, customer );
// now create the orders for this customer
mapOrders.put( new OrderKey( 21, 1 ), order );
mapOrders.put( new OrderKey( 22, 1 ), order );
mapOrders.put( new OrderKey( 23, 1 ), order );
Assume that you have a customers map where customerId
is the key and the customer object is the value.
You want to remove one of the customer orders and return the number of remaining orders.
Here is how you would normally do it.
public static int removeOrder( long customerId, long orderId ) throws Exception {
    IMap<Long, Customer> mapCustomers = hazelcastInstance.getMap( "customers" );
    IMap<OrderKey, Order> mapOrders = hazelcastInstance.getMap( "orders" );
    mapCustomers.lock( customerId );
    mapOrders.remove( new OrderKey( orderId, customerId ) );
    Set<OrderKey> orders = mapOrders.keySet( Predicates.equal( "customerId", customerId ) );
    mapCustomers.unlock( customerId );
    return orders.size();
}
There are a couple of things you should consider:
- There are four distributed operations there: lock, remove, keySet, unlock. Can you reduce the number of distributed operations?
- The customer object may not be that big, but can you avoid passing it over the wire? Think about a scenario where you set the order count on the customer object for fast access; you then need a get and a put, and as a result, the customer object is passed over the wire twice.
Instead, why not move the computation to the member (JVM) where your customer data resides? Here is how you can do this with the distributed executor service:
- Send a PartitionAware Callable task.
- Callable does the deletion of the order right there and returns with the remaining order count.
- Upon completion of the Callable task, return the result (remaining order count). You do not have to wait until the task is completed; since distributed executions are asynchronous, you can do other things in the meantime.
Here is the example code.
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
public int removeOrder(long customerId, long orderId) throws Exception {
IExecutorService executorService = hazelcastInstance.getExecutorService("ExecutorService");
OrderDeletionTask task = new OrderDeletionTask(customerId, orderId);
Future<Integer> future = executorService.submit(task);
int remainingOrders = future.get();
return remainingOrders;
}
public static class OrderDeletionTask
implements Callable<Integer>, PartitionAware, Serializable, HazelcastInstanceAware {
private long orderId;
private long customerId;
private HazelcastInstance hazelcastInstance;
public OrderDeletionTask() {
}
public OrderDeletionTask(long customerId, long orderId) {
this.customerId = customerId;
this.orderId = orderId;
}
@Override
public Integer call() {
IMap<Long, Customer> customerMap = hazelcastInstance.getMap("customers");
IMap<OrderKey, Order> orderMap = hazelcastInstance.getMap("orders");
customerMap.lock(customerId);
Predicate predicate = Predicates.equal("customerId", customerId);
Set<OrderKey> orderKeys = orderMap.localKeySet(predicate);
int orderCount = orderKeys.size();
for (OrderKey key : orderKeys) {
if (key.orderId == orderId) {
orderCount--;
orderMap.delete(key);
}
}
customerMap.unlock(customerId);
return orderCount;
}
@Override
public Object getPartitionKey() {
return customerId;
}
@Override
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}
}
The following are the benefits of doing the same operation with the distributed ExecutorService based on the key:
- only one distributed execution (executorService.submit(task)), instead of four
- less data is sent over the wire
- less lock duration, i.e., higher concurrency, for the Customer entry since the lock/update/unlock cycle is done locally (local to the customer data)
PartitioningStrategy
Another way of storing related data on the same location is using/implementing the class PartitioningStrategy. Normally (if no partitioning strategy is defined), Hazelcast finds the partition of a key by first converting the object to binary and then hashing this binary. If a partitioning strategy is defined, Hazelcast passes the key to the strategy and the strategy returns an object, from which the partition is calculated by hashing it.
Hazelcast offers the following out-of-the-box partitioning strategies:
- DefaultPartitioningStrategy: Default strategy. It checks whether the key implements PartitionAware. If it does, the object is converted to binary and then hashed, to find the partition of the key.
- StringPartitioningStrategy: Works only for string keys. It uses the string after the @ character as the partition ID. For example, if you have two keys ordergroup1@region1 and customergroup1@region1, both ordergroup1 and customergroup1 fall into the partition where region1 is located.
- StringAndPartitionAwarePartitioningStrategy: Works as the combination of the above two strategies. If the key implements PartitionAware, it works like the DefaultPartitioningStrategy. If it is a string key, it works like the StringPartitioningStrategy.
The following are example configuration snippets. Note that these strategy configurations are per map.
Declarative Configuration:
<hazelcast>
...
<map name="name-of-the-map">
<partition-strategy>
com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
</partition-strategy>
</map>
...
</hazelcast>
hazelcast:
  map:
    name-of-the-map:
      partition-strategy: com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
Programmatic Configuration:
Config config = new Config();
MapConfig mapConfig = config.getMapConfig("name-of-the-map");
PartitioningStrategyConfig psConfig = mapConfig.getPartitioningStrategyConfig();
psConfig.setPartitioningStrategyClass( "com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy" );
// OR
psConfig.setPartitioningStrategy( new YourCustomPartitioningStrategy() );
...
You can also define your own partition strategy by implementing the class PartitioningStrategy
.
To enable your implementation, add the full class name to your Hazelcast configuration using either
the declarative or programmatic approach, as exemplified above.
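As a sketch of what such an implementation could look like: the TenantPartitioningStrategy class and the tenantId:entityId key convention below are hypothetical, and the PartitioningStrategy package is assumed to be the Hazelcast 4.x location.
import com.hazelcast.partition.PartitioningStrategy;

// Hypothetical strategy: co-locate all keys that belong to the same tenant,
// assuming string keys of the form "tenantId:entityId".
public class TenantPartitioningStrategy implements PartitioningStrategy<String> {

    @Override
    public Object getPartitionKey(String key) {
        int separator = key.indexOf(':');
        // Fall back to the full key when there is no tenant prefix.
        return separator > 0 ? key.substring(0, separator) : key;
    }
}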
The examples above show how to define a partitioning strategy per map. Note that all the members of your cluster must have the same partitioning strategy configurations.
You can also change a global strategy which is applied to all the data structures in your cluster.
This can be done by defining the hazelcast.partitioning.strategy.class
system property.
An example declarative way of configuring this property is shown below:
<hazelcast>
...
<properties>
<property name="hazelcast.partitioning.strategy.class">
com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
</property>
</properties>
...
</hazelcast>
hazelcast:
  properties:
    hazelcast.partitioning.strategy.class: com.hazelcast.partition.strategy.StringAndPartitionAwarePartitioningStrategy
You can specify the aforementioned out-of-the-box strategies or your custom partitioning strategy.
You can also use other system property configuring options as explained in the Configuring with System Properties section.
The per-map and global (cluster) partitioning strategies are supported on the member side. Hazelcast Java clients only support the global strategy, and it is configured via the same system property used on the members (hazelcast.partitioning.strategy.class).
CPU Thread Affinity
Hazelcast lets you configure CPU thread affinity so that you have much better control over latency and throughput. With this configuration, certain Hazelcast threads can have affinity for particular CPUs.
The following affinity configurations are available for a member:
-Dhazelcast.io.input.thread.affinity=1-3
-Dhazelcast.io.output.thread.affinity=3-5
-Dhazelcast.operation.thread.affinity=7-10,13
-Dhazelcast.operation.response.thread.affinity=15,16
The following affinity configurations are available for a client:
-Dhazelcast.client.io.input.thread.affinity=1-4
-Dhazelcast.client.io.output.thread.affinity=5-8
-Dhazelcast.client.response.thread.affinity=7-9
You can set the CPU thread affinity properties shown above only on the command line.
Let’s have a look at how we define the values for the above configuration properties:
- Individual CPUs, e.g., 1,2,3: This means there are going to be three threads. The first thread runs on CPU 1, the second thread on CPU 2, and so on.
- CPU ranges, e.g., 1-3: Shortcut syntax for 1,2,3.
- Group, e.g., [1-3]: This configures three threads and each of these threads can run on CPUs 1, 2 and 3.
- Group with thread count, e.g., [1-3]:2: This configures two threads and each of these two threads can run on CPUs 1, 2 and 3.
You can also combine those, e.g., 1,2,[5-7],[10,12,16]:2
.
Note that the syntax for CPU thread affinity shown above not only determines
the mapping of CPUs to threads, it also determines the thread count.
If you use CPU thread affinity, e.g., hazelcast.io.input.thread.affinity
,
then hazelcast.io.input.thread.count
is ignored. See Threading Model for more
information about specifying explicit thread counts.
If you don’t configure affinity for a category of threads, it means they can run on any CPU.
Let’s look at an example. Assuming you have the numactl
utility, run
the following command on your machine to see the mapping between the NUMA
nodes and threads:
numactl --hardware
An example output is shown below:
available: 2 nodes (0-1)
node 0 cpus: 0 1 2 3 4 5 6 7 8 9 20 21 22 23 24 25 26 27 28 29
node 0 size: 393090 MB
node 0 free: 372729 MB
node 1 cpus: 10 11 12 13 14 15 16 17 18 19 30 31 32 33 34 35 36 37 38 39
node 1 size: 393216 MB
node 1 free: 343296 MB
node distances:
node 0 1
0: 10 21
1: 21 10
If you want to configure 20 threads on NUMA node 0 and 20 threads on NUMA node 1, and confine the threads to these NUMA nodes, you can use the following configuration:
-Dhazelcast.operation.thread.affinity=[0-9,20-29],[10-19,30-39]
See here for information about NUMA nodes.
Threading Model
Your application server has its own threads. Hazelcast does not use these; it manages its own threads.
I/O Threading
Hazelcast uses a pool of threads for I/O. A single thread does not perform all the I/O; instead, multiple threads do. On each cluster member, the I/O threading is split up into three types of I/O threads:
- I/O thread for the accept requests
- I/O threads to read data from other members/clients
- I/O threads to write data to other members/clients
You can configure the number of I/O threads using the hazelcast.io.thread.count
system property.
Its default value is 3 per member. If 3 is used, in total there are 7 I/O threads:
1 accept I/O thread, 3 read I/O threads and 3 write I/O threads. Each I/O thread has
its own Selector instance and waits on the Selector.select
if there is nothing to do.
You can also specify counts for input and output threads separately.
There are hazelcast.io.input.thread.count and hazelcast.io.output.thread.count properties for this purpose.
See the System Properties appendix for information about these properties and how to set them.
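A small sketch of setting these counts programmatically; the values below are arbitrary examples, not recommendations.
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class IoThreadCountExample {
    public static void main(String[] args) {
        Config config = new Config();
        // Example values only: 5 input and 5 output I/O threads instead of the default 3 + 3.
        config.setProperty("hazelcast.io.input.thread.count", "5");
        config.setProperty("hazelcast.io.output.thread.count", "5");
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}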
Hazelcast periodically scans utilization of each I/O thread and
can decide to migrate a connection to a new thread if
the existing thread is servicing a disproportionate number of I/O events.
You can customize the scanning interval by configuring the hazelcast.io.balancer.interval.seconds
system property;
its default interval is 20 seconds. You can disable the balancing process by setting this property to a negative value.
In case of the read I/O thread, when sufficient bytes for a packet have been received, the Packet
object is created. This Packet
object is
then sent to the system where it is de-multiplexed. If the Packet
header signals that it is an operation/response, the Packet
is handed
over to the operation service (see the Operation Threading section). If the Packet
is an event, it is handed
over to the event service (see the Event Threading section).
Event Threading
Hazelcast uses a shared event system to deal with components that rely on events, such as topic, collections, listeners and Near Cache.
Each cluster member has an array of event threads and each thread has its own work queue. When an event is produced, either locally or remotely, an event thread is selected (depending on if there is a message ordering) and the event is placed in the work queue for that event thread.
You can set the following properties to alter the system’s behavior:
- hazelcast.event.thread.count: Number of event threads in this array. Its default value is 5.
- hazelcast.event.queue.capacity: Capacity of the work queue. Its default value is 1000000.
- hazelcast.event.queue.timeout.millis: Timeout for placing an item on the work queue in milliseconds. Its default value is 250 milliseconds.
If you process a lot of events and have many cores, changing the value of hazelcast.event.thread.count
property to
a higher value is a good practice. This way, more events can be processed in parallel.
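For illustration, a sketch that raises the event thread count; the value 10 is an arbitrary example for a machine with many cores and a heavy event load.
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class EventThreadCountExample {
    public static void main(String[] args) {
        Config config = new Config();
        // Example only: raise the event thread count from the default 5.
        config.setProperty("hazelcast.event.thread.count", "10");
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}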
Multiple components share the same event queues. If there are 2 topics, say A and B, for certain messages they may share the same queue(s) and hence the same event thread. If there are a lot of pending messages produced by A, then B needs to wait. Also, when processing a message from A takes a lot of time and the event thread is used for that, B suffers from this. That is why it is better to offload processing to a dedicated thread (pool) so that systems are better isolated.
If the events are produced at a higher rate than they are consumed, the queue grows in size. To prevent overloading the system
and running into an OutOfMemoryException
, the queue is given a capacity of 1 million items. When the maximum capacity is reached, the items are
dropped. This means that the event system is a 'best effort' system. There is no guarantee that you are going to get an
event. Topic A might have a lot of pending messages and therefore B cannot receive messages because the queue
has no capacity and messages for B are dropped.
IExecutor Threading
Executor threading is straightforward. When a task is received to be executed on Executor E, E has its own ThreadPoolExecutor instance and the work is placed in the work queue of this executor.
Thus, Executors are fully isolated, but still share the same underlying hardware - most importantly the CPUs.
You can configure the IExecutor using the ExecutorConfig
(programmatic configuration) or
using <executor>
(declarative configuration). See also the Configuring Executor Service section.
Operation Threading
The following are the operation types:
- operations that are aware of a certain partition, e.g., IMap.get(key)
- operations that are not partition-aware, e.g., IExecutorService.executeOnMember(command, member)
Each of these operation types has a different threading model explained in the following sections.
Partition-aware Operations
To execute partition-aware operations, an array of operation threads is created.
The default value of this array’s size is the number of cores and it has a minimum value of 2.
This value can be changed using the hazelcast.operation.thread.count
property.
Each operation thread has its own work queue and it consumes messages from this work queue. If a partition-aware operation needs to be scheduled, the right thread is found using the formula below.
threadIndex = partitionId % partition thread-count
After the threadIndex is determined, the operation is put in the work queue of that operation thread. This means the following:
- A single operation thread executes operations for multiple partitions; if there are 271 partitions and 10 partition threads, then roughly every operation thread executes operations for 27 partitions.
- Each partition belongs to only one operation thread. All operations for a partition are always handled by exactly the same operation thread.
- Concurrency control is not needed to deal with partition-aware operations because once a partition-aware operation is put in the work queue of a partition-aware operation thread, only one thread is able to touch that partition.
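A trivial sketch of the mapping formula above, using the default 271 partitions and a hypothetical count of 10 operation threads.
public class PartitionThreadMappingExample {
    public static void main(String[] args) {
        int partitionCount = 271;       // Hazelcast default partition count
        int operationThreadCount = 10;  // e.g., hazelcast.operation.thread.count
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            // All operations for a given partition always run on the same operation thread.
            int threadIndex = partitionId % operationThreadCount;
            System.out.println("partition " + partitionId + " -> operation thread " + threadIndex);
        }
    }
}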
Because of this threading strategy, there are two forms of false sharing you need to be aware of:
- False sharing of the partition: two completely independent data structures share the same partition. For example, if there is a map employees and a map orders, the method employees.get("peter") running on partition 25 may be blocked by the method orders.get(1234) also running on partition 25. If independent data structures share the same partition, a slow operation on one data structure can slow down the other data structures.
- False sharing of the partition-aware operation thread: each operation thread is responsible for executing operations on a number of partitions. For example, thread 1 could be responsible for partitions 0, 10, 20, etc. and thread 2 could be responsible for partitions 1, 11, 21, etc. If an operation for partition 1 takes a lot of time, it blocks the execution of an operation for partition 11 because both of them are mapped to the same operation thread.
You need to be careful with long-running operations because you could starve the other operations that are mapped to the same thread.
As a general rule, the partition thread should be released as soon as possible because operations are not designed
to be long running. That is why, for example, it is very dangerous to execute a long-running operation
using AtomicReference.alter() or IMap.executeOnKey(), because these operations block other operations from being executed.
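As a minimal sketch (assuming Hazelcast 3.x's AbstractEntryProcessor; the map and key names are illustrative), keep the work done inside executeOnKey() short, because it runs on the owning partition's operation thread:
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.AbstractEntryProcessor;
import java.util.Map;

public class ShortEntryProcessorExample {
    public static void main( String[] args ) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Integer> counters = hz.getMap( "counters" ); // illustrative map name
        counters.put( "peter", 0 );

        // Good: a tiny, CPU-cheap mutation; the partition thread is released quickly.
        counters.executeOnKey( "peter", new AbstractEntryProcessor<String, Integer>() {
            @Override
            public Object process( Map.Entry<String, Integer> entry ) {
                entry.setValue( entry.getValue() + 1 );
                return null;
            }
        } );
        // Avoid remote calls, sleeping or heavy computation here, as they would
        // block every other operation mapped to the same partition thread.
    }
}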
Currently, there is no support for work stealing. Different partitions that map to the same thread may need to wait till one of the partitions is finished, even though there are other free partition-aware operation threads available.
Example:
Take a cluster with three members. Two members have 90 primary partitions and one member has 91 primary partitions. Let’s say you have one CPU and four cores per CPU. By default, four operation threads will be allocated to serve 90 or 91 partitions.
Non-Partition-aware Operations
To execute operations that are not partition-aware, e.g., IExecutorService.executeOnMember(command, member), generic operation
threads are used. When the Hazelcast instance is started, an array of operation threads is created. The size of this array
has a default value of the number of cores divided by two, with a minimum value of 2. It can be changed using the
hazelcast.operation.generic.thread.count property.
A non-partition-aware operation thread does not execute an operation for a specific partition. Only partition-aware operation threads execute partition-aware operations.
Unlike the partition-aware operation threads, all the generic operation threads share the same work queue: genericWorkQueue.
If a non-partition-aware operation needs to be executed, it is placed in that work queue and any generic operation thread can execute it. The big advantage is that you automatically have work balancing since any generic operation thread is allowed to pick up work from this queue.
The disadvantage is that this shared queue can be a point of contention. You may not see this contention in production since performance is dominated by I/O and the system does not run many non-partition-aware operations.
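For illustration, the sketch below submits a task to a specific member. The operation that carries the task is not tied to a partition, so it is handled by a generic operation thread; the task itself then runs on the target executor's own thread pool (the executor name and target member are illustrative):
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
import java.io.Serializable;

public class ExecuteOnMemberExample {
    // The task must be serializable so it can be sent to the target member.
    static class EchoTask implements Runnable, Serializable {
        @Override
        public void run() {
            System.out.println( "Running on thread " + Thread.currentThread().getName() );
        }
    }

    public static void main( String[] args ) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IExecutorService executor = hz.getExecutorService( "default" );
        Member target = hz.getCluster().getLocalMember(); // illustrative target member
        executor.executeOnMember( new EchoTask(), target );
    }
}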
Priority Operations
In some cases, the system needs to run operations with a higher priority, e.g., an important system operation. To support priority operations, Hazelcast has the following features:
-
For partition-aware operations: Each partition thread has its own work queue and it also has a priority work queue. The partition thread always checks the priority queue before it processes work from its normal work queue.
-
For non-partition-aware operations: Next to the genericWorkQueue, there is also a genericPriorityWorkQueue. When a priority operation needs to be run, it is put in the genericPriorityWorkQueue. Like the partition-aware operation threads, a generic operation thread first checks the genericPriorityWorkQueue for work.
Since a worker thread blocks on its normal work queue (either partition-specific or generic), a priority operation might not be picked up because it is not put in the queue on which the thread is blocking. That is why Hazelcast always sends a 'kick the worker' operation whose only purpose is to trigger the worker to wake up and check the priority queue.
Operation-response and Invocation-future
When an Operation is invoked, a Future
is returned. See the example code below.
GetOperation operation = new GetOperation( mapName, key );
Future future = operationService.invoke( operation );
future.get();
The calling side blocks for a reply. In this case, GetOperation is put in the work queue for the partition of key, where
it is eventually executed. Upon execution, a response is returned and placed on the genericWorkQueue, where it is handled by a
generic operation thread. This thread signals the future
and notifies the blocked thread that a response is available.
Hazelcast plans to expose this future
to the outside world, and we will provide the ability to register a completion listener so that you can perform asynchronous calls.
Local Calls
When a local partition-aware call is done, an operation is made and handed over to the work queue of the correct partition operation thread,
and a future
is returned. When the calling thread calls get
on that future
, it acquires a lock and waits for the result
to become available. When a response is calculated, the future
is looked up and the waiting thread is notified.
In the future, this will be optimized to reduce the number of expensive system calls, such as lock.acquire()/notify(), and the expensive
interaction with the operation queue. We will probably also add support for a caller-runs mode, so that an operation is run directly on
the calling thread.
SlowOperationDetector
The SlowOperationDetector
monitors the operation threads and collects information about all slow operations.
An Operation
is a task executed by a generic or partition thread (see Operation Threading).
An operation is considered slow when it takes more computation time than the configured threshold.
The SlowOperationDetector
stores the fully qualified classname of the operation and its stacktrace as well as
operation details, start time and duration of each slow invocation. All collected data is available in
the Management Center.
The SlowOperationDetector
is configured via the following system properties.
-
hazelcast.slow.operation.detector.enabled
-
hazelcast.slow.operation.detector.log.purge.interval.seconds
-
hazelcast.slow.operation.detector.log.retention.seconds
-
hazelcast.slow.operation.detector.stacktrace.logging.enabled
-
hazelcast.slow.operation.detector.threshold.millis
See the System Properties appendix for explanations of these properties.
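As a programmatic sketch (the threshold, retention and purge values below are illustrative assumptions, not recommended settings):
// Configuring the SlowOperationDetector via system properties (illustrative values).
Config config = new Config();
config.setProperty( "hazelcast.slow.operation.detector.enabled", "true" );
config.setProperty( "hazelcast.slow.operation.detector.threshold.millis", "10000" );
config.setProperty( "hazelcast.slow.operation.detector.stacktrace.logging.enabled", "true" );
config.setProperty( "hazelcast.slow.operation.detector.log.retention.seconds", "3600" );
config.setProperty( "hazelcast.slow.operation.detector.log.purge.interval.seconds", "300" );
HazelcastInstance hz = Hazelcast.newHazelcastInstance( config );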
Logging of Slow Operations
The detected slow operations are logged as warnings in the Hazelcast log files:
WARN 2015-05-07 11:05:30,890 SlowOperationDetector: [127.0.0.1]:5701
Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
Hint: You can enable the logging of stacktraces with the following config
property: hazelcast.slow.operation.detector.stacktrace.logging.enabled
WARN 2015-05-07 11:05:30,891 SlowOperationDetector: [127.0.0.1]:5701
Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
(2 invocations)
WARN 2015-05-07 11:05:30,892 SlowOperationDetector: [127.0.0.1]:5701
Slow operation detected: com.hazelcast.map.impl.operation.PutOperation
(3 invocations)
Stacktraces are always reported to the Management Center, but by default they are not printed to keep the log size small. If logging of stacktraces is enabled, the full stacktrace is printed every 100 invocations. All other invocations print a shortened version.
Purging of Slow Operation Logs
Since a Hazelcast cluster can run for a very long time, Hazelcast purges the slow operation logs periodically to prevent an OutOfMemoryError (OOME). You can configure the purge interval and the retention time for each invocation.
The purging removes each invocation whose retention time is exceeded. When all invocations are purged from a slow operation log, the log is deleted.
Caching Deserialized Values
There may be cases where you do not want values in your Hazelcast map to be deserialized again when they have already been deserialized previously; avoiding this repeated deserialization makes your query operations faster.
This is possible by using the cache-deserialized-values
element in your declarative Hazelcast configuration, as shown below.
<hazelcast>
    ...
    <map name="myMap">
        <in-memory-format>BINARY</in-memory-format>
        <cache-deserialized-values>INDEX-ONLY</cache-deserialized-values>
        <backup-count>1</backup-count>
    </map>
    ...
</hazelcast>
hazelcast:
  map:
    myMap:
      in-memory-format: BINARY
      cache-deserialized-values: INDEX-ONLY
      backup-count: 1
The cache-deserialized-values
element controls the caching of deserialized values.
Note that caching makes the query evaluation faster, but it consumes more memory. This element has the following values:
-
NEVER: Deserialized values are never cached.
-
INDEX-ONLY: Deserialized values are cached only when they are inserted into an index.
-
ALWAYS: Deserialized values are always cached.
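If you prefer programmatic configuration, a sketch along the following lines should be equivalent (assuming the CacheDeserializedValues enum and MapConfig.setCacheDeserializedValues(); the map name is illustrative):
// Programmatic equivalent of the declarative example above.
Config config = new Config();
config.getMapConfig( "myMap" )
      .setInMemoryFormat( InMemoryFormat.BINARY )
      .setCacheDeserializedValues( CacheDeserializedValues.INDEX_ONLY )
      .setBackupCount( 1 );
HazelcastInstance hz = Hazelcast.newHazelcastInstance( config );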