Advanced Features

Synchronizing WAN Clusters

WAN Replication creates a replica of data structure mutation events as they occur on the source cluster. The events are queued up, collected in a batch and sent to the target cluster to be applied, without any user interaction.

However, Hazelcast clusters connected over WAN may become out-of-sync for various reasons, including but not limited to:

  • Member failures

  • Concurrent updates

  • Target cluster starts with no data

  • Target cluster experiences issues that result in the failure of one or more operations

  • Two sides disconnect and the in-memory buffer of the source cluster becomes full (the behavior in this case is configurable)

  • The WAN link cannot keep up with a burst experienced by the source cluster and its in-memory buffer gets full (the behavior in this case is configurable)

  • Split-brain scenario in source clusters where both brains can still communicate with WAN target clusters

As well as failures such as the examples above, data can become out-of-sync after manual interventions such as cluster restarts or failover of one cluster to another (for example disaster recovery). After any such situation, you are advised to synchronize all WAN replicated data.

To overcome this out-of-sync issue, you can use either of the following approaches to synchronize your WAN replicated clusters:

  • Full synchronization

  • Delta synchronization

These approaches are described in the following sections.

Full WAN Synchronization

Full WAN synchronization sends all the data of an IMap to a target cluster to align the state of target IMap with source IMap. It is useful if two remote clusters lost their synchronizations due to overflow in the WAN queue or in restart scenarios. This is the default synchronization option.

Full WAN Synchronization can be initiated through Management Center and Hazelcast’s REST API.

Deprecation Notice for the Community Edition REST API

The Community Edition REST API has been deprecated and will be removed as of Hazelcast version 7.0. An improved Enterprise version of this feature is available and actively being developed. For more info, see Enterprise REST API

Below is the URL for the REST call;

http://{member IP address:port}/hazelcast/rest/wan/sync/map

You need to add URL-encoded parameters to the request in the following order separated by "&";

  • Cluster name

  • Cluster password

  • Name of the WAN replication configuration

  • WAN replication publisher ID/target cluster name

  • Map name to be synchronized

Assume that you have configured an IMap with a WAN replication configuration as follows:

  • YAML

  • XML

hazelcast:
  wan-replication name: london-wan-rep
    batch-publisher:
      cluster-name: istanbul
  map:
    my-map:
      wan-replication-ref:
        london-wan-rep:
          merge-policy: com.hazelcast.spi.merge.PassThroughMergePolicy
<hazelcast>
    ...
    <wan-replication name="london-wan-rep">
        <batch-publisher>
            <cluster-name>istanbul</cluster-name>
        </batch-publisher>
    </wan-replication>
    <map name="my-map">
        <wan-replication-ref name="london-wan-rep">
            <merge-policy>com.hazelcast.spi.merge.PassThroughMergePolicy</merge-policy>
        </wan-replication-ref>
    </map>
    ...
</hazelcast>

Then, an example curl command to initiate the synchronization for "my-map" would be as follows:

curl -X POST -d "{clusterName}&{clusterPassword}&london-wan-rep&istanbul&my-map" --URL http://127.0.0.1:5701/hazelcast/rest/wan/sync/map

You can also synchronize all maps in the source and target clusters. In that case, the curl command using the above parameters becomes as follows:

curl -X POST -d "{clusterName}&{clusterPassword}&london-wan-rep&istanbul" --URL http://127.0.0.1:5701/hazelcast/rest/wan/sync/allMaps
Synchronization for a target cluster operates only with the data residing in the memory. Therefore, evicted entries are not synchronized, not even if MapLoader is configured.

Delta WAN Synchronization

As explained in the previous section, the default Full WAN Synchronization feature synchronizes the maps in different clusters by transferring all the entries from the source to the target cluster. This may be not efficient since some of the entries have remained unchanged on both clusters and do not require to be transferred. Also, for the entries to be transferred, they need to be copied to on-heap on the source cluster. This may cause spikes in the heap usage, especially if using large off-heap stores.

In addition to the default Full WAN Synchronization, Hazelcast provides Delta WAN Synchronization which uses Merkle tree for the same purpose. It is a data structure used for efficient comparison of the difference in the contents of large data structures. The precision of this comparison is defined by Merkle tree’s depth. Merkle tree hash exchanges can detect inconsistencies in the map data and synchronize only the different entries when using WAN synchronization, instead of sending all the map entries.

