A newer version of IMDG is available.

View latest

Want to try Hazelcast 5.0 beta?

We’ve combined the in-memory storage of IMDG with the stream processing power of Jet to bring you an all new Hazelcast platform.

Split-Brain Recovery

Hazelcast deploys a background task that periodically searches for split clusters. When a split is detected, the side that will initiate the merge process is decided. This decision is based on the cluster size; the smaller cluster, by member count, merges into the bigger one. If they have an equal number of members, then a hashing algorithm determines the merging cluster. When deciding the merging side, both sides ensure that there’s no intersection in their member lists.

After the merging side is decided, the master member (the eldest one) of the merging side initiates the cluster merge process by sending merge instructions to the members in its cluster.

While recovering from partitioning, Hazelcast uses merge policies for supported data structures to resolve data conflicts between split clusters. A merge policy is a callback function to resolve conflicts between the existing and merging data. Hazelcast provides an interface to be implemented and also a selection of out-of-the-box policies. Data structures without split-brain support discard the data from merging side.

Each member of the merging cluster:

  • closes all of its network connections (detach from its cluster)

  • takes a snapshot of local data structures which support split-brain recovery

  • discards all data structure data

  • joins to the new cluster as lite member

  • sends merge operations to the new cluster from local snapshots.

For more information, see the Consistency and Replication Model chapter.

Merge Policies

Since Hazelcast 3.10 all merge policies implement the unified interface com.hazelcast.spi.SplitBrainMergePolicy. We provide the following out-of-the-box implementations:

  • DiscardMergePolicy: The entry from the smaller cluster is discarded.

  • ExpirationTimeMergePolicy: The entry with the higher expiration time wins.

  • HigherHitsMergePolicy: The entry with the higher number of hits wins.

  • HyperLogLogMergePolicy: Specialized merge policy for the CardinalityEstimator, which uses the default merge algorithm from HyperLogLog research, keeping the maximum register value of the two given instances.

  • LatestAccessMergePolicy: The entry with the latest access wins.

  • LatestUpdateMergePolicy: The entry with the latest update wins.

  • PassThroughMergePolicy: the entry from the smaller cluster wins.

  • PutIfAbsentMergePolicy: The entry from the smaller cluster wins if it doesn’t exist in the cluster.

Additionally you can develop a custom merge policy by implementing the SplitBrainMergePolicy interface, as explained in the Custom Merge Policies section

Supported Data Structures

The following data structures support split-brain recovery:

  • IMap (including High-Density Memory Store backed IMap)

  • ICache (including High-Density Memory Store backed IMap)

  • ReplicatedMap

  • MultiMap

  • IAtomicLong

  • IAtomicReference

  • IQueue

  • IList

  • ISet

  • RingBuffer

  • CardinalityEstimator

  • ScheduledExecutorService

The statistic based out-of-the-box merge policies are only supported by IMap, ICache, ReplicatedMap and MultiMap. The HyperLogLogMergePolicy is supported by the CardinalityEstimator.

Except the CardinalityEstimator data structure, the default merge policy for all the Hazelcast data structures that support split-brain recovery (listed above) is PutIfAbsentMergePolicy. For the CardinalityEstimator data structure, the default merge policy is HyperLogLogMergePolicy.

See also the Merge Types section for a complete overview of supported merge types of each data structure. There is a config validation which checks these constraints to provide fail-fast behavior for invalid configurations.

For the other data structures, e.g., ISemaphore, ICountdownLatch and ILock, the instance from the smaller cluster is discarded during the split-brain recovery.

Configuring Merge Policies

The merge policies are configured via a MergePolicyConfig, which can be set for all supported data structures. The only exception is ICache, which just accepts the merge policy classname (due to compatibility reasons with older Hazelcast clients). For ICache, all other configurable merge parameters are the default values from MergePolicyConfig.

For custom merge policies you should set the full class name of your implementation as the merge-policy configuration. For the out-of-the-box merge policies the simple classname is enough.

Declarative Configuration

Here are examples how merge policies can be specified for various data structures:

  • XML

  • YAML

