Advanced Features
This section provides information on how to do the following:
Synchronizing WAN Clusters
WAN Replication creates a replica of data structure mutation events as they occur on the source cluster. The events are queued up, collected in a batch and sent to the target cluster to be applied, without any user interaction.
However, Hazelcast clusters connected over WAN may become out-of-sync for various reasons, including but not limited to:
- Member failures
- Concurrent updates
- Target cluster starts with no data
- Target cluster experiences issues that result in the failure of one or more operations
- Two sides disconnect and the in-memory buffer of the source cluster becomes full (the behavior in this case is configurable)
- The WAN link cannot keep up with a burst experienced by the source cluster and its in-memory buffer gets full (the behavior in this case is configurable)
- Split-brain scenario in source clusters where both brains can still communicate with WAN target clusters
As well as failures such as the examples above, data can become out-of-sync after manual interventions such as cluster restarts or failover of one cluster to another (for example disaster recovery). After any such situation, you are advised to synchronize all WAN replicated data.
To overcome this out-of-sync issue, you can use either of the following approaches to synchronize your WAN replicated clusters:
- Full synchronization
- Delta synchronization
These approaches are described in the following sections.
Full WAN Synchronization
Full WAN synchronization sends all the data of an IMap to a target cluster to align the state of the target IMap with the source IMap. It is useful if two remote clusters have fallen out of sync due to an overflow of the WAN queue or after a restart. This is the default synchronization option.
Full WAN Synchronization can be initiated through Management Center or Hazelcast’s REST API.
Below is the URL for the REST call:
http://{member IP address:port}/hazelcast/rest/wan/sync/map
You need to add URL-encoded parameters to the request in the following order, separated by "&":
- Cluster name
- Cluster password
- Name of the WAN replication configuration
- WAN replication publisher ID/target cluster name
- Map name to be synchronized
Assume that you have configured an IMap with a WAN replication configuration as follows:
hazelcast:
  wan-replication:
    london-wan-rep:
      batch-publisher:
        cluster-name: istanbul
  map:
    my-map:
      wan-replication-ref:
        london-wan-rep:
          merge-policy: com.hazelcast.spi.merge.PassThroughMergePolicy
<hazelcast>
...
<wan-replication name="london-wan-rep">
<batch-publisher>
<cluster-name>istanbul</cluster-name>
</batch-publisher>
</wan-replication>
<map name="my-map">
<wan-replication-ref name="london-wan-rep">
<merge-policy>com.hazelcast.spi.merge.PassThroughMergePolicy</merge-policy>
</wan-replication-ref>
</map>
...
</hazelcast>
Then, an example curl command to initiate the synchronization for "my-map" would be as follows:
curl -X POST -d "{clusterName}&{clusterPassword}&london-wan-rep&istanbul&my-map" --URL http://127.0.0.1:5701/hazelcast/rest/wan/sync/map
You can also synchronize all maps in the source and target clusters. In that case the curl command using the above parameters becomes as follows:
curl -X POST -d "{clusterName}&{clusterPassword}&london-wan-rep&istanbul" --URL http://127.0.0.1:5701/hazelcast/rest/wan/sync/allMaps
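The same request can be assembled from Java. The following is a minimal sketch using only the JDK (Java 11 or later); the class name `WanSyncRequestSketch` and the credentials `dev` and `dev-pass` are placeholders, not part of Hazelcast. It URL-encodes each parameter, joins them with "&" in the order listed above, and builds the POST request without sending it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;

final class WanSyncRequestSketch {

    // Joins the parameters with "&" in the order given, URL-encoding each one.
    static String body(String... params) {
        return Arrays.stream(params)
                .map(p -> URLEncoder.encode(p, StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Builds (but does not send) the POST request that synchronizes a single map.
    static HttpRequest syncMap(String memberAddress, String clusterName, String clusterPassword,
                               String wanReplicationName, String publisherId, String mapName) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + memberAddress + "/hazelcast/rest/wan/sync/map"))
                .POST(HttpRequest.BodyPublishers.ofString(
                        body(clusterName, clusterPassword, wanReplicationName, publisherId, mapName)))
                .build();
    }

    public static void main(String[] args) {
        // "dev" and "dev-pass" are placeholder credentials (assumption).
        HttpRequest request = syncMap("127.0.0.1:5701", "dev", "dev-pass",
                "london-wan-rep", "istanbul", "my-map");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` while a member is listening on the given address.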
Synchronization for a target cluster operates only with the data residing in memory. Therefore, evicted entries are not synchronized, even if a MapLoader is configured.
Delta WAN Synchronization
As explained in the previous section, the default Full WAN Synchronization feature synchronizes the maps in different clusters by transferring all the entries from the source to the target cluster. This may not be efficient, since some of the entries have remained unchanged on both clusters and do not need to be transferred. Also, for the entries to be transferred, they need to be copied on-heap on the source cluster. This may cause spikes in the heap usage, especially when using large off-heap stores.
In addition to the default Full WAN Synchronization, Hazelcast provides Delta WAN Synchronization, which uses a Merkle tree for the same purpose. A Merkle tree is a data structure used for efficiently comparing the contents of large data structures. The precision of this comparison is defined by the Merkle tree’s depth. Merkle tree hash exchanges can detect inconsistencies in the map data and synchronize only the entries that differ, instead of sending all the map entries.
Currently, Delta WAN Synchronization is implemented only for Hazelcast IMap. It will also be implemented for ICache in future releases.
Requirements
To be able to use Delta WAN synchronization, the following requirements must be met:
- Source and target cluster versions must be at least Hazelcast 4.0.
- Both clusters must have the same number of partitions.
- Both clusters must use the same partitioning strategy.
- Both clusters must have the Merkle tree structure enabled.
Using Delta WAN Synchronization
To be able to use Delta WAN synchronization for a Hazelcast data structure:
1 - Configure the WAN synchronization mechanism for your WAN publisher so that it uses the Merkle tree: If configuring declaratively, you can use the consistency-check-strategy sub-element of the sync element. If configuring programmatically, you can use the setter of the WanSyncConfig object.
Here is a declarative example:
hazelcast:
  wan-replication:
    wanReplicationScheme:
      batch-publisher:
        cluster-name: clusterName
        sync:
          consistency-check-strategy: MERKLE_TREES
<hazelcast>
...
<wan-replication name="wanReplicationScheme">
<batch-publisher>
<cluster-name>clusterName</cluster-name>
<sync>
<consistency-check-strategy>MERKLE_TREES</consistency-check-strategy>
</sync>
</batch-publisher>
</wan-replication>
...
</hazelcast>
2 - Bind that WAN synchronization configuration to the data structure (currently IMap): Simply set the WAN replication reference of your map to the name of the WAN replication configuration which uses the Merkle tree. Here is a declarative example:
hazelcast:
  map:
    myMap:
      wan-replication-ref:
        wanReplicationScheme:
          ...
<hazelcast>
...
<map name="myMap">
<wan-replication-ref name="wanReplicationScheme">
...
</wan-replication-ref>
</map>
...
</hazelcast>
3 - Finally, configure the Merkle tree using the merkle-tree element, which is contained in the map configuration:
hazelcast:
  map:
    myMap:
      merkle-tree:
        enabled: true
        depth: 5
<hazelcast>
...
<map name="myMap">
<merkle-tree enabled="true">
<depth>5</depth>
</merkle-tree>
</map>
...
</hazelcast>
You can programmatically configure it, too, using the MerkleTreeConfig object.
Here is the full declarative configuration example showing how to enable Delta WAN Synchronization, bind it to a Hazelcast data structure (an IMap in this case) and specify its depth:
hazelcast:
  map:
    myMap:
      wan-replication-ref:
        wanReplicationScheme:
          ...
      merkle-tree:
        enabled: true
        depth: 10
  wan-replication:
    wanReplicationScheme:
      batch-publisher:
        cluster-name: clusterName
        sync:
          consistency-check-strategy: MERKLE_TREES
<hazelcast>
...
<map name="myMap">
<wan-replication-ref name="wanReplicationScheme">
...
</wan-replication-ref>
<merkle-tree enabled="true">
<depth>10</depth>
</merkle-tree>
</map>
<wan-replication name="wanReplicationScheme">
<batch-publisher>
<cluster-name>clusterName</cluster-name>
<sync>
<consistency-check-strategy>MERKLE_TREES</consistency-check-strategy>
</sync>
</batch-publisher>
</wan-replication>
...
</hazelcast>
Here, the element consistency-check-strategy sets the strategy for checking the consistency of data between the source and target clusters. You must initiate the WAN synchronization (via Management Center or REST API as explained in Synchronizing WAN Clusters) to let this strategy reconcile the inconsistencies.
The element consistency-check-strategy currently has two values:
- NONE: Means that there are no consistency checks. This is the default value.
- MERKLE_TREES: Means that WAN synchronization uses the Merkle tree structure.
The Merkle tree structure is enabled using its enabled attribute (default is true). Its depth element specifies the depth of the Merkle tree. Valid values are between 2 and 27 (exclusive). Its default value is 10.
- A larger depth means that a data synchronization mechanism is able to pinpoint a smaller subset of the data structure (e.g., IMap) contents in which a change has occurred. This causes the synchronization mechanism to be more efficient. However, keep in mind that a large depth means that the Merkle tree will consume more memory. As the comparison mechanism is iterative, a larger depth also prolongs the comparison duration. Therefore, it is recommended not to have large tree depths if the latency of the comparison operation is high.
- A smaller depth means that the Merkle tree is shallower and the data synchronization mechanism transfers larger chunks of the data structure (e.g., IMap) in which a possible change has happened. A shallower Merkle tree also consumes less memory.
Also see the Defining the Depth section for more insights.
If you do not specifically configure the merkle-tree in your Hazelcast configuration, Hazelcast uses the default Merkle tree structure values (i.e., it is enabled by default and its default depth is 10) when there is a WAN publisher using the Merkle tree (i.e., consistency-check-strategy for a WAN replication configuration is set to MERKLE_TREES and there is a data structure using that WAN replication configuration).
Merkle trees are created for each partition holding IMap data. Therefore, increasing the partition count also increases the efficiency of the Delta WAN Synchronization.
The Process
Synchronizing the maps based on Merkle trees consists of two phases:
- Consistency check: Process of exchanging and comparing the hashes stored in the Merkle tree structures in the source and target clusters. The check starts with the root node and continues recursively with the children that have different hash codes. Both sides send the children of the nodes that the other side sent, hence the comparison is done in depth/2 steps. After this check, the tree leaves holding different entries are identified.
- Synchronization: Process of transferring the entries belonging to the leaves identified by the consistency check from the source to the target cluster. On the target cluster, the configured merge policy is applied for each entry that is in both the source and target clusters.
If you only need the differences between the clusters, you can trigger the consistency check without performing synchronization.
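To make the consistency check more concrete, here is a toy sketch of the idea in plain Java. It is not Hazelcast's implementation: the class `ToyMerkleTree`, the array layout and the way hashes are combined are all assumptions chosen for brevity. It also compares two trees held in the same JVM, whereas the real check exchanges hashes between clusters in rounds. The sketch only shows how descending from the root into subtrees with different hashes isolates the leaves that hold differing entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ToyMerkleTree {
    private final int depth;
    private final int[] hashes; // heap layout: node i has children 2i+1 and 2i+2

    ToyMerkleTree(int depth, Map<String, String> entries) {
        this.depth = depth;
        this.hashes = new int[(1 << depth) - 1];
        int leafCount = 1 << (depth - 1);
        int firstLeaf = leafCount - 1;
        // Each entry lands in a leaf chosen by its key hash; the leaf hash is an
        // order-independent sum of the entry hashes.
        for (Map.Entry<String, String> e : entries.entrySet()) {
            int leaf = firstLeaf + Math.floorMod(e.getKey().hashCode(), leafCount);
            hashes[leaf] += Objects.hash(e.getKey(), e.getValue());
        }
        // Inner nodes combine the hashes of their two children, bottom-up.
        for (int i = firstLeaf - 1; i >= 0; i--) {
            hashes[i] = 31 * hashes[2 * i + 1] + hashes[2 * i + 2];
        }
    }

    // Returns the leaf indices (0-based, left to right) whose hashes differ.
    static List<Integer> differingLeaves(ToyMerkleTree a, ToyMerkleTree b) {
        if (a.depth != b.depth) {
            throw new IllegalArgumentException("Both trees must have the same depth");
        }
        List<Integer> result = new ArrayList<>();
        collect(a, b, 0, result);
        return result;
    }

    private static void collect(ToyMerkleTree a, ToyMerkleTree b, int node, List<Integer> out) {
        if (a.hashes[node] == b.hashes[node]) {
            return; // equal hashes: treat the whole subtree as identical and skip it
        }
        int firstLeaf = (1 << (a.depth - 1)) - 1;
        if (node >= firstLeaf) {
            out.add(node - firstLeaf); // reached a leaf that differs
            return;
        }
        collect(a, b, 2 * node + 1, out);
        collect(a, b, 2 * node + 2, out);
    }

    public static void main(String[] args) {
        ToyMerkleTree source = new ToyMerkleTree(3, Map.of("a", "1", "b", "2", "c", "3", "d", "4"));
        ToyMerkleTree target = new ToyMerkleTree(3, Map.of("a", "1", "b", "2", "c", "9", "d", "4"));
        // Only the entries in the reported leaves would need to be transferred.
        System.out.println(differingLeaves(source, target));
    }
}
```

Because the sketch uses 32-bit hashes, two different subtrees can in principle collide and be skipped; it is meant to illustrate the traversal, not to be a reliable comparison.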
The two phases of the Merkle tree based synchronization can be triggered by REST calls, in the same way as the full synchronization.
The URL for the consistency check REST call:
http://{member IP address:port}/hazelcast/rest/wan/consistencyCheck/map
The URL for the synchronization REST call, which is the same as for the default synchronization:
http://{member IP address:port}/hazelcast/rest/wan/sync/map
See the REST call details here.
Memory Consumption
Since Merkle trees are built for each partition and each map, the memory overhead of deep trees on maps with a high entry count can be significant. The trees are maintained on-heap; therefore, besides the memory consumption, garbage collection could be another concern.
The table below shows a few examples for what the memory overhead could be.
Partitions Owned | Depth | Memory Overhead |
---|---|---|
271 | 8 | 0.27 MB |
271 | 10 | 1 MB |
271 | 13 | 8 MB |
271 | 16 | 68 MB |
5009 | 8 | 5 MB |
5009 | 10 | 20 MB |
5009 | 13 | 157 MB |
5009 | 16 | 1252 MB |
Defining the Depth
The efficiency of the Delta WAN Synchronization (WAN synchronization based on Merkle trees) is determined by the average number of entries per tree leaf, which is proportional to the number of entries in the map. The bigger this average, the more entries are synchronized for the same difference. Raising the depth decreases this average at the cost of increasing the memory overhead.
This average can be calculated for a map as avgEntriesPerLeaf = mapEntryCount / totalLeafCount, where totalLeafCount = partitionCount * 2^(depth-1). The ideal value is 1; however, this may come at a significant memory overhead as shown in the table above.
In order to specify the tree depth, a trade-off between memory consumption and effectiveness might be needed.
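As a worked example of the formula, the following JDK-only sketch computes the average for a few depths and searches for the smallest depth that reaches a target average. The class and method names are illustrative assumptions, and the depth range to search is supplied by the caller.

```java
final class MerkleDepthSketch {

    // totalLeafCount = partitionCount * 2^(depth-1)
    static long totalLeafCount(int partitionCount, int depth) {
        return (long) partitionCount << (depth - 1);
    }

    // avgEntriesPerLeaf = mapEntryCount / totalLeafCount
    static double avgEntriesPerLeaf(long mapEntryCount, int partitionCount, int depth) {
        return (double) mapEntryCount / totalLeafCount(partitionCount, depth);
    }

    // Smallest depth in [minDepth, maxDepth] whose average is at or below the
    // target, or -1 if no depth in the range reaches it.
    static int smallestDepthFor(long mapEntryCount, int partitionCount, double targetAvg,
                                int minDepth, int maxDepth) {
        for (int depth = minDepth; depth <= maxDepth; depth++) {
            if (avgEntriesPerLeaf(mapEntryCount, partitionCount, depth) <= targetAvg) {
                return depth;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // 10M entries over 5009 partitions
        for (int depth : new int[] {8, 10, 11, 12}) {
            System.out.printf("depth %d: %.2f entries per leaf%n",
                    depth, avgEntriesPerLeaf(10_000_000L, 5009, depth));
        }
    }
}
```

The depth found this way still has to be weighed against the memory overhead, since every additional level roughly doubles the number of tree nodes.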
Even if the map is huge and the Merkle trees are configured to be relatively shallow, the Merkle tree based synchronization may be leveraged if only a small subset of the whole map is expected to be synchronized. The table below illustrates the efficiency of the Merkle tree based synchronization compared to the default synchronization mechanism.
Map entry count | Depth | Memory consumption | Avg entries / leaf | Difference count | Entries synced | Efficiency |
---|---|---|---|---|---|---|
10M | 11 | 39 MB | 2 | 5M | 10M | 0% |
10M | 12 | 78 MB | 1 | 5M | 5M | 100% |
10M | 10 | 20 MB | 4 | 1M | 4M | 150% |
10M | 8 | 5 MB | 16 | 10K | 160K | 6150% |
10M | 12 | 78 MB | 1 | 10K | 10K | 99900% |
The Difference count column shows the number of entries that differ between the source and the target clusters. This is the minimum number of entries that need to be synchronized to make the clusters consistent. The Entries synced column shows how many entries are synchronized in the given case, calculated as Entries synced = Difference count * Avg entries / leaf.
As shown in the last two rows, the Merkle tree based synchronization transfers significantly fewer entries than the default mechanism does, even with trees of depth 8. The efficiency with depth 12 is even better, but it consumes much more memory.
The averages in the table are calculated with 5009 partitions.
The average entries per leaf number above assumes perfect distribution of the entries amongst the leaves. Since this is typically not true in real-life scenarios, the efficiency can be slightly worse. The statistics section below describes how to get the actual average for the leaves involved in the synchronization.
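The arithmetic behind the table can be reproduced with a few lines of Java. In this sketch, `entriesSynced` follows the formula given in the text. The `efficiencyPercent` formula is an assumption: it is inferred from the table rows (it reproduces all five Efficiency values) but it is not stated in the text. The class and method names are illustrative.

```java
final class SyncEstimateSketch {

    // Entries synced = Difference count * Avg entries / leaf (formula from the text).
    static long entriesSynced(long differenceCount, long avgEntriesPerLeaf) {
        return differenceCount * avgEntriesPerLeaf;
    }

    // ASSUMPTION: inferred from the table rows, not stated in the text.
    // Compares against a full synchronization, which transfers every map entry.
    static double efficiencyPercent(long mapEntryCount, long entriesSynced) {
        return ((double) mapEntryCount / entriesSynced - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        long mapEntryCount = 10_000_000L;
        long synced = entriesSynced(10_000L, 16); // the depth 8 row of the table
        System.out.println(synced + " entries synced, efficiency "
                + efficiencyPercent(mapEntryCount, synced) + "%");
    }
}
```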
WAN Synchronization Statistics
Both Full and Delta WAN Synchronization processes write statistics into the diagnostics subsystem and send them to Hazelcast Management Center. Using these statistics you can measure the efficiency of your configuration.
Full WAN Synchronization reports the following:
- Duration of the synchronization
- Count of the synchronized entries
- Total count of the synchronized partitions
Here is an example output:
Synchronization statistics:
Synchronization UUID: 8af2f9e7-3f9f-4c31-b594-47c421bfb33c
Duration: 0 secs
Total records synchronized: 448
Total partitions synchronized: 5
Delta WAN Synchronization reports the following:
- Duration of the synchronization
- Count of the synchronized entries
- Total count of the synchronized partitions
- Merkle tree nodes checked
- Merkle tree nodes found to be different
- Count of the entries needed to be synchronized to make the clusters consistent
- Average count of entries per tree leaf in the synchronized leaves
Here is an example output:
Merkle synchronization statistics:
Synchronization UUID: f49a25ba-dc57-4547-817b-bea67ff7f0fe
Duration: 0 secs
Total records synchronized: 528
Total partitions synchronized: 6
Total Merkle tree nodes synchronized: 178
Average records per Merkle tree node: 2.97
StdDev of records per Merkle tree node: 1.55
Minimum records per Merkle tree node: 1
Maximum records per Merkle tree node: 7
See the Diagnostics section to learn how to enable diagnostics and locate its log file to see the above statistics.
Dynamically Adding WAN Publishers
When running clusters for an extensive period, you might need to dynamically change the configuration while the cluster is running. This includes dynamically adding new WAN replication publishers (new target clusters) and replicating the subsequent map and cache updates to the new publishers without any manual intervention.
You can add new WAN publishers to an existing WAN replication using almost all the configuration options that are available when configuring the WAN publishers in the static configuration (including using Discovery SPI). The new configuration is not persisted but it is replicated to all existing and new members. Once the cluster is completely restarted, the dynamically added publisher configuration is lost and the updates are not replicated to the target cluster anymore until added again.
To persist dynamic configuration changes, you can configure persistence for dynamic configuration. See Dynamic Configuration for Members.
You cannot remove the existing configurations but can put the publishers into a STOPPED state which prevents the WAN events from being enqueued in the WAN queues and prevents the replication, rendering the publisher idle. The configurations also cannot be changed.
You can dynamically add a WAN publisher configuration using the following REST call URL:
http://{memberIPaddress:port}/hazelcast/rest/wan/addWanConfig
You need to add the following URL-encoded parameters to the request in the following order, separated by "&":
- Cluster name
- Cluster password
- WAN replication configuration, serialized as JSON
You can, at any point, even when maps and caches are concurrently mutated, add a new WAN publisher to an existing WAN replication configuration. The only limitation is that a WAN replication configuration must already exist, although it can be empty, without any publishers (target clusters). For instance, this is an example of a declarative configuration to which you can dynamically add new publishers:
hazelcast:
  wan-replication:
    myWanReplication:
  map:
    myMap:
      wan-replication-ref:
        myWanReplication:
          merge-policy: com.hazelcast.spi.merge.PassThroughMergePolicy
          republishing-enabled: false
<hazelcast>
...
<wan-replication name="myWanReplication"></wan-replication>
<map name="my-map">
<wan-replication-ref name="myWanReplication">
<merge-policy>com.hazelcast.spi.merge.PassThroughMergePolicy</merge-policy>
<republishing-enabled>false</republishing-enabled>
</wan-replication-ref>
</map>
...
</hazelcast>
Note that the map has defined WAN replication but there is no target cluster yet. You can then add the new WAN replication publishers (target clusters) by performing an HTTP POST as shown below:
curl -X POST -d "clusterName&clusterPassword&{...}" --URL http://127.0.0.1:5701/hazelcast/rest/wan/addWanConfig
You can provide the full configuration as JSON as a parameter.
Any WAN configuration supported in the XML and programmatic configurations is also supported in this JSON format.
Below are some examples of JSON configuration for a WAN publisher using the Discovery SPI and static IP configuration. Here are the integer values for initialPublisherState, queueFullBehavior and consistencyCheckStrategy:
- initialPublisherState:
  - 0: REPLICATING
  - 1: PAUSED
  - 2: STOPPED
- queueFullBehavior:
  - 0: DISCARD_AFTER_MUTATION
  - 1: THROW_EXCEPTION
  - 2: THROW_EXCEPTION_ONLY_IF_REPLICATION_ACTIVE
- consistencyCheckStrategy:
  - 0: NONE
  - 1: MERKLE_TREES
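When generating this JSON from code, it can help to keep the integer codes next to their names. The sketch below declares local stand-in enums in the order listed above so that `ordinal()` yields the JSON code. These enums and the class `WanJsonCodes` are assumptions made for illustration; they are not the Hazelcast enum types.

```java
final class WanJsonCodes {

    // Local stand-ins; declaration order mirrors the integer values listed above.
    enum InitialPublisherState { REPLICATING, PAUSED, STOPPED }

    enum QueueFullBehavior {
        DISCARD_AFTER_MUTATION, THROW_EXCEPTION, THROW_EXCEPTION_ONLY_IF_REPLICATION_ACTIVE
    }

    enum ConsistencyCheckStrategy { NONE, MERKLE_TREES }

    // The JSON code of a value is its position in the declaration order.
    static int code(Enum<?> value) {
        return value.ordinal();
    }

    // Looks up the enum constant for a JSON code, rejecting unknown codes.
    static <E extends Enum<E>> E fromCode(Class<E> type, int code) {
        E[] values = type.getEnumConstants();
        if (code < 0 || code >= values.length) {
            throw new IllegalArgumentException("Unknown code " + code + " for " + type.getSimpleName());
        }
        return values[code];
    }

    public static void main(String[] args) {
        System.out.println("\"queueFullBehavior\":" + code(QueueFullBehavior.THROW_EXCEPTION));
        System.out.println(fromCode(ConsistencyCheckStrategy.class, 1));
    }
}
```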
Below is an example using Discovery SPI (AWS configuration):
{
"name":"wanReplication",
"batchPublishers":[
{
"clusterName":"tokyo",
"queueCapacity":10000,
"queueFullBehavior":0,
"initialPublisherState":0,
"discovery":{
"nodeFilterClass":null,
"discoveryStrategy":[
{
"className":"com.hazelcast.aws.AwsDiscoveryStrategy",
"properties":{
"security-group-name":"hazelcast",
"tag-value":"cluster1",
"host-header":"ec2.amazonaws.com",
"tag-key":"aws-test-cluster",
"secret-key":"my-secret-key",
"iam-role":"s3access",
"access-key":"my-access-key",
"hz-port":"5701-5708",
"region":"us-west-1"
}
}
]
}
}
]
}
Below is an example with Discovery SPI (the new AWS configuration):
{
"name":"wanReplication",
"batchPublishers":[
{
"clusterName":"tokyo",
"queueCapacity":1000,
"queueFullBehavior":0,
"initialPublisherState":0,
"aws":{
"enabled":true,
"usePublicIp":false,
"properties":{
"security-group-name":"hazelcast-sg",
"tag-value":"hz-nodes",
"host-header":"ec2.amazonaws.com",
"tag-key":"type",
"secret-key":"my-secret-key",
"iam-role":"dummy",
"access-key":"my-access-key",
"region":"us-west-1"
}
},
"sync":{
"consistencyCheckStrategy":0
}
}
]
}
Below is an example with static IP configuration (with some optional attributes):
{
"name":"wanReplication",
"batchPublishers":[
{
"clusterName":"tokyo",
"queueCapacity":1000,
"queueFullBehavior":0,
"initialPublisherState":0,
"responseTimeoutMillis":5000,
"targetEndpoints":"10.3.5.1:5701, 10.3.5.2:5701",
"batchMaxDelayMillis":3000,
"batchSize":50,
"snapshotEnabled":false,
"acknowledgeType":1,
"sync":{
"consistencyCheckStrategy":0
}
}
]
}
Below is a JSON configuration with two publishers and several (disabled) discovery strategy configurations:
{
"name":"wanReplication",
"batchPublishers":[
{
"clusterName":"tokyo",
"queueCapacity":1000,
"queueFullBehavior":0,
"initialPublisherState":0,
"aws":{
"enabled":true,
"usePublicIp":false,
"properties":{
"security-group-name":"hazelcast-sg",
"tag-value":"hz-nodes",
"host-header":"ec2.amazonaws.com",
"tag-key":"type",
"secret-key":"my-secret-key",
"iam-role":"dummy",
"access-key":"my-access-key",
"region":"us-west-1"
}
},
"gcp":{
"enabled":false,
"usePublicIp":true,
"properties":{
"gcp-prop":"gcp-val"
}
},
"azure":{
"enabled":false,
"usePublicIp":true,
"properties":{
"azure-prop":"azure-val"
}
},
"kubernetes":{
"enabled":false,
"usePublicIp":true,
"properties":{
"k8s-prop":"k8s-val"
}
},
"eureka":{
"enabled":false,
"usePublicIp":true,
"properties":{
"eureka-prop":"eureka-val"
}
},
"discovery":{
"nodeFilterClass":null,
"discoveryStrategy":[
]
},
"sync":{
"consistencyCheckStrategy":0
}
},
{
"clusterName":"london",
"queueCapacity":1000,
"queueFullBehavior":0,
"initialPublisherState":0,
"responseTimeoutMillis":5000,
"targetEndpoints":"10.3.5.1:5701, 10.3.5.2:5701",
"batchMaxDelayMillis":3000,
"batchSize":50,
"snapshotEnabled":false,
"acknowledgeType":1,
"aws":{
"enabled":false,
"usePublicIp":false
},
"gcp":{
"enabled":false,
"usePublicIp":false
},
"azure":{
"enabled":false,
"usePublicIp":false
},
"kubernetes":{
"enabled":false,
"usePublicIp":false
},
"eureka":{
"enabled":false,
"usePublicIp":false
},
"discovery":{
"nodeFilterClass":null,
"discoveryStrategy":[
]
},
"sync":{
"consistencyCheckStrategy":1
}
}
]
}
Event Filtering API
WAN replication allows you to intercept WAN replication events before they are placed in the WAN event replication queues by providing a filtering API. Using this API, you can monitor WAN replication events of each data structure separately.
You can attach filters to your data structures using the filters element of the wan-replication-ref configuration inside hazelcast.xml, as shown below. You can also configure it using the programmatic configuration.
hazelcast:
  map:
    testMap:
      wan-replication-ref:
        test:
          filters:
            - com.example.MyFilter
            - com.example.MyFilter2
<hazelcast>
...
<map name="testMap">
<wan-replication-ref name="test">
<filters>
<filter-impl>com.example.MyFilter</filter-impl>
<filter-impl>com.example.MyFilter2</filter-impl>
</filters>
</wan-replication-ref>
</map>
...
</hazelcast>
As shown in the above configuration, you can define more than one filter. Filters are called in the order that they are introduced. A WAN replication event is only eligible to publish if it passes all the filters.
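The ordering and "must pass all filters" behavior can be sketched in plain Java as follows. The `EventFilter` interface, its `allows` method and the `EventType` enum here are local stand-ins invented for this illustration; they are not the Hazelcast filter interfaces, whose exact method signature and return-value convention should be checked in the Javadoc.

```java
import java.util.List;

final class FilterChainSketch {

    enum EventType { UPDATED, REMOVED, LOADED }

    // Local stand-in for a WAN event filter; NOT the Hazelcast interface.
    interface EventFilter {
        boolean allows(String mapName, Object key, EventType type);
    }

    // Filters run in configuration order; the first rejection stops the chain,
    // so an event is eligible only if every filter allows it.
    static boolean eligible(List<EventFilter> filters, String mapName, Object key, EventType type) {
        for (EventFilter filter : filters) {
            if (!filter.allows(mapName, key, type)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<EventFilter> filters = List.of(
                (map, key, type) -> type != EventType.REMOVED,   // drop removals
                (map, key, type) -> !"tmp".equals(key));         // drop a hypothetical temporary key
        System.out.println(eligible(filters, "testMap", "order-1", EventType.UPDATED));
        System.out.println(eligible(filters, "testMap", "tmp", EventType.UPDATED));
    }
}
```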
Map and Cache have different filter interfaces: MapWanEventFilter and CacheWanEventFilter. Both of these interfaces have the method filter which takes the following parameters:
- mapName/cacheName: Name of the related data structure.
- entryView: EntryView or CacheEntryView depending on the data structure.
- eventType: Enum type, one of UPDATED(1), REMOVED(2) or LOADED(3), depending on the event.
LOADED events are filtered out and not replicated to the target cluster.
Implementing a Custom WAN Publisher
In addition to using Hazelcast’s built-in WAN Replication implementation, you can implement your own replication mechanism using the WAN publisher SPI.
Following is the configuration snippet where replicatedMap and replicatedCache use the custom implementation com.my.WanPublisher to replicate map and cache updates.
hazelcast:
  wan-replication:
    london-wan-rep:
      custom-publisher:
        publisher-id: myCustomPublisher
        class-name: com.my.WanPublisher
        properties:
          prop1: val1
          prop2: val2
  map:
    replicatedMap:
      wan-replication-ref:
        london-wan-rep:
          ...
  cache:
    replicatedCache:
      wan-replication-ref:
        london-wan-rep:
          ...
<hazelcast>
...
<wan-replication name="london-wan-rep">
<custom-publisher>
<publisher-id>myCustomPublisher</publisher-id>
<class-name>com.my.WanPublisher</class-name>
<properties>
<property name="prop1">val1</property>
<property name="prop2">val2</property>
</properties>
</custom-publisher>
</wan-replication>
<map name="replicatedMap">
<wan-replication-ref name="london-wan-rep"/>
...
</map>
<cache name="replicatedCache">
<wan-replication-ref name="london-wan-rep"/>
...
</cache>
...
</hazelcast>
The custom-publisher element is used to configure a custom implementation of WAN replication implementing com.hazelcast.wan.WanPublisher. For example, you might implement replication to Kafka or some JMS queue, or even write out map and cache event changes to a log on disk. It has the following sub-elements:
- class-name: Mandatory configuration value defining the fully qualified class name of the WAN publisher implementation. The class must implement com.hazelcast.wan.WanPublisher.
- publisher-id: Mandatory configuration value for the publisher ID used for identifying the publisher in a WanReplicationConfig. This ID will be used to refer to this specific WAN publisher in a certain WAN replication scheme.
In some cases, specifying the configuration on the source/active cluster is enough to fully implement your use case. This is the case when you don’t have any target/passive Hazelcast cluster which consumes these events. In cases when you do have a target Hazelcast cluster and you wish to use a custom WAN Replication implementation, you will need to configure the target cluster as well. For example, you might want to implement WAN Replication by transmitting WAN events through some JMS queue like ActiveMQ. In this case, you need to implement both your custom WAN publisher and WAN consumer.
Below is a configuration example for specifying a custom WAN replication consumer on the target/passive cluster:
hazelcast:
  wan-replication:
    london-wan-rep:
      consumer:
        class-name: com.my.WanConsumer
        properties:
          prop1: val1
          prop2: val2
<hazelcast>
...
<wan-replication name="london-wan-rep">
<consumer>
<class-name>com.my.WanConsumer</class-name>
<properties>
<property name="prop1">val1</property>
<property name="prop2">val2</property>
</properties>
</consumer>
</wan-replication>
</hazelcast>
The consumer element is used to configure the implementation of the com.hazelcast.wan.WanConsumer interface which will be used to retrieve and process WAN events. A custom WAN consumer allows you to define custom processing logic and is used in combination with a custom WAN publisher.
The consumer configuration element has the following sub-elements:
- class-name: Name of the class implementing a custom WAN consumer (com.hazelcast.wan.WanConsumer).
- properties: Properties for the custom WAN consumer. These properties are accessible when initializing the WAN consumer. You can define the host, username and password for the host, name of the queue to be polled by the consumer, etc.
Customizing WAN Event Processing on Passive/Target Cluster
In addition to customizing behavior of the source cluster and how WAN events are sent and retained, you can also configure some aspects of how WAN events are processed on the receiving (target/passive) cluster. In addition, you can also define a custom implementation of a WAN event consumer. A custom WAN consumer allows you to define custom processing logic and is usually used in combination with a custom WAN publisher. A custom consumer is optional and you may simply omit defining it which causes the default processing logic to be used. See the Using the WAN Custom Publisher section for more information.
Below you can see an example configuration of the target/passive cluster where we configure how incoming WAN events are processed.
hazelcast:
  wan-replication:
    london-wan-rep:
      consumer:
        persist-wan-replicated-data: false
  map:
    replicatedMap:
      wan-replication-ref:
        london-wan-rep:
          ...
  cache:
    replicatedCache:
      wan-replication-ref:
        london-wan-rep:
          ...
<hazelcast>
...
<wan-replication name="london-wan-rep">
<consumer>
<persist-wan-replicated-data>false</persist-wan-replicated-data>
</consumer>
</wan-replication>
<map name="replicatedMap">
<wan-replication-ref name="london-wan-rep"/>
...
</map>
<cache name="replicatedCache">
<wan-replication-ref name="london-wan-rep"/>
...
</cache>
...
</hazelcast>
In the configuration above you can see that the WAN Replication configuration is again matched by WAN replication scheme name to the exact map and cache configuration. This means that different structures can process WAN events differently.
The processing behavior is configured using the consumer element. It has the following sub-elements:
- persist-wan-replicated-data: When set to true, an incoming event over WAN replication can be persisted, for example to a database; otherwise it is not persisted. Default value is false.
Securing the Connections for WAN Replication
You can secure the communications between the endpoints on the WAN replicated clusters using security features such as TLS/SSL, socket interceptor and encryption. WAN connections, cluster members and clients can have their own unique security configurations. You can also choose to have security features configured on some of the WAN connections/members/clients and not on the others.
Multiple WAN endpoint configurations can be defined to configure the outgoing connections and server sockets, depending on the role of the member in the WAN replication. The configuration examples are provided in the following sections for both active and passive side of the WAN replication.
Configuring the WAN Active Side
The members on the active cluster initiate connections to the target cluster members,
so there is no need to create a server socket. A plain EndpointConfig
is created that
supplies the configuration for the client side of connections that the active members
will create:
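// `config` is an existing com.hazelcast.config.Config instance;
// `emptyList()` is java.util.Collections.emptyList().
// Client-side endpoint configuration used for the outgoing WAN connections
config.getAdvancedNetworkConfig().addWanEndpointConfig(
        new EndpointConfig().setName("tokyo")
                .setSSLConfig(new SSLConfig()
                        .setEnabled(true)
                        .setFactoryClassName("com.hazelcast.examples.MySSLContextFactory")
                        .setProperty("foo", "bar"))
);
// The name must match the one used in the WanReplicationRef below
WanReplicationConfig wanReplicationConfig = new WanReplicationConfig()
        .setName("replicate-to-tokyo");
// The publisher refers to the WAN endpoint configuration by its name
WanBatchPublisherConfig publisherConfig = new WanBatchPublisherConfig()
        .setEndpoint("tokyo")
        .setTargetEndpoints("tokyo.hazelcast.com:8765");
wanReplicationConfig.addBatchReplicationPublisherConfig(publisherConfig);
config.addWanReplicationConfig(wanReplicationConfig);
// Bind the map to the WAN replication configuration
config.getMapConfig("customers").setWanReplicationRef(
        new WanReplicationRef("replicate-to-tokyo", "com.company.MergePolicy", emptyList(), false)
);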
The following is the equivalent declarative configuration:
hazelcast:
  advanced-network:
    enabled: true
    wan-endpoint-config:
      endpoint-tokyo:
        ssl:
          enabled: true
          factory-class-name: com.hazelcast.examples.MySSLContextFactory
          properties:
            endpoints: tokyo.example.com:11010
  wan-replication:
    replicate-to-tokyo:
      batch-publisher:
        cluster-name: clusterB
        target-endpoints: ...
        endpoint: endpoint-tokyo
  map:
    customer:
      wan-replication-ref:
        replicate-to-tokyo:
          merge-policy-class-name: ...
<hazelcast>
...
<advanced-network enabled="true">
<wan-endpoint-config name="tokyo">
<ssl enabled="true">
<factory-class-name>com.hazelcast.examples.MySSLContextFactory</factory-class-name>
<properties>
<property name="endpoints">tokyo.example.com:11010</property>
</properties>
</ssl>
</wan-endpoint-config>
</advanced-network>
...
<wan-replication name="replicate-to-tokyo">
<batch-publisher>
<cluster-name>clusterB</cluster-name>
<target-endpoints>...</target-endpoints>
<endpoint>tokyo</endpoint>
</batch-publisher>
</wan-replication>
...
<map name="customer">
<wan-replication-ref name="replicate-to-tokyo">
<merge-policy>...</merge-policy>
</wan-replication-ref>
</map>
...
</hazelcast>
The wan-endpoint-config element contains the same sub-elements as the member-server-socket-endpoint-config element described in the Advanced Network Configuration section, except port, public-address and reuse-address.
Configuring the WAN Passive Side
On the passive cluster, a server socket is configured on the members to listen for the incoming WAN connections, matching the network configuration (SSL configuration, etc.) configured on the active side of the WAN replication.
config.getAdvancedNetworkConfig().addWanEndpointConfig(
new ServerSocketEndpointConfig()
.setName("tokyo")
.setPort(11010)
.setPortAutoIncrement(false)
.setSSLConfig(new SSLConfig()
.setEnabled(true)
.setFactoryClassName("com.hazelcast.examples.MySSLContextFactory")
.setProperty("foo", "bar")
));
The following is the equivalent declarative configuration:
hazelcast:
  advanced-network:
    enabled: true
    wan-server-socket-endpoint-config:
      tokyo:
        port:
          auto-increment: false
          port: 11010
        ssl:
          enabled: true
          factory-class-name: com.hazelcast.examples.MySSLContextFactory
          properties:
            foo: bar
<hazelcast>
...
<advanced-network enabled="true">
<wan-server-socket-endpoint-config name="tokyo">
<port auto-increment="false">11010</port>
<ssl enabled="true">
<factory-class-name>com.hazelcast.examples.MySSLContextFactory</factory-class-name>
<properties>
<property name="foo">bar</property>
</properties>
</ssl>
</wan-server-socket-endpoint-config>
</advanced-network>
...
</hazelcast>