Currently, Delta WAN Synchronization is implemented only for Hazelcast IMap. It will be implemented also for ICache in the future releases.

Requirements

To be able to use Delta WAN synchronization, the following must be met:

  • Source and target cluster versions must be at least Hazelcast 4.0.

  • Both clusters must have the same number of partitions.

  • Both clusters must use the same partitioning strategy.

  • Both clusters must have the Merkle tree structure enabled.

Using Delta WAN Synchronization

To be able to use Delta WAN synchronization for a Hazelcast data structure:

1 - Configure the WAN synchronization mechanism for your WAN publisher so that it uses the Merkle tree: If configuring declaratively, you can use the consistency-check-strategy sub-element of the sync element. If configuring programmatically, you can use the setter of the WanSyncConfig object. Here is a declarative example:

  • YAML

  • XML

hazelcast:
  wan-replication:
    wanReplicationScheme:
      batch-publisher:
        cluster-name: clusterName
        sync:
          consistency-check-strategy: MERKLE_TREES
<hazelcast>
    ...
     <wan-replication name="wanReplicationScheme">
        <batch-publisher>
            <cluster-name>clusterName</cluster-name>
            <sync>
                <consistency-check-strategy>MERKLE_TREES</consistency-check-strategy>
            </sync>
        </batch-publisher>
    </wan-replication>
    ...
</hazelcast>

2 - Bind that WAN synchronization configuration to the data structure (currently IMap): Simply set the WAN replication reference of your map to the name of the WAN replication configuration which uses the Merkle tree. Here is a declarative example:

  • YAML

  • XML

hazelcast:
  map:
    myMap:
      wan-replication-ref:
        wanReplicationScheme:
          ...
<hazelcast>
    ...
    <map name="myMap">
        <wan-replication-ref name="wanReplicationScheme">
          ...
        </wan-replication-ref>
    </map>
    ...
</hazelcast>

3 - Finally, configure the Merkle tree using the merkle-tree element which is contained in the map configuration:

  • YAML

  • XML

hazelcast:
  map:
    myMap:
      merkle-tree:
        enabled: true
        depth: 5
<hazelcast>
    ...
    <map name="myMap">
        <merkle-tree enabled="true">
            <depth>5</depth>
        </merkle-tree>
    </map>
    ...
</hazelcast>

You can programmatically configure it, too, using the MerkleTreeConfig object.

Here is the full declarative configuration example showing how to enable Delta WAN Synchronization, bind it to a Hazelcast data structure (an IMap in this case) and specify its depth:

  • YAML

  • XML

hazelcast:
  map:
    myMap:
      wan-replication-ref:
        wanReplicationScheme:
          ...
      merkle-tree:
        enabled: true
        depth: 10
  wan-replication:
    wanReplicationScheme:
      batch-publisher:
        cluster-name: clusterName
        sync:
          consistency-check-strategy: MERKLE_TREES
<hazelcast>
    ...
    <map name="myMap">
        <wan-replication-ref name="wanReplicationScheme">
            ...
        </wan-replication-ref>
        <merkle-tree enabled="true">
            <depth>10</depth>
        </merkle-tree>
    </map>

    <wan-replication name="wanReplicationScheme">
        <batch-publisher>
            <cluster-name>clusterName</cluster-name>
            <sync>
                <consistency-check-strategy>MERKLE_TREES</consistency-check-strategy>
            </sync>
        </batch-publisher>
    </wan-replication>
    ...
</hazelcast>

Here, the element consistency-check-strategy sets the strategy for checking the consistency of data between the source and target clusters. You must initiate the WAN synchronization (via Management Center or REST API as explained in Synchronizing WAN clusters) to let this strategy reconcile the inconsistencies. The element consistency-check-strategy has currently two values:

  • NONE: Means that there are no consistency checks. This is the default value.

  • MERKLE_TREES: Means that WAN synchronization uses Merkle tree structure.