<hazelcast>
    ...
    <map name="default">
        <merge-policy batch-size="100">LatestUpdateMergePolicy</merge-policy>
    </map>

    <replicatedmap name="default">
        <merge-policy batch-size="100">org.example.merge.MyMergePolicy</merge-policy>
    </replicatedmap>

    <multimap name="default">
        <merge-policy batch-size="50">HigherHitsMergePolicy</merge-policy>
    </multimap>

    <list name="default">
        <merge-policy batch-size="500">org.example.merge.MyMergePolicy</merge-policy>
    </list>

    <atomic-long name="default">
        <merge-policy>PutIfAbsentMergePolicy</merge-policy>
    </atomic-long>
    ...
</hazelcast>
hazelcast:
  map:
    default:
      merge-policy:
        batch-size: 100
        class-name: LatestUpdateMergePolicy
  replicatedmap:
    default:
      merge-policy:
        batch-size: 100
        class-name: org.example.merge.MyMergePolicy
  multimap:
    default:
      merge-policy:
        batch-size: 50
        class-name: HigherHitsMergePolicy
  list:
    default:
      merge-policy:
        batch-size: 500
        class-name: org.example.merge.MyMergePolicy
  atomic-long:
    default:
      merge-policy:
        class-name: PutIfAbsentMergePolicy

Here is how merge policies are specified for ICache (it is the same configuration tag, but lacks the support for additional attributes like batch-size):

  • XML

  • YAML

<hazelcast>
    ...
    <cache name="default">
        <merge-policy>org.example.merge.MyMergePolicy</merge-policy>
    </cache>
    ...
</hazelcast>
hazelcast:
  cache:
    default:
      merge-policy:
        class-name: org.example.merge.MyMergePolicy

Programmatic Configuration

Here are examples how merge policies can be specified for various data structures:

        MergePolicyConfig mergePolicyConfig = new MergePolicyConfig()
                .setPolicy("org.example.merge.MyMergePolicy")
                .setBatchSize(100);

        MapConfig mapConfig = new MapConfig("default")
                .setMergePolicyConfig(mergePolicyConfig);

        ListConfig listConfig = new ListConfig("default")
                .setMergePolicyConfig(mergePolicyConfig);

        Config config = new Config()
                .addMapConfig(mapConfig)
                .addListConfig(listConfig);

Here is how merge policies are specified for ICache (you can only set the merge policy classname):

CacheConfig mapConfig = new CacheConfig()
  .setName("default")
  .setMergePolicy("org.example.merge.MyMergePolicy");

Config config = new Config()
  .addMapConfig(mapConfig);

Custom Merge Policies

To implement a custom merge policy you have to implement com.hazelcast.spi.SplitBrainMergePolicy:

public interface SplitBrainMergePolicy<V, T extends MergingValue<V>, R>
    extends DataSerializable {

  R merge(T mergingValue, T existingValue);
}

MergingValue is an interface which describes a merge type.

Please have in mind that existingValue can be null. This happens when a data structure or key-based entry was just created in the smaller cluster.

Merge Types

A merge type defines an attribute which is required by a merge policy and provided by a data structure.

MergingValue is the main merge type, which is required by all merge policies and provided by all data structures. It contains the value of the merged data in raw (in-memory storage) and deserialized format:

public interface MergingValue<V> extends MergingView {

  V getValue();

  Object getRawValue();
}

MergingValue extends MergingView, which is a marker interface extended by all provided merge types.

The most common extension of MergingValue is MergingEntry, which additionally provides the key in raw (in-memory storage) and deserialized format (used by all key-based data structures like IMap or ICache):

public interface MergingEntry<K, V> extends MergingValue<V> {

  K getKey();

  Object getRawKey();
}

In addition we have a bunch of specialized merge types, e.g., for provided statistics. An example is MergingHits, which provides the hit counter of the merge data:

public interface MergingHits extends MergingView {

  long getHits();
}

The class com.hazelcast.spi.merge.SplitBrainMergeTypes contains composed interfaces, which show the provided merge types and required merge policy return type for each data structure:

public interface ReplicatedMapMergeTypes<K, V> extends MergingEntry<K, V>,
    MergingCreationTime, MergingHits, MergingLastAccessTime, MergingLastUpdateTime,
    MergingTTL {
}

public interface QueueMergeTypes<V> extends MergingValue<Collection<V>> {
}

The ReplicatedMap provides key/value merge data, with the creation time, access hits, last access time, last update time and TTL. The return type of the merge policy is Object.

The IQueue just provides a collection of values. The return type is also a Collection<Object>.

