Split-Brain Recovery
Hazelcast deploys a background task that periodically searches for split clusters. When a split is detected, the side that will initiate the merge process is decided. This decision is based on the cluster size; the smaller cluster, by member count, merges into the bigger one. If they have an equal number of members, then a hashing algorithm determines the merging cluster. When deciding the merging side, both sides ensure that there’s no overlap in their member lists. In some cases, recovery from split-brain may be problematic due to discovery issues; see Eliminating Unsuccessful Cluster Merges on how to overcome these issues.
After the merging side is decided, the oldest cluster member of the merging side initiates the cluster merge process by sending merge instructions to the members in its cluster.
While recovering from partitioning, Hazelcast uses merge policies for supported data structures to resolve data conflicts between split clusters. A merge policy is a callback function to resolve conflicts between the existing and merging data. Hazelcast provides an interface to be implemented and also a selection of out-of-the-box policies. Data structures without split-brain recovery support discard the data from the merging side.
Each member of the merging cluster:

- closes all of its network connections (detaches from its cluster)
- takes a snapshot of the local data structures which support split-brain recovery
- discards all data structure data
- joins the new cluster as a lite member
- sends merge operations to the new cluster from the local snapshots.
For more information, see the Consistency and Replication Model chapter.
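If you want to observe this process on a member, you can register a lifecycle listener. The following is a minimal sketch (an illustration, not from the original text) that logs the split-brain related lifecycle states; MERGING is fired on the merging members before they detach, MERGED after a successful merge and MERGE_FAILED otherwise:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.LifecycleEvent.LifecycleState;

public class MergeLifecycleLogger {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        // Log only the split-brain related lifecycle states.
        hz.getLifecycleService().addLifecycleListener(event -> {
            LifecycleState state = event.getState();
            if (state == LifecycleState.MERGING
                    || state == LifecycleState.MERGED
                    || state == LifecycleState.MERGE_FAILED) {
                System.out.println("Split-brain lifecycle state: " + state);
            }
        });
    }
}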
Merge Policies
Since Hazelcast 3.10, all merge policies implement the unified interface com.hazelcast.spi.SplitBrainMergePolicy.
We provide the following out-of-the-box implementations:
- DiscardMergePolicy: The entry from the smaller cluster is discarded.
- ExpirationTimeMergePolicy: The entry with the higher expiration time wins.
- HigherHitsMergePolicy: The entry with the higher number of hits wins.
- HyperLogLogMergePolicy: Specialized merge policy for the CardinalityEstimator, which uses the default merge algorithm from HyperLogLog research, keeping the maximum register value of the two given instances.
- LatestAccessMergePolicy: The entry with the latest access wins.
- LatestUpdateMergePolicy: The entry with the latest update wins.
- PassThroughMergePolicy: The entry from the smaller cluster wins.
- PutIfAbsentMergePolicy: The entry from the smaller cluster wins if it doesn't exist in the cluster.
Additionally, you can develop a custom merge policy by implementing the SplitBrainMergePolicy interface, as explained in the Custom Merge Policies section.
Supported Data Structures
The following data structures support split-brain recovery:

- IMap (including High-Density Memory Store backed IMap)
- ICache (including High-Density Memory Store backed ICache)
- ReplicatedMap
- MultiMap
- IAtomicLong
- IAtomicReference
- IQueue
- IList
- ISet
- Ringbuffer
- CardinalityEstimator
- ScheduledExecutorService
The statistic-based out-of-the-box merge policies are only supported by IMap, ICache, ReplicatedMap and MultiMap. The HyperLogLogMergePolicy is supported by the CardinalityEstimator.

Except for the CardinalityEstimator data structure, the default merge policy for all the Hazelcast data structures that support split-brain recovery (listed above) is PutIfAbsentMergePolicy. For the CardinalityEstimator data structure, the default merge policy is HyperLogLogMergePolicy.
See also the Merge Types section for a complete overview of supported merge types of each data structure. There is a config validation which checks these constraints to provide fail-fast behavior for invalid configurations.
For the other data structures, e.g., ISemaphore , ICountDownLatch and ILock ,
the instance from the smaller cluster is discarded during the split-brain recovery.
Configuring Merge Policies
The merge policies are configured via a MergePolicyConfig, which can be set for all supported data structures. The only exception is ICache, which just accepts the merge policy classname (due to compatibility reasons with older Hazelcast clients). For ICache, all other configurable merge parameters are the default values from MergePolicyConfig.
For custom merge policies you should set the full class name of your implementation as the merge-policy configuration. For the out-of-the-box merge policies the simple classname is enough.
Declarative Configuration
Here are examples of how merge policies can be specified for various data structures:
<hazelcast>
...
<map name="default">
<merge-policy batch-size="100">LatestUpdateMergePolicy</merge-policy>
</map>
<replicatedmap name="default">
<merge-policy batch-size="100">org.example.merge.MyMergePolicy</merge-policy>
</replicatedmap>
<multimap name="default">
<merge-policy batch-size="50">HigherHitsMergePolicy</merge-policy>
</multimap>
<list name="default">
<merge-policy batch-size="500">org.example.merge.MyMergePolicy</merge-policy>
</list>
<atomic-long name="default">
<merge-policy>PutIfAbsentMergePolicy</merge-policy>
</atomic-long>
...
</hazelcast>
hazelcast:
map:
default:
merge-policy:
batch-size: 100
class-name: LatestUpdateMergePolicy
replicatedmap:
default:
merge-policy:
batch-size: 100
class-name: org.example.merge.MyMergePolicy
multimap:
default:
merge-policy:
batch-size: 50
class-name: HigherHitsMergePolicy
list:
default:
merge-policy:
batch-size: 500
class-name: org.example.merge.MyMergePolicy
atomic-long:
default:
merge-policy:
class-name: PutIfAbsentMergePolicy
Here is how merge policies are specified for ICache (it is the same configuration tag, but it lacks support for additional attributes like batch-size):
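A minimal sketch, assuming the standard cache configuration tag where the policy class name is given as the element value:

<hazelcast>
    ...
    <cache name="default">
        <merge-policy>org.example.merge.MyMergePolicy</merge-policy>
    </cache>
    ...
</hazelcast>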
Programmatic Configuration
Here are examples of how merge policies can be specified for various data structures:
MergePolicyConfig mergePolicyConfig = new MergePolicyConfig()
.setPolicy("org.example.merge.MyMergePolicy")
.setBatchSize(100);
MapConfig mapConfig = new MapConfig("default")
.setMergePolicyConfig(mergePolicyConfig);
ListConfig listConfig = new ListConfig("default")
.setMergePolicyConfig(mergePolicyConfig);
Config config = new Config()
.addMapConfig(mapConfig)
.addListConfig(listConfig);
Here is how merge policies are specified for ICache (you can only set the merge policy classname):
CacheSimpleConfig cacheConfig = new CacheSimpleConfig()
.setName("default")
.setMergePolicy("org.example.merge.MyMergePolicy");
Config config = new Config()
.addCacheConfig(cacheConfig);
Custom Merge Policies
To implement a custom merge policy you have to implement com.hazelcast.spi.SplitBrainMergePolicy:
public interface SplitBrainMergePolicy<V, T extends MergingValue<V>, R>
extends DataSerializable {
R merge(T mergingValue, T existingValue);
}
MergingValue is an interface which describes a merge type. Keep in mind that existingValue can be null. This happens when a data structure or key-based entry was just created in the smaller cluster.
Merge Types
A merge type defines an attribute which is required by a merge policy and provided by a data structure.
MergingValue is the main merge type, which is required by all merge policies and provided by all data structures. It contains the value of the merged data in raw (in-memory storage) and deserialized format:
public interface MergingValue<V> extends MergingView {
V getValue();
Object getRawValue();
}
MergingValue extends MergingView, which is a marker interface extended by all provided merge types. The most common extension of MergingValue is MergingEntry, which additionally provides the key in raw (in-memory storage) and deserialized format (used by all key-based data structures like IMap or ICache):
public interface MergingEntry<K, V> extends MergingValue<V> {
K getKey();
Object getRawKey();
}
In addition, there are a number of specialized merge types, e.g., for provided statistics. An example is MergingHits, which provides the hit counter of the merged data:
public interface MergingHits extends MergingView {
long getHits();
}
The class com.hazelcast.spi.merge.SplitBrainMergeTypes contains composed interfaces, which show the provided merge types and the required merge policy return type for each data structure:
public interface ReplicatedMapMergeTypes<K, V> extends MergingEntry<K, V>,
MergingCreationTime, MergingHits, MergingLastAccessTime, MergingLastUpdateTime,
MergingTTL {
}
public interface QueueMergeTypes<V> extends MergingValue<Collection<V>> {
}
The ReplicatedMap provides key/value merge data, with the creation time, access hits, last access time, last update time and TTL. The return type of the merge policy is Object. The IQueue just provides a collection of values; the return type is a Collection<Object>.
The following is the full list of merge types:
- MergingValue: Represents the value of the merged data.
- MergingEntry: Represents the key and value of the merged data.
- MergingCreationTime: Represents the creation time of the merging process.
- MergingHits: Represents the access hits of the merged data.
- MergingLastAccessTime: Represents the last time when the merged data is accessed.
- MergingLastUpdateTime: Represents the last time when the merged data is updated.
- MergingTTL: Represents the time-to-live value of the merged data.
- MergingMaxIdle: Represents the maximum idle timeout value of the merged data.
- MergingCost: Represents the memory costs for the merging process after a split-brain.
- MergingVersion: Represents the version of the merged data.
- MergingExpirationTime: Represents the expiration time of the merged data.
- MergingLastStoredTime: Represents the last stored time of the merged data.
And the following table shows the composed merge type interface provided by each data structure (as defined in com.hazelcast.spi.merge.SplitBrainMergeTypes):

Data Structure | Merge Type
---|---
IMap | MapMergeTypes
ICache | CacheMergeTypes
ReplicatedMap | ReplicatedMapMergeTypes
MultiMap | MultiMapMergeTypes
IQueue | QueueMergeTypes
IList | CollectionMergeTypes
ISet | CollectionMergeTypes
Ringbuffer | RingbufferMergeTypes
IAtomicLong | AtomicLongMergeTypes
IAtomicReference | AtomicReferenceMergeTypes
CardinalityEstimator | CardinalityEstimatorMergeTypes
ScheduledExecutorService | ScheduledExecutorMergeTypes
The following sections show various examples on how to implement merge type interfaces for all data structures, specific merge types or a specific data structure.
Accessing Deserialized Values
MergingValue.getRawValue()
and MergingEntry.getRawKey()
always return
the data in the in-memory format of the data structure.
For some data structure like IMap
this depends on your configuration.
Other data structure like ISet
or IList
always use the BINARY
in-memory format.
If you need the deserialized key or value, you have to call MergingValue.getValue()
or
MergingEntry.getKey()
.
The deserialization is done lazily on that method call, since it’s quite expensive and
should be avoided if the result is not needed.
This also requires the deserialized classes to be on the classpath of the server.
Otherwise, a ClassNotFoundException
is thrown.
The following example checks if the (deserialized) value of the mergingValue or existingValue is an Integer. If so, it is merged, otherwise null is returned (which removes the entry):
public class MergeIntegerValuesMergePolicy<V> implements SplitBrainMergePolicy<V, MergingValue<V>, Object> {
@Override
public Object merge(MergingValue<V> mergingValue, MergingValue<V> existingValue) {
Object mergingUserValue = mergingValue.getValue();
Object existingUserValue = existingValue == null ? null : existingValue.getValue();
System.out.println("========================== Merging..."
+ "\n mergingValue: " + mergingUserValue
+ "\n existingValue: " + existingUserValue
+ "\n mergingValue class: " + mergingUserValue.getClass().getName()
+ "\n existingValue class: " + (existingUserValue == null ? "null" : existingUserValue.getClass().getName())
);
if (mergingUserValue instanceof Integer) {
return mergingValue.getRawValue();
}
return null;
}
@Override
public void writeData(ObjectDataOutput out) {
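// this merge policy is stateless, so there is nothing to serialize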
}
@Override
public void readData(ObjectDataInput in) {
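// nothing to deserialize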
}
}
For data structures like ISet or ICollection you need a merge policy which supports collections:
public class MergeCollectionOfIntegerValuesMergePolicy
implements SplitBrainMergePolicy<Collection<Object>, MergingValue<Collection<Object>>, Collection<Object>> {
@Override
public Collection<Object> merge(MergingValue<Collection<Object>> mergingValue,
MergingValue<Collection<Object>> existingValue) {
Collection<Object> result = new ArrayList<>();
for (Object value : mergingValue.getValue()) {
if (value instanceof Integer) {
result.add(value);
}
}
if (existingValue != null) {
for (Object value : existingValue.getValue()) {
if (value instanceof Integer) {
result.add(value);
}
}
}
return result;
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
}
You can also combine both merge policies to support single values and collections. This merge policy is a bit more complex and less type safe, but can be configured on all data structures:
public class MergeIntegerValuesMergePolicy2<V, T extends MergingValue<V>> implements SplitBrainMergePolicy<V, T, Object> {
@Override
public Object merge(T mergingValue, T existingValue) {
if (mergingValue.getValue() instanceof Integer) {
return mergingValue.getRawValue();
}
if (existingValue != null && existingValue.getValue() instanceof Integer) {
return existingValue.getRawValue();
}
if (mergingValue.getRawValue() instanceof Collection) {
Collection<Object> result = new ArrayList<>();
addIntegersToCollection(mergingValue, result);
if (result.isEmpty() && existingValue != null) {
addIntegersToCollection(existingValue, result);
}
return result;
}
return null;
}
private void addIntegersToCollection(T mergingValue, Collection<Object> result) {
for (Object value : (Collection<Object>) mergingValue.getValue()) {
if (value instanceof Integer) {
result.add(value);
}
}
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
}
Keep in mind that existingValue can be null, so a null check is mandatory before calling existingValue.getValue() or existingValue.getRawValue().
If you return null on a collection-based data structure, the whole data structure will be removed. An empty collection works the same way, so you don't have to check Collection.isEmpty() in your merge policy.
Accessing Hazelcast UserContext
If you need access to external references in your merge policy, you can use the Hazelcast UserContext to get them injected. An example would be a database connection to check which value is stored in your database. To achieve this, your merge policy needs to implement HazelcastInstanceAware and call HazelcastInstance.getUserContext():
public class UserContextMergePolicy<V> implements SplitBrainMergePolicy<V, MergingValue<V>, Object>, HazelcastInstanceAware {
public static final String TRUTH_PROVIDER_ID = "truthProvider";
private transient TruthProvider truthProvider;
@Override
public Object merge(MergingValue<V> mergingValue, MergingValue<V> existingValue) {
Object mergingUserValue = mergingValue.getValue();
Object existingUserValue = existingValue == null ? null : existingValue.getValue();
boolean isMergeable = truthProvider.isMergeable(mergingUserValue, existingUserValue);
System.out.println("========================== Merging..."
+ "\n mergingValue: " + mergingUserValue
+ "\n existingValue: " + existingUserValue
+ "\n isMergeable(): " + isMergeable
);
if (isMergeable) {
return mergingValue.getRawValue();
}
return null;
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
@Override
public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
ConcurrentMap<String, Object> userContext = hazelcastInstance.getUserContext();
truthProvider = (TruthProvider) userContext.get(TRUTH_PROVIDER_ID);
}
public interface TruthProvider {
boolean isMergeable(Object mergingValue, Object existingValue);
}
}
The UserContext can be set up like this:
MergePolicyConfig mergePolicyConfig = new MergePolicyConfig()
.setPolicy(UserContextMergePolicy.class.getName());
MapConfig mapConfig = new MapConfig("default")
.setMergePolicyConfig(mergePolicyConfig);
ConcurrentMap<String, Object> userContext = new ConcurrentHashMap<String, Object>();
userContext.put(UserContextMergePolicy.TRUTH_PROVIDER_ID, new ExampleTruthProvider());
Config config = new Config()
.addMapConfig(mapConfig)
.setUserContext(userContext);
Hazelcast.newHazelcastInstance(config);
The merge operations are executed on the partition threads. Database accesses are slow compared to in-memory operations, so a merge policy that hits a database can block the cluster merge. Also, the LifecycleEvent.MERGED or LifecycleEvent.MERGE_FAILED events may be fired too early if the merge operations take too long to finish.
Merge Policies With Multiple Merge Types
You can also write a merge policy which requires multiple merge types. The following merge policy is supported by all data structures which provide MergingHits and MergingCreationTime:
public class ComposedHitsAndCreationTimeMergePolicy<V, T extends MergingValue<V> & MergingHits & MergingCreationTime>
implements SplitBrainMergePolicy<V, T, Object> {
@Override
public Object merge(T mergingValue, T existingValue) {
if (existingValue == null) {
return mergingValue.getValue();
}
System.out.println("========================== Merging value " + mergingValue.getValue() + "..."
+ "\n mergingValue creation time: " + mergingValue.getCreationTime()
+ "\n existingValue creation time: " + existingValue.getCreationTime()
+ "\n mergingValue hits: " + mergingValue.getHits()
+ "\n existingValue hits: " + existingValue.getHits()
);
if (mergingValue.getCreationTime() < existingValue.getCreationTime()
&& mergingValue.getHits() > existingValue.getHits()) {
return mergingValue.getRawValue();
}
return existingValue.getRawValue();
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
}
If you configure this merge policy on a data structure which does not provide these merge types, you get an InvalidConfigurationException with a message like:
The merge policy org.example.merge.ComposedHitsAndCreationTimeMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.MergingHits.
See SplitBrainMergingTypes for supported merging types.
Merge Policies For Specific Data Structures
It’s also possible to restrict a merge policy to a specific data structure. This merge policy, for example, only works on IMap:
public class MapEntryCostsMergePolicy implements SplitBrainMergePolicy<Object, MapMergeTypes<Object, Object>, Object> {
@Override
public Object merge(MapMergeTypes<Object, Object> mergingValue, MapMergeTypes<Object, Object> existingValue) {
if (existingValue == null) {
return mergingValue.getValue();
}
System.out.println("========================== Merging key " + mergingValue.getKey() + "..."
+ "\n mergingValue costs: " + mergingValue.getCost()
+ "\n existingValue costs: " + existingValue.getCost()
);
if (mergingValue.getCost() > existingValue.getCost()) {
return mergingValue.getRawValue();
}
return existingValue.getRawValue();
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
}
If you configure it on other data structures, you get an InvalidConfigurationException with a message like:
The merge policy org.example.merge.MapEntryCostsMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.SplitBrainMergeTypes$MapMergeTypes.
See SplitBrainMergingTypes for supported merging types.
Here is another example of a merge policy which only works on the IAtomicReference:
public class AtomicReferenceMergeIntegerValuesMergePolicy
implements SplitBrainMergePolicy<Object, AtomicReferenceMergeTypes, Object> {
@Override
public Object merge(AtomicReferenceMergeTypes mergingValue, AtomicReferenceMergeTypes existingValue) {
Object mergingUserValue = mergingValue.getValue();
Object existingUserValue = existingValue == null ? null : existingValue.getValue();
System.out.println("========================== Merging..."
+ "\n mergingValue: " + mergingUserValue
+ "\n existingValue: " + existingUserValue
+ "\n mergingValue class: " + mergingUserValue.getClass().getName()
+ "\n existingValue class: " + (existingUserValue == null ? "null" : existingUserValue.getClass().getName())
);
if (mergingUserValue instanceof Integer) {
return mergingValue.getRawValue();
}
return null;
}
@Override
public void writeData(ObjectDataOutput out) {
}
@Override
public void readData(ObjectDataInput in) {
}
}
Although every data structure supports MergingValue, which is the only merge type of AtomicReferenceMergeTypes, this merge policy is restricted to IAtomicReference data structures:
The merge policy org.example.merge.AtomicReferenceMergeIntegerValuesMergePolicy
can just be configured on data structures which provide the merging type
com.hazelcast.spi.merge.SplitBrainMergeTypes$AtomicReferenceMergeTypes.
See SplitBrainMergingTypes for supported merging types.
Best Practices
Here are some best practices when implementing your own merge policy:

- Only call MergingValue.getValue() and MergingEntry.getKey() when you really need the deserialized value, to save costs (CPU and memory) and to avoid ClassNotFoundException.
- If you want to return one of the given values (merging or existing), it's best to return mergingValue.getRawValue() or existingValue.getRawValue(), since they are already in the correct in-memory format of the data structure. If you return a deserialized value, it might need to be serialized again, which is an avoidable cost.
- Be careful with slow operations in the merge policy (like database accesses), since they block your partition threads. Also, the LifecycleEvent.MERGED or LifecycleEvent.MERGE_FAILED events may be fired too early if the merge operations take too long to finish.
Eliminating Unsuccessful Cluster Merges
In some cases, Hazelcast Platform members which can initially form a cluster (join each other) may not be able to recover from split-brain even though the network failure which caused it is healed. The reason is that the master member of the cluster, which is responsible for initiating the healing, fails to discover the other cluster’s master member, so the split-brain merge process never begins.
Consider the scenario where you are scaling up a cluster: you need to add new members because the initial member count is insufficient. The IP addresses of these new members may not exist in the configurations of the initial set of members, since the new addresses were not added to the initial members' configurations when starting them. So, in a cluster with a long lifecycle, some members may not have the address of every other member, resulting in split-brain recovery failures.
To overcome this issue, you can use the following REST endpoint to update the member list on the cluster members at runtime.
/hazelcast/rest/config/tcp-ip/member-list
You can send a GET request to the above endpoint to retrieve the current member list from the member, and a POST request to update the member list.
The following is the cURL command to retrieve the member list, and its response format:
curl http://<member IP address>:<port>/hazelcast/rest/config/tcp-ip/member-list
{"status":"success","member-list":["<member-1-address>","<member-2-address>","<member-3-address>", ...]}
The following is the cURL command to update the member list, and its response format:
curl --data-urlencode "<cluster-name>" --data-urlencode "" --data-urlencode "<member1-address>,<member2-address>,<member3-address>" http://<member IP address>:<port>/hazelcast/rest/config/tcp-ip/member-list
{"status":"success","message":"The member list of TCP-IP join config is updated at run time. ","member-list":["<member1-address>","<member2-address>","<member3-address>"]
If security is enabled and custom member authentication is used in your cluster, the cURL command to update the member list would look like the following:
curl --data-urlencode "<username>" --data-urlencode "<password>" --data-urlencode "<member1-address>,<member2-address>,<member3-address>" http://<member IP address>:<port>/hazelcast/rest/config/tcp-ip/member-list
For these requests to work, you need to enable the CLUSTER_READ and CLUSTER_WRITE endpoint groups, respectively, in the cluster members. See Enabling Endpoint Groups.
By using this REST endpoint, you can add the missing addresses of the members from the other cluster to your members, so that one cluster's master member can find the other cluster's master member.
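If you need to script these updates, a minimal Java sketch using the JDK's built-in HttpClient could look like the following; the endpoint and the positional, URL-encoded form values mirror the cURL commands above, and the cluster name and addresses are placeholders:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class UpdateMemberList {
    public static void main(String[] args) throws Exception {
        String clusterName = "dev";        // placeholder cluster name
        String clusterPassword = "";       // empty unless custom authentication is used
        String memberList = "10.0.0.1:5701,10.0.0.2:5701,10.0.0.3:5701"; // placeholders

        // The endpoint expects the values as URL-encoded positional form fields,
        // exactly like the chained --data-urlencode options in the cURL command.
        String body = URLEncoder.encode(clusterName, StandardCharsets.UTF_8)
                + "&" + URLEncoder.encode(clusterPassword, StandardCharsets.UTF_8)
                + "&" + URLEncoder.encode(memberList, StandardCharsets.UTF_8);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://10.0.0.1:5701/hazelcast/rest/config/tcp-ip/member-list"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}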
Deprecation Notice for the REST API
The REST API has been deprecated and will be removed as of Hazelcast version 7.0. An improved version of this feature is under development.