The Merkle tree structure is enabled using its enabled attribute (default is true). Its depth element specifies the depth of Merkle tree. Valid values are between 2 and 27 (exclusive). Its default value is 10.

  • A larger depth means that a data synchronization mechanism is able to pinpoint a smaller subset of the data structure (e.g., IMap) contents in which a change has occurred. This causes the synchronization mechanism to be more efficient. However, keep in mind that a large depth means that the Merkle tree will consume more memory. As the comparison mechanism is iterative, a larger depth also prolongs the comparison duration. Therefore, it is recommended not to have large tree depths if the latency of the comparison operation is high.

  • A smaller depth means that the Merkle tree is shallower and the data synchronization mechanism transfers larger chunks of the data structure (e.g., IMap) in which a possible change has happened. As you can imagine, a shallower Merkle tree will consume less memory.

Also see the Defining the Depth section for more insights.

If you do not specifically configure the merkle-tree in your Hazelcast configuration, Hazelcast uses the default Merkle tree structure values (i.e., it is enabled by default and its default depth is 10) when there is a WAN publisher using the Merkle tree (i.e., consistency-check-strategy for a WAN replication configuration is set as MERKLE_TREES and there is a data structure using that WAN replication configuration).
Merkle trees are created for each partition holding IMap data. Therefore, increasing the partition count also increases the efficiency of the Delta WAN Synchronization.

The Process

Synchronizing the maps based on Merkle trees consists of two phases:

  1. Consistency check: Process of exchanging and comparing the hashes stored in the Merkle tree structures in the source and target clusters. The check starts with the root node and continues recursively with the children with different hash codes. Both sides send the children of the nodes that the other side sent, hence the comparison is done by depth/2 steps. After this check, the tree leaves holding different entries are identified.

  2. Synchronization: Process of transferring the entries belong to the leaves identified by the consistency check from the source to target cluster. On the target cluster the configured merge policy is applied for each entry that is in both the source and target clusters.

If you only need the differences between the clusters, you can trigger the consistency check without performing synchronization.

The two phases of the Merkle tree based synchronization can be triggered by the REST calls, as it can be done with the full synchronization.

The URL for the consistency check REST call:

http://{member IP address:port}/hazelcast/rest/wan/consistencyCheck/map

The URL for the synchronization REST call - the same as it is for the default synchronization:

http://{member IP address:port}/hazelcast/rest/wan/sync/map

See the REST call details here.

Deprecation Notice for the Community Edition REST API

The Community Edition REST API has been deprecated and will be removed as of Hazelcast version 7.0. An improved Enterprise version of this feature is available and actively being developed. For more info, see Enterprise REST API

Memory Consumption

Since Merkle trees are built for each partition and each map, the memory overhead of the trees with high entry count and deep trees can be significant. The trees are maintained on-heap, therefore - besides the memory consumption - garbage collection could be another concern.

The table below shows a few examples for what the memory overhead could be.

Table 1. Merkle trees memory overhead for a member, for one map
Partitions Owned Depth Memory Overhead

271

8

0.27 MB

271

10

1 MB

271

13

8 MB

271

16

68 MB

5009

8

5 MB

5009

10

20 MB

5009

13

157 MB

5009

16

1252 MB

Defining the Depth

The efficiency of the Delta WAN Synchronization (WAN synchronization based on Merkle trees) is determined by the average number of entries per the tree leaves that is proportionate to the number of entries in the map. The bigger this average, the more entries are getting synchronized for the same difference. Raising the depth decreases this average at the cost of increasing the memory overhead.

This average can be calculated for a map as avgEntriesPerLeaf = mapEntryCount / totalLeafCount, where totalLeafCount = partitionCount * 2depth-1. The ideal value is 1, however this may come at significant memory overhead as shown in the table above.

In order to specify the tree depth, a trade-off between memory consumption and effectiveness might be needed.

Even if the map is huge and the Merkle trees are configured to be relatively shallow, the Merkle tree based synchronization may be leveraged if only a small subset of the whole map is expected to be synchronized. The table below illustrates the efficiency of the Merkle tree based synchronization compared to the default synchronization mechanism.

Table 2. Efficiency examples
Map entry count Depth Memory consumption Avg entries / leaf Difference count Entries synced Efficiency

10M

11

39 MB

2

5M

10M

0%

10M

12

78 MB

1

5M

5M

100%

10M

10

20 MB

4

1M

4M

150%

10M

8

5 MB

16

10K

160K

6150%

10M

12

78 MB

1

10K

10K

99900%

The Difference count column shows the number of the entries different in the source and the target clusters. This is the minimum number of the entries that need to be synchronized to make the clusters consistent. The Entries synced column shows how many entries are synchronized in the given case, calculated as Entries synced = Difference count * Avg entries / leaf.