The following is the full list of merge types:

  • MergingValue: Represents the value of the merged data.

  • MergingEntry: Represents the key and value of the merged data.

  • MergingCreationTime: Represents the creation time of the merging process.

  • MergingHits: Represents the access hits of the merged data.

  • MergingLastAccessTime: Represents the last time when the merged data is accessed.

  • MergingLastUpdateTime: Represents the last time when the merged data is updated.

  • MergingTTL: Represents the time-to-live value of the merged data.

  • MergingMaxIdle: Represents the maximum idle timeout value of the merged data.

  • MergingCost: Represents the memory costs for the merging process after a split-brain.

  • MergingVersion: Represents the version of the merged data.

  • MergingExpirationTime: Represents the expiration time of the merged data.

  • MergingLastStoredTime: Represents the last stored time of the merged data.

And the following table shows the merge types provided by each data structure:

Table 1. Merge Types
Data Structure Merge Type

IMap

  • MergingEntry

  • MergingCreationTime

  • MergingHits

  • MergingLastAccessTime

  • MergingLastUpdateTime

  • MergingTTL

  • MergingMaxIdle

  • MergingCosts

  • MergingVersion

  • MergingExpirationTime

  • MergingLastStoredTime

ICache

  • MergingEntry

  • MergingCreationTime

  • MergingHits

  • MergingLastAccessTime

  • MergingLastUpdateTime

  • MergingTTL

ReplicatedMap

  • MergingEntry

  • MergingCreationTime

  • MergingHits

  • MergingLastAccessTime

  • MergingLastUpdateTime

  • MergingTTL

MultiMap

  • MergingEntry

  • MergingCreationTime

  • MergingHits

  • MergingLastAccessTime

  • MergingLastUpdateTime

IQueue, ISet, IList, Ringbuffer

  • MergingValue

IAtomicLong, IAtomicReference

  • MergingValue

CardinalityEstimator

  • MergingEntry

ScheduledExecutorService

  • MergingEntry

The following sections show various examples on how to implement merge type interfaces for all data structures, specific merge types or a specific data structure.

Accessing Deserialized Values

MergingValue.getRawValue() and MergingEntry.getRawKey() always return the data in the in-memory format of the data structure. For some data structure like IMap this depends on your configuration. Other data structure like ISet or IList always use the BINARY in-memory format.

If you need the deserialized key or value, you have to call MergingValue.getValue() or MergingEntry.getKey(). The deserialization is done lazily on that method call, since it’s quite expensive and should be avoided if the result is not needed. This also requires the deserialized classes to be on the classpath of the server. Otherwise a ClassNotFoundException is thrown.

This is an example which checks if the (deserialized) value of the mergingValue or existingValue is an Integer. If so it is merged, otherwise null is returned (which removes the entry):

public class MergeIntegerValuesMergePolicy<V> implements SplitBrainMergePolicy<V, MergingValue<V>> {

