Capacity Planning

Capacity planning involves estimating and validating the capacity of a cluster in order to determine which recommendations and best practices can help that cluster achieve its best reasonable performance. These estimations should also include spare capacity to allow growth for future demands and create insurance against possible failures. Planning also ensures that any limits which are determined are also enforced during implementation.

Cluster Sizing

A Hazelcast cluster is a network of members that run Hazelcast software. The cluster performance depends on multiple factors, including data size, number of backups, queries, and which features are used. Therefore, planning the cluster remains a complex task that requires knowledge of Hazelcast’s architecture and concepts. Here, we introduce some basic guidelines that help to properly size a cluster.

We recommend always benchmarking your setup before deploying it to production. We also recommend that benchmarking systems resemble the production system as much as possible to avoid unexpected results. We provide a benchmarking example that you can use as a starting point.

Hazelcast clusters will run both data processing and data storage workloads, so planning for both types of workload is important.

To size the cluster correctly for your use case, answer as many of the following questions as possible:

  • How much data do you want to keep in the in-memory store at any given time?

  • How many copies (backups) of that data do you require?

  • Are you going to use synchronous or asynchronous backups?

  • When running queries, how many indexes or indexed fields will each object have?

  • What is your read/write ratio? (Example: 70% of time is spent reading data, 30% is spent writing)

    • Based on the read/write ratio and Transactions Per Second (TPS), you can estimate the amount of memory required to accommodate the data, both existing and new. Usually an eviction mechanism keeps the map/cache size in check, but eviction does not always free the memory immediately. The answer to this question therefore gives good insight into how much memory to provision.

  • Are you using multiple clusters (which may involve WAN Replication)?

  • What are the throughput and latency requirements?

    • The answer should be about Hazelcast access, not the application throughput. For example, an application may plan for 10,000 user TPS but each user transaction may need to use Hazelcast 3 times during the execution. So the actual Hazelcast throughput would need to be 30,000 TPS. Similarly for latency, the answer should not be about end-to-end latency but the application-to-Hazelcast latency.

  • How many concurrent Hazelcast jobs will the cluster run?

  • What is the approximate duration of a job?

  • When you use stream processing, what is the approximate average latency for processing a single event?

  • What is the intended sink for your jobs (database, dashboard, file system, etc.)?

    • If the sink is a Hazelcast map, then the standard caching questions apply, i.e., the maximum number of results stored in the map at any given time, eviction, etc.

  • What is the shape of the pipelines (operations used, external systems involved)?

  • What are the capacities of your Jet sources and sinks?

Once you know the size, access patterns, throughput, latency, and fault tolerance requirements of your application, you can use the following guidelines to help you determine the size of your cluster. Also, if using WAN Replication, the WAN Replication queue sizes need to be taken into consideration for sizing.

Even a single Hazelcast instance running on a recommended server can host hundreds of jobs at a time. A clustered setup improves the performance (throughput and latency) of hosted jobs and increases resilience.

To make fault tolerance possible, your cluster must have at least three members. Generally, you need n+1 cluster members to tolerate n simultaneous member failures.

If you use the Jet stream processing engine, it can use hundreds of CPU cores efficiently by exploiting data and task parallelism. When you add more members to the cluster, the capacity for CPU-bound computation rises. You can achieve better performance by distributing the data partitions across the cluster to process them in parallel.

Benchmark your jobs in a clustered setup to see the differences in performance, as in the sizing example.

Memory Considerations

Once you know the size of your working set of data, you can start sizing your memory requirements.

When speaking of "data" in Hazelcast, this includes both active data and backup data for high availability. The total memory footprint will be the size of your active data plus the size of your backup data and a small memory overhead of Hazelcast internals.

If your fault tolerance allows for just a single backup, then each member of the Hazelcast cluster will contain a 1:1 ratio of active data to backup data for a total memory footprint of two times the active data.

If your fault tolerance requires two backups, then that ratio climbs to 1:2 active to backup data for a total memory footprint of three times your active data set.

If you use only heap memory, each Hazelcast member with a 4 GB heap should accommodate a maximum of 3.5 GB of total data (active and backup). If you use the High-Density Memory Store, up to 75% of your physical memory footprint may be used for active and backup data, with headroom of 25% for normal fragmentation. In both cases, however, the best practice is to keep some memory headroom available to handle any member failure or explicit member shutdown. When a member leaves the cluster, the data it previously owned is redistributed across the remaining members. For this reason, we recommend that you plan to use only 60% of available memory, with 40% headroom to handle member failure or shutdown.
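The arithmetic behind the 60% guideline can be sketched as follows. This is only an illustration: the class and method names are hypothetical and not part of the Hazelcast API, and the small overhead of Hazelcast internals is ignored.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class MemoryPlan {
    // Active data plus one full copy of it per backup.
    static double totalFootprintGb(double activeGb, int backupCount) {
        return activeGb * (1 + backupCount);
    }

    // RAM to provision so that the data fills only targetUtilization of it
    // (0.6 corresponds to the 60% used / 40% headroom guideline).
    static double provisionedGb(double footprintGb, double targetUtilization) {
        if (targetUtilization <= 0 || targetUtilization > 1) {
            throw new IllegalArgumentException("targetUtilization must be in (0, 1]");
        }
        return footprintGb / targetUtilization;
    }

    public static void main(String[] args) {
        // Example input (an assumption): 20 GB of active data with one backup.
        double footprint = totalFootprintGb(20, 1);
        double ram = provisionedGb(footprint, 0.6);
        System.out.printf("footprint=%.1f GB, provision=%.1f GB%n", footprint, ram);
    }
}
```

Dividing the provisioned figure by the memory you can dedicate to data on each member gives a first estimate of the member count, which should then be validated by benchmarking.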

If you use High-Density Memory Store, Hazelcast automatically assigns a percentage of available off-heap memory to the internal memory manager. Since allocation happens lazily, if you want to be informed about how much off-heap memory is being used by the memory manager, you can set the metadata-space-percentage property to a percentage value and Hazelcast will log warnings when that value is reached. By default, this property is set to 12.5%.

If you’re using jobs, all operational data must fit within available memory. This design leads to a predictable performance but requires enough RAM not to run out. Estimate the memory requirements and plan with a headroom of 25% for normal memory fragmentation. For fault-tolerant operations, we recommend reserving extra memory to survive any failures. See sizing for failures.

If your computation is memory-bound, consider:

  • Moving data out of the Hazelcast cluster, e.g., instead of using the distributed data structures of the cluster that runs the computation, keep the data in a remote Hazelcast cluster.

  • Scaling out, e.g., adding more members to the cluster.

Memory consumption is affected by:

  • Resources deployed with your job: Attaching big files such as models for ML inference pipelines can consume significant resources.

  • State of the running jobs: This varies, as it’s affected by the shape of your pipeline and by the data being processed. Most of the memory is consumed by operations that aggregate and buffer data. Typically, the state also scales with the number of distinct keys seen within the time window. Learn how the operations in the pipeline store their state. The operators that ship with Jet provide this information in their Javadoc.

  • State back-up: For jobs configured as fault-tolerant, the state of the running jobs is regularly snapshotted and saved in the cluster. The cluster keeps two consecutive snapshots at a time (an old one is kept until the new one is successfully created). Both current and previous snapshots can be saved in multiple replicas to increase data safety. The memory required for state back-up can be calculated as (Snapshot size * 2 * Number of replicas) / Cluster member count. The snapshot size is displayed in the Management Center. You might want to keep some state snapshots residing in the cluster as points of recovery, so plan the memory requirements accordingly.

  • Data stored inside the Hazelcast cluster: Any data hosted in the cluster, notably the IMap and ICache event journals used to store the streaming data.
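The state back-up formula from the list above can be written as a small helper. This is a sketch: the class and method names are hypothetical, the input values in main are invented for illustration, and the replica count is passed through exactly as the formula states it.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class SnapshotMemory {
    // (Snapshot size * 2 * Number of replicas) / Cluster member count
    static double perMemberMb(double snapshotSizeMb, int replicas, int memberCount) {
        if (memberCount <= 0) {
            throw new IllegalArgumentException("memberCount must be positive");
        }
        return snapshotSizeMb * 2 * replicas / memberCount;
    }

    public static void main(String[] args) {
        // Example inputs (assumptions): 600 MB snapshot, 2 replicas, 3 members.
        System.out.println(perMemberMb(600, 2, 3) + " MB per member");
    }
}
```

The factor of two reflects that the previous snapshot is kept until the new one is successfully created. Read the real snapshot size from Management Center rather than guessing it.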

Hazelcast offers lite members to prevent memory usage on these members. Lite members do not own any partitions, but they can access partitions that are owned by other members in the cluster. If there is no specific advantage to using non-homogeneous cluster members, we do not recommend using lite members as they increase network calls and thus increase the latency. See Enabling Lite Members for information about lite members.

Partition Count

Hazelcast’s default partition count is 271. This is a good choice for clusters of up to 50 members and ~25–30 GB of data. Up to this threshold, partitions are small enough that any rebalancing of the partition map when members join or leave the cluster doesn’t disturb the smooth operation of the cluster. With larger clusters and/or bigger data sets, a larger partition count helps to maintain an efficient rebalancing of data across members.

An optimum partition size is between 50 MB and 100 MB. Therefore, when designing the cluster, determine the size of the data that will be distributed across all members, and then determine the number of partitions such that no partition size exceeds 100 MB. If the default count of 271 results in heavily loaded partitions, increase the partition count to the point where data load per-partition is under 100 MB. Remember to factor in headroom for projected data growth.

To change the partition count, use the system property hazelcast.partition.count.

If you change the partition count from the default of 271, be sure to use a prime number of partitions. This helps minimize the collision of keys across partitions, ensuring more consistent lookup times.

If you are an Enterprise Edition customer using the High-Density Memory Store with large data sizes, we recommend a large increase in partition count, starting with 5009 or higher.

The partition count cannot be easily changed after a cluster is created, so if you have a large cluster be sure to test and set an optimum partition count prior to deployment. If you need to change the partition count after a cluster is already running, you will need to schedule a maintenance window to bring the entire cluster down. If your cluster uses the Persistence or CP Persistence features, the persisted files must be removed after the cluster is shut down, as they contain references to the previous partition count. Once all member configurations are updated and any persistent data structure files are removed, the cluster can be safely restarted.
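One way to turn the partition guidance above into a number is to divide the data size by the 100 MB ceiling and round up to the next prime. The sketch below does that. The class and method names are hypothetical, the example data size is invented, and the system property is assumed to be set before the member starts.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class PartitionCount {
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int i = 2; (long) i * i <= n; i++) {
            if (n % i == 0) return false;
        }
        return true;
    }

    // Smallest prime count that keeps every partition at or below maxPartitionMb,
    // never going below the default of 271.
    static int suggest(long totalDataMb, long maxPartitionMb) {
        if (maxPartitionMb <= 0) {
            throw new IllegalArgumentException("maxPartitionMb must be positive");
        }
        long needed = (totalDataMb + maxPartitionMb - 1) / maxPartitionMb; // ceiling division
        int candidate = Math.toIntExact(Math.max(271L, needed));
        while (!isPrime(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        // Example input (an assumption): 150,000 MB of data, 100 MB per partition at most.
        int count = suggest(150_000, 100);
        // Assumption: this runs before the Hazelcast member is started in the same JVM.
        System.setProperty("hazelcast.partition.count", Integer.toString(count));
        System.out.println("hazelcast.partition.count=" + count);
    }
}
```

Include projected data growth in the data size you pass in, since the count is hard to change once the cluster is running.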

Parallel Migrations

Hazelcast can migrate separate partitions in parallel, which significantly reduces the time needed for repartitioning. Having parallel migrations does have an impact on the heap memory and network utilization. The following properties control the number of parallel migrations/replications on a single member:

  • hazelcast.partition.max.parallel.migrations - Maximum number of partition migrations to be executed concurrently on a member.

  • hazelcast.partition.max.parallel.replications - Maximum number of parallel partition backup replication operations per member.

The default value is 10 for both properties, and this should be fine for most setups. Care and consideration should be taken before altering either of these properties. Decreasing them will make the total migration time go up, and increasing them will create additional heap and network pressure.

Scaling Maximums

Hazelcast performs scaling tests for each version of the software. Based on this testing we specify some scaling maximums. These are defined for each version of the software. We recommend staying below these numbers. Please contact Hazelcast if you plan to use higher limits.

  • Maximum 100 clients using the ALL_MEMBERS cluster routing mode per member

  • Maximum 1,000 clients using the SINGLE_MEMBER cluster routing mode per member

  • Maximum of 200GB High-Density Memory Store per member

Clients that use the ALL_MEMBERS cluster routing mode maintain a connection to each member. Clients that use the SINGLE_MEMBER cluster routing mode have a single connection to the entire cluster. You can also choose to connect to a partition group, which provides direct connections to the members in that group, with those members acting as a gateway to the other members in the cluster. For more information about the cluster routing modes, see Java Cluster Routing Modes.

Size for Failures

Hazelcast clusters are elastic to deal with failures and performance spikes.

When a cluster member fails, this reduces available resources and increases stress on the remaining members until recovery. The data previously owned by the failed member gets distributed among the surviving ones. The cluster must catch up with the data that has accumulated while it was adapting to the new size, and it must keep up with the head of the stream without the CPU capacity of the lost member.

To tolerate the failure of one member, we recommend sizing your cluster so it can operate well with n-1 members.
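As a rough check of the n-1 rule for memory, the sketch below estimates how full the surviving members become after a failure. It assumes data is spread evenly and that all members have the same capacity. The class and method names are hypothetical, and CPU and throughput capacity need a similar check through benchmarking.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class FailureSizing {
    // Fraction of the remaining memory capacity in use after `failed` members are lost.
    static double utilizationAfterFailure(double dataGb, double perMemberCapacityGb,
                                          int members, int failed) {
        int survivors = members - failed;
        if (survivors <= 0) {
            throw new IllegalArgumentException("no surviving members");
        }
        return dataGb / (perMemberCapacityGb * survivors);
    }

    public static void main(String[] args) {
        // Example inputs (assumptions): 150 GB of data on 5 members with 50 GB each.
        System.out.println("healthy:   " + utilizationAfterFailure(150, 50, 5, 0));
        System.out.println("one lost:  " + utilizationAfterFailure(150, 50, 5, 1));
    }
}
```

A result close to or above 1.0 for the failure case means the cluster cannot absorb the redistributed data and needs more members or more memory per member.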

Another approach to improve fault-tolerance is to separate the concerns of data storage and computation into two separate clusters. As an example, you could use one cluster for Hazelcast IMaps and their event journals and another one for running Hazelcast jobs. This way a single failure doesn’t simultaneously hurt both the storage and the computation capacity.

Start Independent Clusters for Job Performance Isolation

The jobs running in one cluster share the resources to maximize hardware utilization. This is efficient for setups without the risk of noisy neighbors such as:

  • Clusters hosting many short-living jobs

  • Clusters hosting jobs with a predictable performance

  • Jobs with relaxed SLAs

For stronger resource isolation (multi-tenant environments, strict SLAs), consider starting multiple smaller clusters with resources allocated at the OS level or using a resource manager such as Kubernetes.

Data Flow

Consider the capacity of data sources and sinks when planning the Hazelcast cluster.

Each Hazelcast job participates in a larger data pipeline: it continuously reads the data from the sources and writes the results to the sinks. The capacity of all components of the data pipeline must be balanced to avoid bottlenecks.

If a data sink is slow, Hazelcast applies backpressure all the way to the source, slowing down data consumption. The data sources should be designed to participate by reducing the pace of data production or by buffering the data.

On the other hand, if the data source is slow, i.e., it can’t produce or transmit the data fast enough, adding more resources to the Hazelcast cluster won’t bring any performance benefits.

Processed Data

Test your setup on a dataset that represents the characteristics of the production data, notably:

  • Partitioning of the input data

  • Key distribution and count

Hazelcast splits the data across the cluster to process it in parallel. It is designed to perform well under the assumption of balanced partitions. Imbalanced partitions may create a "hot spot" (a segment of data accessed far more often than other data) in your cluster. Factors that affect partitioning are the data source and the grouping keys used in the Hazelcast application.

Special cases are a frequent source of partition imbalance. For example, in a payment processing application, there might be a small number of accounts with very high activity. Imagine a retail company account with thousands of payments per minute vs. personal accounts with just a few payments in a day. Using the account as a grouping key leads to imbalanced partitions. Consider such special cases when designing your pipelines and the test datasets.
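A quick way to spot such special cases in a test dataset is to measure how much of the traffic belongs to the single busiest key. The sketch below works on plain key strings and does not model how Hazelcast assigns keys to partitions. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class KeySkew {
    // Fraction of all events that belong to the most frequent key.
    static double hottestKeyShare(String[] keys) {
        if (keys.length == 0) {
            return 0.0;
        }
        Map<String, Integer> counts = new HashMap<>();
        int max = 0;
        for (String key : keys) {
            int count = counts.merge(key, 1, Integer::sum);
            max = Math.max(max, count);
        }
        return (double) max / keys.length;
    }

    public static void main(String[] args) {
        // Example input (an assumption): one busy retail account among personal accounts.
        String[] accounts = {"retail", "retail", "retail", "alice"};
        System.out.println("hottest key share: " + hottestKeyShare(accounts));
    }
}
```

A share far above 1 / (number of distinct keys) suggests that one grouping key will dominate a partition.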

Benchmarking and Sizing Examples

See the following caching and streaming use cases to see sample benchmarking and sizing exercises.

Caching Use Case

Consider an application that uses Hazelcast as a data cache. The active memory footprint will be the total number of objects in the cache times the average object size. The backup memory footprint will be the active memory footprint times the backup count. The total memory footprint is the active memory footprint plus the backup memory footprint:

Total memory footprint = (total objects * average object size) + (total objects * average object size * backup count)

For this example, let’s stipulate the following requirements:

  • 50 GB of active data

  • 40,000 transactions per second

  • 70:30 ratio of reads to writes via map lookups

  • Less than 500 ms latency per transaction

  • A backup count of 2

Cluster Size Using the High-Density Memory Store

Since we have 50 GB of active data, our total memory footprint will be:

  • 50 GB + 50 GB * 2 (backup count) = 150 GB.

Add 40% memory headroom by assuming 150 GB is 60% of a cluster’s total space:

  • 150 / 0.6 = 250

and you will need a total of 250 GB of RAM for data.

To satisfy this use case, you will need three Hazelcast members, each running a 4 GB heap with ~84 GB of data off-heap in the High-Density Memory Store.

You cannot have a backup count greater than or equal to the number of members available in the cluster. Hazelcast will ignore higher backup counts and will create the maximum number of backup copies possible. For example, Hazelcast will only create two backup copies in a cluster of three members, even if the backup count is set equal to or higher than three.

No member in a Hazelcast cluster will store both primary data and that data’s backup.

Cluster Size Using Only Heap Memory

Since it’s not practical to run JVMs with greater than a 16 GB heap, you will need a minimum of 11 JVMs, each with a 16 GB heap to store 150 GB of active and backup data as a 16 GB JVM would give approximately 14 GB of storage space. Add the 40% headroom discussed earlier, for a total of 250 GB of usable heap, then you will need ~18 JVMs, each running with a 16 GB heap for active and backup data. Considering that each JVM has some memory overhead and Hazelcast’s rule of thumb for CPU sizing is eight cores per Hazelcast server instance, you will need at least 144 cores and upwards of 300 GB of memory.
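The heap-only figures above follow from a ceiling division, sketched below. The class and method names are hypothetical. The 14 GB of usable space per 16 GB JVM and the eight cores per member are the figures quoted in the text.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class HeapSizing {
    // Number of JVMs needed to hold dataGb when each JVM offers usablePerJvmGb for data.
    static int jvmsNeeded(double dataGb, double usablePerJvmGb) {
        if (usablePerJvmGb <= 0) {
            throw new IllegalArgumentException("usablePerJvmGb must be positive");
        }
        return (int) Math.ceil(dataGb / usablePerJvmGb);
    }

    // Rule of thumb from the text: eight cores per Hazelcast server instance.
    static int coresNeeded(int jvms) {
        return jvms * 8;
    }

    public static void main(String[] args) {
        int minimum = jvmsNeeded(150, 14);       // data only, no headroom
        int withHeadroom = jvmsNeeded(250, 14);  // with 40% headroom
        System.out.println(minimum + " JVMs minimum, " + withHeadroom
                + " JVMs with headroom, " + coresNeeded(withHeadroom) + " cores");
    }
}
```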

Summary

150 GB of data, including backups.

High-Density Memory Store:

  • 3 Hazelcast members

  • 24 cores

  • 256 GB RAM

Heap-only:

  • 18 Hazelcast members

  • 144 cores

  • 300 GB RAM

Streaming Use Case

The sample application is a real-time trade analyzer. Every second it counts the trades completed over the previous minute for each trading symbol. Hazelcast is also used to ingest and buffer the stream of trades. The remote trading applications write trade events to an IMap data structure in the cluster. The analytical job reads the IMap Event Journal and writes the processed results to a rolling file.

The job is configured to be fault-tolerant with the exactly-once processing guarantee.

The cluster is expected to process 50k trade events per second with 10k trade symbols (distinct keys).

Cluster Size and Performance

The following tables show the maximum and average latencies for an example data stream (50k events / second, 10k distinct keys) and how the cluster size affects the processing latency.

We benchmarked a job on a cluster of 3, 5 and 9 members. We started with a 3-member cluster as that is a minimal setup for fault-tolerant operations. For each topology, we benchmarked a setup with 1, 10, 20 and 40 jobs running in the cluster.

Cluster machines were of the recommended minimal configuration: AWS c5.2xlarge machines, each of 8 CPU, 16 GB RAM, 10 Gbps network.

1 job in the cluster:

Cluster Size    Max (ms)    Avg (ms)
3               182         150
5               172         152
9               215         134

10 jobs in the cluster:

Cluster Size    Max (ms)    Avg (ms)
3               986         877
5               808         719
9               735         557

20 jobs in the cluster:

Cluster Size    Max (ms)    Avg (ms)
3               1990        1784
5               1593        1470
9               1170        1046

40 jobs in the cluster:

Cluster Size    Max (ms)    Avg (ms)
3               4382        3948
5               3719        3207
9               2605        2085

Fault-Tolerance

The Event Journal capacity was set to 1.5 million items. With an input data production rate of 50k events per second, the data is kept for 30 seconds before being overwritten. The job snapshot frequency was set to 1 second.

The job is restarted from the last snapshot if a cluster member fails. In our test, the cluster restarted the processing in under 3 seconds (failure detection, clustering changes, job restart using the last snapshot) giving the job enough time to reprocess the 3 seconds (~ 150k events) of data it missed.

More aggressive failure detectors and a larger event journal can be used to stretch the error window.
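The relationship between journal capacity, event rate, and recovery time can be checked with the arithmetic below. It is a sketch using the figures from this example, and the class and method names are hypothetical.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class JournalWindow {
    // How long an event stays in the journal before it is overwritten.
    static double retentionSeconds(long capacityItems, long eventsPerSecond) {
        if (eventsPerSecond <= 0) {
            throw new IllegalArgumentException("eventsPerSecond must be positive");
        }
        return (double) capacityItems / eventsPerSecond;
    }

    // Events that accumulate while the job is down and must be reprocessed.
    static long eventsToReplay(double outageSeconds, long eventsPerSecond) {
        return Math.round(outageSeconds * eventsPerSecond);
    }

    public static void main(String[] args) {
        double retention = retentionSeconds(1_500_000, 50_000);
        long backlog = eventsToReplay(3, 50_000);
        System.out.println("retention: " + retention + " s, backlog after 3 s: " + backlog);
        // Recovery is only safe while the outage is shorter than the retention window.
        System.out.println("recoverable: " + (3 < retention));
    }
}
```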

Garbage Collection Considerations

Based on extensive testing, we distilled some advice on how to choose the right JDK/GC (Garbage Collection) combination and how to tune your setup to the workload of your Hazelcast data pipeline.

Upgrade Your JDK

Newer JDK releases often contain performance improvements, so we strongly recommend upgrading to a newer JDK version. See the supported JDKs. Note that the garbage collectors in newer JDK versions have been improved to achieve much higher throughput.

The G1 Collector is Great for Most Workloads

For batch workloads, as well as streaming workloads that can tolerate occasional latency spikes of 2-3 seconds, the G1 collector is the best choice because it has very good throughput and its failure modes are graceful. It performs very well in a variety of workloads without any tuning parameters. Its default target for the maximum stop-the-world GC pause is 200 ms and you can configure it lower, down to 5 ms (using -XX:MaxGCPauseMillis). Lower targets allow less throughput, though. The mentioned 2-3 seconds latency (as opposed to the usual 0.2 seconds) occurs only in exceptional conditions with very high GC pressure. The advantage of G1 over many other collectors is a graceful increase in GC pause length under such conditions.

For Latency Goals Below 10 ms, Consider a Low-Latency GC

If you aim for very low latencies (anything below 10 ms), you can achieve it with G1 as well, but you will probably have to use the -XX:MaxNewSize flag in order to constrain the Minor GC pause duration. In our test we found the values 100m-200m to work best over our range of throughputs, lower values being better for lower throughputs.

If your data pipeline doesn’t have too large state (i.e., less than a million keys within a sliding window), you can consider the Z garbage collector. We found it to work well without any tuning parameters. Its current downside is that it handles less throughput compared to G1 and, being non-generational, doesn’t work well if you have a lot of static data on the heap (for example, if your data pipeline has a hashJoin stage).

In our tests we found that as of JDK version 14.0.2, the other low-latency collector, Shenandoah, still did not perform as well as ZGC and latencies with it exceeded 10 ms in many cases.

Reduce the Jet Cooperative Thread Pool

A concurrent garbage collector uses a number of threads to do its work in the background. It uses some static heuristics to determine how many to use, mostly based on the number of availableProcessors that the JVM reports. For example, on a 16-vCPU EC2 c5.4xlarge instance:

  • ZGC uses 2 threads

  • G1 uses 3 threads

  • Shenandoah uses 4 threads

The number of GC threads is configurable through -XX:ConcGCThreads, but we found it best to leave the default setting. However, it is important to find out the number of GC threads and set Hazelcast’s config/hazelcast-jet.yaml/instance/cooperative-thread-count to (availableProcessors - ConcGCThreads). This will allow the GC threads to be assigned to their own CPU cores, alongside Hazelcast’s threads, and thus the OS can avoid having to interleave Hazelcast and GC threads on the same core.

A Hazelcast data pipeline may use additional threads for non-cooperative tasklets. In this case, consider setting the cooperative thread pool size even lower.
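The thread count calculation can be sketched as follows. The class and method names are hypothetical, the lower bound of one thread is an added safeguard rather than something stated in the text, and the GC thread count has to come from your own JVM. The value 3 below is the G1 figure quoted above for a 16-vCPU machine.

```java
// Hypothetical helper for illustration only; not part of the Hazelcast API.
final class CooperativeThreads {
    // availableProcessors - ConcGCThreads, clamped to at least one thread.
    static int cooperativeThreadCount(int availableProcessors, int concGcThreads) {
        return Math.max(1, availableProcessors - concGcThreads);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        int gcThreads = 3; // assumption: replace with the value your JVM actually uses
        System.out.println("cooperative-thread-count: "
                + cooperativeThreadCount(cpus, gcThreads));
    }
}
```

If the pipeline also runs non-cooperative tasklets, subtract an allowance for those threads as well.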

Egregious Amounts of Free Heap Help Latency

The data pipeline in our tests used less than 1 GB of heap, but we needed at least -Xmx4g to get a good 99.99th percentile latency. We also tested with -Xmx8g (less than 15% heap usage), and it made the latencies even lower.

For Batch Processing, Garbage-Free Aggregation is a Big Deal

In batch aggregation, once a given grouping key is observed, the state associated with it is retained until the end of the computation. If updating that state doesn’t create garbage, the whole aggregation process is garbage-free. The computation still produces young garbage, but since most garbage collectors are generational, this has significantly less cost. In our tests, garbage-free aggregation boosted the throughput of the batch pipeline by 30-35%.

For this reason we always strive to make the aggregate operations we provide with Hazelcast garbage-free. Examples are summing, averaging and finding extremes. Our current implementation of linear trend, however, does generate garbage because it uses immutable BigDecimals in the state.

If your requirements call for a complex aggregate operation not provided by Hazelcast, and if you use Hazelcast for batch processing, putting extra effort into implementing a custom garbage-free aggregate operation can be worth it.
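To illustrate what updating state without creating garbage means, the sketch below keeps a mutable accumulator per grouping key and updates it in place. It uses plain Java collections rather than Hazelcast's aggregate operation API, and the class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical example for illustration only; not part of the Hazelcast API.
final class GarbageFreeSum {
    // Mutable per-key state: adding to it allocates nothing.
    static final class Sum {
        long value;

        void add(long x) {
            value += x;
        }
    }

    static Map<String, Sum> sumByKey(String[] keys, long[] amounts) {
        Map<String, Sum> state = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // An accumulator is allocated only the first time a key is observed;
            // every later update mutates the retained object in place.
            state.computeIfAbsent(keys[i], k -> new Sum()).add(amounts[i]);
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, Sum> totals = sumByKey(
                new String[] {"ACME", "INIT", "ACME"}, new long[] {10, 20, 30});
        System.out.println("ACME=" + totals.get("ACME").value
                + ", INIT=" + totals.get("INIT").value);
    }
}
```

By contrast, state held in immutable values such as BigDecimal or boxed Long typically allocates a new object on every update, and the replaced object becomes garbage in the retained (old) generation.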