As shown in the last two rows, the Merkle tree based synchronization transfers significantly fewer entries than what the default mechanism does even with 8 deep trees. The efficiency with depth 12 is even better but consumes much more memory.

The averages in the table are calculated with 5009 partitions.
The average entries per leaf number above assumes perfect distribution of the entries amongst the leaves. Since this is typically not true in real-life scenarios the efficiency can be slightly worse. The statistics section below describes how to get the actual average for the leaves involved in the synchronization.

WAN Synchronization Statistics

Both Full and Delta WAN Synchronization processes write statistics into the diagnostics subsystem and send them to Hazelcast Management Center. Using these statistics you can measure the efficiency of your configuration.

Full WAN Synchronization reports the following:

  • Duration of the synchronization

  • Count of the synchronized entries

  • Total count of the synchronized partitions

Here is an example output:

Synchronization statistics:
  Synchronization UUID: 8af2f9e7-3f9f-4c31-b594-47c421bfb33c
  Duration: 0 secs
  Total records synchronized: 448
  Total partitions synchronized: 5

Delta WAN Synchronization reports the following:

  • Duration of the synchronization

  • Count of the synchronized entries

  • Total count of the synchronized partitions

  • Merkle tree nodes checked

  • Merkle tree nodes found to be different

  • Count of the entries needed to be synchronized to make the clusters consistent

  • Average count of entries per tree leaves in the synchronized leaves

Here is an example output:

Merkle synchronization statistics:
  Synchronization UUID: f49a25ba-dc57-4547-817b-bea67ff7f0fe
  Duration: 0 secs
  Total records synchronized: 528
  Total partitions synchronized: 6
  Total Merkle tree nodes synchronized: 178
  Average records per Merkle tree node: 2.97
  StdDev of records per Merkle tree node: 1.55
  Minimum records per Merkle tree node: 1
  Maximum records per Merkle tree node: 7

See the Diagnostics section to learn how to enable diagnostics and locate its log file to see the above statistics.

Dynamically Adding WAN Publishers

When running clusters for an extensive period, you might need to dynamically change the configuration while the cluster is running. This includes dynamically adding new WAN replication publishers (new target clusters) and replicating the subsequent map and cache updates to the new publishers without any manual intervention.

You can add new WAN publishers to an existing WAN replication using almost all the configuration options that are available when configuring the WAN publishers in the static configuration (including using Discovery SPI). The new configuration is not persisted but it is replicated to all existing and new members. Once the cluster is completely restarted, the dynamically added publisher configuration is lost and the updates are not replicated to the target cluster anymore until added again.

To persist dynamic configuration changes, you can configure persistence for dynamic configuration. See Dynamic Configuration for Members

You cannot remove the existing configurations but can put the publishers into a STOPPED state which prevents the WAN events from being enqueued in the WAN queues and prevents the replication, rendering the publisher idle. The configurations also cannot be changed.

You can dynamically add a WAN publisher configuration using the following REST call URL:

http://{memberIPaddress:port}/hazelcast/rest/wan/addWanConfig

You need to add the following URL-encoded parameters to the request in the following order separated by "&";

  • Cluster name

  • Cluster password

  • WAN replication configuration, serialized as JSON

You can, at any point, even when maps and caches are concurrently mutated, add a new WAN publisher to an existing WAN replication configuration. The limitation is that there must be an existing WAN replication configuration but it can be empty, without any publishers (target clusters). For instance, this is an example of an XML configuration to which you can dynamically add new publishers:

  • YAML

  • XML

hazelcast:
  wan-replication:
    myWanReplication:
  map:
    myMap:
      wan-replication-ref:
        myWanReplication:
          merge-policy: com.hazelcast.spi.merge.PassThroughMergePolicy
          republishing-enabled: false
<hazelcast>
    ...
    <wan-replication name="myWanReplication"></wan-replication>
    <map name="my-map">
        <wan-replication-ref name="myWanReplication">
            <merge-policy>com.hazelcast.spi.merge.PassThroughMergePolicy</merge-policy>
            <republishing-enabled>false</republishing-enabled>
       </wan-replication-ref>
    </map>
    ...
</hazelcast>

Note that the map has defined WAN replication but there is no target cluster yet. You can then add the new WAN replication publishers (target clusters) by performing an HTTP POST as shown below:

curl -X POST -d "clusterName&clusterPassword&{...}" --URL http://127.0.0.1:5701/hazelcast/rest/wan/addWanConfig

You can provide the full configuration as JSON as a parameter. Any WAN configuration supported in the XML and programmatic configurations is also supported in this JSON format. Below are some examples of JSON configuration for a WAN publisher using the Discovery SPI and static IP configuration. Here are the integer values for initialPublisherState, queueFullBehavior and consistencyCheckStrategy:

  • initialPublisherState:

    • 0: REPLICATING

    • 1: PAUSED

    • 2: STOPPED

  • queueFullBehavior:

    • 0: DISCARD_AFTER_MUTATION

    • 1: THROW_EXCEPTION

    • 2: THROW_EXCEPTION_ONLY_IF_REPLICATION_ACTIVE

  • consistencyCheckStrategy:

    • 0: NONE

    • 1: MERKLE_TREES

Below is an example using Discovery SPI (AWS configuration):

{
   "name":"wanReplication",
   "batchPublishers":[
      {
         "clusterName":"tokyo",
         "queueCapacity":10000,
         "queueFullBehavior":0,
         "initialPublisherState":0,
         "discovery":{
            "nodeFilterClass":null,
            "discoveryStrategy":[
               {
                  "className":"com.hazelcast.aws.AwsDiscoveryStrategy",
                  "properties":{
                     "security-group-name":"hazelcast",
                     "tag-value":"cluster1",
                     "host-header":"ec2.amazonaws.com",
                     "tag-key":"aws-test-cluster",
                     "secret-key":"my-secret-key",
                     "iam-role":"s3access",
                     "access-key":"my-access-key",
                     "hz-port":"5701-5708",
                     "region":"us-west-1"
                  }
               }
            ]
         }
      }
   ]
}

Below is an example with Discovery SPI (the new AWS configuration)

{
   "name":"wanReplication",
   "batchPublishers":[
      {
         "clusterName":"tokyo",
         "queueCapacity":1000,
         "queueFullBehavior":0,
         "initialPublisherState":0,
         "aws":{
            "enabled":true,
            "usePublicIp":false,
            "properties":{
               "security-group-name":"hazelcast-sg",
               "tag-value":"hz-nodes",
               "host-header":"ec2.amazonaws.com",
               "tag-key":"type",
               "secret-key":"my-secret-key",
               "iam-role":"dummy",
               "access-key":"my-access-key",
               "region":"us-west-1"
            }
         },
         "sync":{
            "consistencyCheckStrategy":0
         }
      }
   ]
}

Below is an example with static IP configuration (with some optional attributes):

{
   "name":"wanReplication",
   "batchPublishers":[
      {
         "clusterName":"tokyo",
         "queueCapacity":1000,
         "queueFullBehavior":0,
         "initialPublisherState":0,
         "responseTimeoutMillis":5000,
         "targetEndpoints":"10.3.5.1:5701, 10.3.5.2:5701",
         "batchMaxDelayMillis":3000,
         "batchSize":50,
         "snapshotEnabled":false,
         "acknowledgeType":1,
         "sync":{
            "consistencyCheckStrategy":0
         }
      }
   ]
}

Below is an XML configuration with two publishers and several (disabled) discovery strategy configurations:

{
   "name":"wanReplication",
   "batchPublishers":[
      {
         "clusterName":"tokyo",
         "queueCapacity":1000,
         "queueFullBehavior":0,
         "initialPublisherState":0,
         "aws":{
            "enabled":true,
            "usePublicIp":false,
            "properties":{
               "security-group-name":"hazelcast-sg",
               "tag-value":"hz-nodes",
               "host-header":"ec2.amazonaws.com",
               "tag-key":"type",
               "secret-key":"my-secret-key",
               "iam-role":"dummy",
               "access-key":"my-access-key",
               "region":"us-west-1"
            }
         },
         "gcp":{
            "enabled":false,
            "usePublicIp":true,
            "properties":{
               "gcp-prop":"gcp-val"
            }
         },
         "azure":{
            "enabled":false,
            "usePublicIp":true,
            "properties":{
               "azure-prop":"azure-val"
            }
         },
         "kubernetes":{
            "enabled":false,
            "usePublicIp":true,
            "properties":{
               "k8s-prop":"k8s-val"
            }
         },
         "eureka":{
            "enabled":false,
            "usePublicIp":true,
            "properties":{
               "eureka-prop":"eureka-val"
            }
         },
         "discovery":{
            "nodeFilterClass":null,
            "discoveryStrategy":[

            ]
         },
         "sync":{
            "consistencyCheckStrategy":0
         }
      },
      {
         "clusterName":"london",
         "queueCapacity":1000,
         "queueFullBehavior":0,
         "initialPublisherState":0,
         "responseTimeoutMillis":5000,
         "targetEndpoints":"10.3.5.1:5701, 10.3.5.2:5701",
         "batchMaxDelayMillis":3000,
         "batchSize":50,
         "snapshotEnabled":false,
         "acknowledgeType":1,
         "aws":{
            "enabled":false,
            "usePublicIp":false
         },
         "gcp":{
            "enabled":false,
            "usePublicIp":false
         },
         "azure":{
            "enabled":false,
            "usePublicIp":false
         },
         "kubernetes":{
            "enabled":false,
            "usePublicIp":false
         },
         "eureka":{
            "enabled":false,
            "usePublicIp":false
         },
         "discovery":{
            "nodeFilterClass":null,
            "discoveryStrategy":[

            ]
         },
         "sync":{
            "consistencyCheckStrategy":1
         }
      }
   ]
}

Event Filtering API

WAN replication allows you to intercept WAN replication events before they are placed to WAN event replication queues by providing a filtering API. Using this API, you can monitor WAN replication events of each data structure separately.

You can attach filters to your data structures using the filter element of wan-replication-ref configuration inside hazelcast.xml as shown below. You can also configure it using the programmatic configuration.

  • YAML

  • XML

hazelcast:
  map:
    testMap:
      wan-replication-ref:
        test:
          filters:
            - com.example.MyFilter
            - com.example.MyFilter2
<hazelcast>
    ...
    <map name="testMap">
        <wan-replication-ref name="test">
            <filters>
                <filter-impl>com.example.MyFilter</filter-impl>
                <filter-impl>com.example.MyFilter2</filter-impl>
            </filters>
        </wan-replication-ref>
    </map>
    ...
</hazelcast>

As shown in the above configuration, you can define more than one filter. Filters are called in the order that they are introduced. A WAN replication event is only eligible to publish if it passes all the filters.

Map and Cache have different filter interfaces: MapWanEventFilter and CacheWanEventFilter. Both of these interfaces have the method filter which takes the following parameters:

  • mapName/cacheName: Name of the related data structure.

  • entryView: EntryView or CacheEntryView depending on the data structure.

  • eventType: Enum type - UPDATED(1), REMOVED(2) or LOADED(3) - depending on the event.

LOADED events are filtered out and not replicated to target cluster.

Implementing a Custom WAN Publisher

In addition to using the Hazelcast’s built-in WAN Replication implementation, you can implement your own replication mechanism using the WAN publisher SPI.

Following is the configuration snippet where replicatedMap and replicatedCache use the custom implementation com.my.WanPublisher to replicate map and cache updates.

  • YAML

  • XML

hazelcast:
  wan-replication:
    london-wan-rep:
      custom-publisher:
        publisher-id: myCustomPublisher
        class-name: com.my.WanPublisher
        properties:
          prop1: val1
          prop2: val2
  map:
    replicatedMap:
      wan-replication-ref:
        london-wan-rep:
          ...
  cache:
    replicatedCache:
      wan-replication-ref:
        london-wan-rep:
          ...
<hazelcast>
    ...
    <wan-replication name="london-wan-rep">
        <custom-publisher>
            <publisher-id>myCustomPublisher</publisher-id>
            <class-name>com.my.WanPublisher</class-name>
            <properties>
                <property name="prop1">val1</property>
                <property name="prop2">val2</property>
            </properties>
        </custom-publisher>
    </wan-replication>

    <map name="replicatedMap">
        <wan-replication-ref name="london-wan-rep"/>
        ...
    </map>

    <cache name="replicatedCache">
        <wan-replication-ref name="london-wan-rep"/>
        ...
    </cache>
    ...
</hazelcast>