    @Override
    public V merge(MergingValue<V> mergingValue, MergingValue<V> existingValue) {
        Object mergingUserValue = mergingValue.getDeserializedValue();
        Object existingUserValue = existingValue == null ? null : existingValue.getDeserializedValue();
        System.out.println("========================== Merging..."
                + "\n    mergingValue: " + mergingUserValue
                + "\n    existingValue: " + existingUserValue
                + "\n    mergingValue class: " + mergingUserValue.getClass().getName()
                + "\n    existingValue class: " + (existingUserValue == null ? "null" : existingUserValue.getClass().getName())
        );
        if (mergingUserValue instanceof Integer) {
            return mergingValue.getValue();
        }
        return null;
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}

For data structures like ISet or ICollection you need a merge policy, which supports collections:

public class MergeCollectionOfIntegerValuesMergePolicy
        implements SplitBrainMergePolicy<Collection<Object>, MergingValue<Collection<Object>>> {

    @Override
    public Collection<Object> merge(MergingValue<Collection<Object>> mergingValue,
                                    MergingValue<Collection<Object>> existingValue) {
        Collection<Object> result = new ArrayList<Object>();
        for (Object value : mergingValue.<Collection<Object>>getDeserializedValue()) {
            if (value instanceof Integer) {
                result.add(value);
            }
        }
        if (existingValue != null) {
            for (Object value : existingValue.<Collection<Object>>getDeserializedValue()) {
                if (value instanceof Integer) {
                    result.add(value);
                }
            }
        }
        return result;
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}

You can also combine both merge policies to support single values and collections. This merge policy is a bit more complex and less type safe, but can be configured on all data structures:

public class MergeIntegerValuesMergePolicy2<V, T extends MergingValue<V>> implements SplitBrainMergePolicy<V, T> {

    @Override
    public V merge(T mergingValue, T existingValue) {
        if (mergingValue.getDeserializedValue() instanceof Integer) {
            return mergingValue.getValue();
        }
        if (existingValue != null && existingValue.getDeserializedValue() instanceof Integer) {
            return existingValue.getValue();
        }
        if (mergingValue.getValue() instanceof Collection) {
            Collection<Object> result = new ArrayList<Object>();
            addIntegersToCollection(mergingValue, result);
            if (result.isEmpty() && existingValue != null) {
                addIntegersToCollection(existingValue, result);
            }
            return (V) result;
        }
        return null;
    }

    private void addIntegersToCollection(T mergingValue, Collection<Object> result) {
        for (Object value : mergingValue.<Collection<Object>>getDeserializedValue()) {
            if (value instanceof Integer) {
                result.add(value);
            }
        }
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}
Please have in mind that existingValue can be null, so a null check is mandatory before calling existingValue.getValue() or existingValue.getRawValue().
If you return null on a collection based data structure, the whole data structure will be removed. An empty collection works in the same way, so you don’t have to check Collection.isEmpty() in your merge policy.

Accessing Hazelcast UserContext

If you need access to external references in your merge policy, you can use the Hazelcast UserContext to get them injected. An example would be a database connection to check which value is stored in your database. To achieve this your merge policy needs to implement HazelcastInstanceAware and call HazelcastInstance.getUserContext():

public class UserContextMergePolicy<V> implements SplitBrainMergePolicy<V, MergingValue<V>>, HazelcastInstanceAware {

    public static final String TRUTH_PROVIDER_ID = "truthProvider";

    private transient TruthProvider truthProvider;

    @Override
    public V merge(MergingValue<V> mergingValue, MergingValue<V> existingValue) {
        Object mergingUserValue = mergingValue.getDeserializedValue();
        Object existingUserValue = existingValue == null ? null : existingValue.getDeserializedValue();
        boolean isMergeable = truthProvider.isMergeable(mergingUserValue, existingUserValue);
        System.out.println("========================== Merging..."
                        + "\n    mergingValue: " + mergingUserValue
                        + "\n    existingValue: " + existingUserValue
                        + "\n    isMergeable(): " + isMergeable
        );
        if (isMergeable) {
            return mergingValue.getValue();
        }
        return null;
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        ConcurrentMap<String, Object> userContext = hazelcastInstance.getUserContext();
        truthProvider = (TruthProvider) userContext.get(TRUTH_PROVIDER_ID);
    }

    public interface TruthProvider {

        boolean isMergeable(Object mergingValue, Object existingValue);
    }
}

The UserContext can be setup like this:

MergePolicyConfig mergePolicyConfig = new MergePolicyConfig()
  .setPolicy(UserContextMergePolicy.class.getName());

MapConfig mapConfig = new MapConfig("default")
  .setMergePolicyConfig(mergePolicyConfig);

ConcurrentMap<String, Object> userContext = new ConcurrentHashMap<String, Object>();
userContext.put(TruthProvider.TRUTH_PROVIDER_ID, new ExampleTruthProvider());

Config config = new Config()
  .addMapConfig(mapConfig)
  .setUserContext(userContext);

Hazelcast.newHazelcastInstance(config);

The merge operations are executed on the partition threads. Database accesses are slow compared to in-memory operations. The SplitBrainMergePolicy.merge() method is called for every key-value pair or every collection from your smaller cluster, which has a merge policy defined. So there can be millions of database accesses due to a merge policy, which implements this. Be aware that this can block your cluster for a long time or overload your database due to the high amount of queries.

Also the com.hazelcast.core.LifeCycleEvent.MERGED is thrown after a timeout (we don’t wait forever for merge operations to continue). At the moment this timeout is 500 milliseconds per merged item or entry, but at least 5 seconds. If your database is slow, you might get the LifeCycleEvent while there are still merge operations in progress.

Merge Policies With Multiple Merge Types

You can also write a merge policy, which requires multiple merge types. This merge policy is supported by all data structures, which provide MergingHits and MergingCreationTime:

public class ComposedHitsAndCreationTimeMergePolicy<V, T extends MergingHits<V> & MergingCreationTime<V>>
        implements SplitBrainMergePolicy<V, T> {

    @Override
    public V merge(T mergingValue, T existingValue) {
        if (existingValue == null) {
            return mergingValue.getValue();
        }
        System.out.println("========================== Merging value " + mergingValue.getDeserializedValue() + "..."
                + "\n    mergingValue creation time: " + mergingValue.getCreationTime()
                + "\n    existingValue creation time: " + existingValue.getCreationTime()
                + "\n    mergingValue hits: " + mergingValue.getHits()
                + "\n    existingValue hits: " + existingValue.getHits()
        );

        if (mergingValue.getCreationTime() < existingValue.getCreationTime()
                && mergingValue.getHits() > existingValue.getHits()) {
            return mergingValue.getValue();
        }
        return existingValue.getValue();
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}

If you configure this merge policy on a data structures, which does not provide these merge types, you get an InvalidConfigurationException with a message like:

The merge policy org.example.merge.ComposedHitsAndCreationTimeMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.MergingHits.
See SplitBrainMergingTypes for supported merging types.

Merge Policies For Specific Data Structures

It’s also possible to restrict a merge policy to a specific data structure. This merge policy, for example, only works on IMap:

public class MapEntryCostsMergePolicy implements SplitBrainMergePolicy<Data, MapMergeTypes> {

    @Override
    public Data merge(MapMergeTypes mergingValue, MapMergeTypes existingValue) {
        if (existingValue == null) {
            return mergingValue.getValue();
        }
        System.out.println("========================== Merging key " + mergingValue.getDeserializedKey() + "..."
                + "\n    mergingValue costs: " + mergingValue.getCost()
                + "\n    existingValue costs: " + existingValue.getCost()
        );

        if (mergingValue.getCost() > existingValue.getCost()) {
            return mergingValue.getValue();
        }
        return existingValue.getValue();
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}

If you configure it on other data structures, you get an InvalidConfigurationException with a message like:

The merge policy org.example.merge.MapEntryCostsMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.SplitBrainMergeTypes$MapMergeTypes.
See SplitBrainMergingTypes for supported merging types.

This is another example for a merge policy, which only works on the IAtomicReference:

public class AtomicReferenceMergeIntegerValuesMergePolicy implements SplitBrainMergePolicy<Object, AtomicReferenceMergeTypes> {

    @Override
    public Object merge(AtomicReferenceMergeTypes mergingValue, AtomicReferenceMergeTypes existingValue) {
        Object mergingUserValue = mergingValue.getDeserializedValue();
        Object existingUserValue = existingValue == null ? null : existingValue.getDeserializedValue();
        System.out.println("========================== Merging..."
                + "\n    mergingValue: " + mergingUserValue
                + "\n    existingValue: " + existingUserValue
                + "\n    mergingValue class: " + mergingUserValue.getClass().getName()
                + "\n    existingValue class: " + (existingUserValue == null ? "null" : existingUserValue.getClass().getName())
        );
        if (mergingUserValue instanceof Integer) {
            return mergingValue.getValue();
        }
        return null;
    }

    @Override
    public void writeData(ObjectDataOutput out) {
    }

    @Override
    public void readData(ObjectDataInput in) {
    }
}

Although every data structure supports MergingValue, which is the only merge type of AtomicReferenceMergeTypes, this merge policy is restricted to IAtomicReference data structures:

The merge policy org.example.merge.AtomicReferenceMergeIntegerValuesMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.SplitBrainMergeTypes$AtomicReferenceMergeTypes.
See SplitBrainMergingTypes for supported merging types.

Best Practices

Here are some best practices when implementing your own merge policy

  • Only call MergingValue.getValue() and MergingEntry.getKey() when you really need the deserialized value to save costs (CPU and memory) and avoid ClassNotFoundException.

  • If you want to return one of the given values (merging or existing), it’s best to return mergingValue.getRawValue() or existingValue.getRawValue(), since they are already in the correct in-memory format of the data structure. If you return a deserialized value, it might need to be serialized again, which are avoidable costs.

  • Be careful with slow operations in the merge policy (like database accesses), since they block your partition threads. Also the LifeCycleEvent.MERGED or LifeCycleEvent.MERGE_FAILED may be thrown too early, if the merge operations take too long to finish.