The custom-publisher is used to configure a custom implementation of a WAN replication implementing com.hazelcast.wan.WanPublisher. For example, you might implement replication to Kafka or some JMS queue or even write out map and cache event changes to a log on disk. It has the following sub-elements:

  • class-name: Mandatory configuration value defining the fully qualified class name of the WAN publisher implementation. The class must implement com.hazelcast.wan.WanPublisher.

  • publisher-id: Mandatory configuration value for the publisher ID used for identifying the publisher in a WanReplicationConfig. This ID will be used to refer to this specific WAN publisher in a certain WAN replication scheme.

In some cases, specifying the configuration on the source/active cluster is enough to fully implement your use case. This is the case when you don’t have any target/passive Hazelcast cluster which consumes these events. In cases when you do have a target Hazelcast cluster and you wish to use a custom WAN Replication implementation, you will need to configure the target cluster as well. For example, you might want to implement WAN Replication by transmitting WAN events through some JMS queue like ActiveMQ. In this case, you need to implement both your custom WAN publisher and WAN consumer.

Below is a configuration example for specifying a custom WAN replication consumer on the target/passive cluster:

  • YAML

  • XML

hazelcast:
  wan-replication:
    london-wan-rep:
      consumer:
        class-name: com.my.WanConsumer
        properties:
          prop1: val1
          prop2: val2
<hazelcast>
    ...
    <wan-replication name="london-wan-rep">
        <consumer>
            <class-name>com.my.WanConsumer</class-name>
            <properties>
                <property name="prop1">val1</property>
                <property name="prop2">val2</property>
            </properties>
        </consumer>
    </wan-replication>
</hazelcast>

The consumer is used to configure the implementation of the com.hazelcast.wan.WanConsumer interface which will be used to retrieve and process WAN events. A custom WAN consumer allows you to define custom processing logic and is used in combination with a custom WAN publisher.

The consumer configuration element has the following sub-elements:

  • class-name: Name of the class implementing a custom WAN consumer (com.hazelcast.wan.WanConsumer).

  • properties: Properties for the custom WAN consumer. These properties are accessible when initializing the WAN consumer. You can define the host, username and password for the host, name of the queue to be polled by the consumer, etc.

Customizing WAN Event Processing on Passive/Target Cluster

In addition to customizing the behavior of the source cluster and how WAN events are sent and retained, you can also configure some aspects of how WAN events are processed on the receiving (target/passive) cluster. In addition, you can also define a custom implementation of a WAN event consumer. A custom WAN consumer allows you to define custom processing logic and is usually used in combination with a custom WAN publisher. A custom consumer is optional and you may simply omit defining it, which causes the default processing logic to be used. See the Using the WAN Custom Publisher section for more information.

Below you can see an example configuration of the target/passive cluster where we configure how incoming WAN events are processed.

  • YAML

  • XML

hazelcast:
  wan-replication:
    london-wan-rep:
      consumer:
        persist-wan-replicated-data: false
  map:
    replicatedMap:
      wan-replication-ref:
        london-wan-rep:
          ...
  cache:
    replicatedCache:
      wan-replication-ref:
        london-wan-rep:
          ...
<hazelcast>
    ...

    <wan-replication name="london-wan-rep">
        <consumer>
            <persist-wan-replicated-data>false</persist-wan-replicated-data>
        </consumer>
    </wan-replication>

    <map name="replicatedMap">
        <wan-replication-ref name="london-wan-rep"/>
        ...
    </map>

    <cache name="replicatedCache">
        <wan-replication-ref name="london-wan-rep"/>
        ...
    </cache>
    ...
</hazelcast>

In the configuration above, you can see that the WAN Replication configuration is again matched by WAN replication scheme name to the exact map and cache configuration. This means that different structures can process WAN events differently.

The processing behavior is configured using the consumer element. It has the following sub-elements:

  • persist-wan-replicated-data: When set to true, an incoming event over WAN replication can be persisted to a database for example, otherwise it is not persisted. Default value is false.

Securing the Connections for WAN Replication

You can secure the communications between the endpoints on the WAN replicated clusters using security features such as TLS/SSL, socket interceptor and encryption. WAN connections, cluster members and clients can have their own unique security configurations. You can also choose to have security features configured on some of the WAN connections/members/clients and not on the others.

Multiple WAN endpoint configurations can be defined to configure the outgoing connections and server sockets, depending on the role of the member in the WAN replication. The configuration examples are provided in the following sections for both active and passive side of the WAN replication.

Configuring the WAN Active Side

The members on the active cluster initiate connections to the target cluster members, so there is no need to create a server socket. A plain EndpointConfig is created that supplies the configuration for the client side of connections that the active members will create:

        config.getAdvancedNetworkConfig().addWanEndpointConfig(
                new EndpointConfig().setName("tokyo")
                        .setSSLConfig(new SSLConfig()
                                            .setEnabled(true)
                                            .setFactoryClassName("com.hazelcast.examples.MySSLContextFactory")
                                            .setProperty("foo", "bar"))
        );
        WanReplicationConfig wanReplicationConfig = new WanReplicationConfig();
        WanBatchPublisherConfig publisherConfig = new WanBatchPublisherConfig()
        			.setEndpoint("tokyo")
        			.setTargetEndpoints("tokyo.hazelcast.com:8765");
        wanReplicationConfig.addBatchReplicationPublisherConfig(publisherConfig);
        config.addWanReplicationConfig(wanReplicationConfig);

        config.getMapConfig("customers").setWanReplicationRef(
                new WanReplicationRef("replicate-to-tokyo", "com.company.MergePolicy", emptyList(), false)
        );

The following is the equivalent declarative configuration:

  • YAML

  • XML

hazelcast:
  advanced-network:
    enabled: true
    wan-endpoint-config:
      endpoint-tokyo:
        ssl:
          enabled: true
          factory-class-name: com.hazelcast.examples.MySSLContextFactory
          properties:
            endpoints: tokyo.example.com:11010
  wan-replication:
    replicate-to-tokyo:
      batch-publisher:
        cluster-name: clusterB
        target-endpoints: ...
        endpoint: endpoint-tokyo
  map:
    customer:
      wan-replication-ref:
        replicate-to-tokyo:
          merge-policy-class-name: ...
<hazelcast>
    ...
    <advanced-network enabled="true">
        <wan-endpoint-config name="tokyo">
            <ssl enabled="true">
                <factory-class-name>com.hazelcast.examples.MySSLContextFactory</factory-class-name>
                <properties>
                    <property name="endpoints">tokyo.example.com:11010</property>
                </properties>
            </ssl>
        </wan-endpoint-config>
    </advanced-network>
    ...
    <wan-replication name="replicate-to-tokyo">
        <batch-publisher>
            <cluster-name>clusterB</cluster-name>
            <target-endpoints>...</target-endpoints>
            <endpoint>tokyo</endpoint>
        </batch-publisher>
    </wan-replication>
    ...
    <map name="customer">
        <wan-replication-ref name="replicate-to-tokyo">
            <merge-policy>...</merge-policy>
        </wan-replication-ref>
    </map>
    ...
</hazelcast>

The wan-endpoint-config element contains the same sub-elements as the member-server-socket-endpoint-config element described in the Advanced Network Configuration section except port, public-address and reuse-address.

Configuring the WAN Passive Side

On the passive cluster, a server socket is configured on the members to listen for the incoming WAN connections, matching the network configuration (SSL configuration, etc.) configured on the active side of the WAN replication.

        config.getAdvancedNetworkConfig().addWanEndpointConfig(
                new ServerSocketEndpointConfig()
                        .setName("tokyo")
                        .setPort(11010)
                        .setPortAutoIncrement(false)
                        .setSSLConfig(new SSLConfig()
                                .setEnabled(true)
                                .setFactoryClassName("com.hazelcast.examples.MySSLContextFactory")
                                .setProperty("foo", "bar")
                        ));

The following is the equivalent declarative configuration:

  • YAML

  • XML

hazelcast:
  advanced-network:
    enabled: true
    wan-server-socket-endpoint-config:
      tokyo:
        port:
          auto-increment: false
          port: 11010
        ssl:
          enabled: true
          factory-class-name: com.hazelcast.examples.MySSLContextFactory
        properties:
          foo: bar
<hazelcast>
    ...
    <advanced-network enabled="true">
        <wan-server-socket-endpoint-config name="tokyo">
            <port auto-increment="false">11010</port>
            <ssl enabled="true">
                <factory-class-name>com.hazelcast.examples.MySSLContextFactory</factory-class-name>
                <properties>
                    <property name="foo">bar</property>
                </properties>
            </ssl>
        </wan-server-socket-endpoint-config>
    </advanced-network>
    ...
</hazelcast>