Defining WAN Replication
Hazelcast supports two different operation modes of WAN Replication:
-
Active-Passive: This mode is mostly used for failover scenarios where you want to replicate an active cluster to one or more passive clusters, for the purpose of maintaining a backup.
-
Active-Active: Every cluster is equal, each cluster replicates to all other clusters. This is normally used to connect different clients to different clusters for the sake of the shortest path between client and server.
There are two different ways of defining the WAN replication endpoints:
-
Static endpoints
-
Discovery SPI
You can use at most one of these when defining a single WAN publisher.
Defining WAN Replication Using Static Endpoints
Below is an example of declarative configuration of WAN Replication from New York cluster to target the London cluster:
<hazelcast>
...
<wan-replication name="my-wan-cluster-batch">
<wan-publisher group-name="london">
<class-name>com.hazelcast.enterprise.wan.replication.WanBatchReplication</class-name>
<queue-full-behavior>THROW_EXCEPTION</queue-full-behavior>
<queue-capacity>1000</queue-capacity>
<properties>
<property name="batch.size">500</property>
<property name="batch.max.delay.millis">1000</property>
<property name="snapshot.enabled">false</property>
<property name="response.timeout.millis">60000</property>
<property name="ack.type">ACK_ON_OPERATION_COMPLETE</property>
<property name="endpoints">10.3.5.1:5701, 10.3.5.2:5701</property>
<property name="group.password">london-pass</property>
<property name="discovery.period">20</property>
<property name="executorThreadCount">2</property>
</properties>
</wan-publisher>
<wan-consumer>
<class-name>com.hazelcast.enterprise.wan.replication.YourWanConsumer</class-name>
<persist-wan-replicated-data>false</persist-wan-replicated-data>
<properties>
<property name="host">192.168.2.66</property>
<property name="vpn.name">YOUR_VPN_NAME</property>
<property name="username">admin</property>
<property name="password">YOUR_PASSWORD</property>
<property name="queue.name">Q/hz/clusterA</property>
</properties>
</wan-consumer>
</wan-replication>
...
</hazelcast>
The wan-publisher
element is used to configure the target cluster, i.e., to define how WAN events are sent to a specific endpoint. As mentioned above just before the configuration example, the endpoint can be a different cluster defined by static IP’s or discovered using a cloud discovery mechanism. The wan-publisher
element has the following attributes:
-
group-name
: Specifies the target cluster’s group name for authentication on the target endpoint. It is mandatory to set this attribute. -
publisher-id
: Specifies the publisher ID used for identifying the publisher. Setting this ID may be useful when the target group names are not unique for all of the WAN replication publishers in a single WAN replication scheme. It is optional to set this attribute. If this ID is not specified, thegroup-name
is used as a publisher ID.
And, the following are definitions of the configuration elements within wan-publisher
:
-
name
: Name of your WAN Replication. This name is referenced in IMap or ICache configuration when you add WAN Replication for these data structures (using the elementwan-replication-ref
in the configuration of IMap or ICache). -
class-name
: Name of the class implementation for the WAN replication. -
queue-full-behavior
: Policy to be applied when WAN Replication event queues are full. See the Queue Full Behavior section. -
queue-capacity
: Size of the queue of events. Its default value is 10000. See the Queue Capacity section. -
batch.size
: Maximum size of events that are sent to the target cluster in a single batch. Its default value is 500. See the Batch Size section. -
batch.max.delay.millis
: Maximum amount of time, in milliseconds, to be waited before sending a batch of events in casebatch.size
is not reached. Its default value is 1000 milliseconds. See the Batch Maximum Delay section. -
snapshot.enabled
: When set totrue
, only the latest events (based on key) are selected and sent in a batch. Its default value isfalse
. -
response.timeout.millis
: Time, in milliseconds, to be waited for the acknowledgment of a sent WAN event to target cluster. Its default value is 60000 milliseconds. See the Response Timeout section. -
ack.type
: Acknowledgment type for each target cluster. See the xref:ack-types.adoc[Acknowledgment Types section. -
endpoints
: IP addresses and ports of the cluster members for which the WAN replication is implemented. These endpoints are not necessarily the entire target cluster and WAN does not perform the discovery of other members in the target cluster. It only expects that these IP addresses (or at least some of them) are available. -
group.password
: Configures target cluster’s group password.
The wan-consumer
is used to configure the processing of the WAN events received from a target cluster. You can configure certain behavior when processing the incoming WAN events or even configure your own implementation for a WAN consumer. A custom WAN consumer allows you to define a custom processing logic and is usually used in combination with a custom WAN publisher. A custom consumer is optional and you may simply omit defining it which will cause the default processing logic to be used. It has the following sub-elements:
-
class-name
: Name of the class implementing a custom WAN consumer (WanReplicationConsumer
). If you don’t define a class name, the default processing logic for incoming WAN events is used. -
properties
: Properties for the custom WAN consumer. These properties are accessible when initializing the WAN consumer. You can define the host, username and password for the host, name of the queue to be polled by the consumer, etc. -
persist-wan-replicated-data
: When set totrue
, an incoming event over WAN replication can be persisted to a database for example, otherwise it is not persisted. Default value istrue
.
Other relevant properties are:
-
discovery.period
: Period in seconds in which WAN tries to reestablish connections to failed endpoints. Default is 10 seconds. -
executorThreadCount
: The number of threads that theWanBatchReplication
executor has. The executor is used to send WAN events to the endpoints and ideally you want to have one thread per endpoint. If this property is omitted and you have specified theendpoints
property, this is the case. If necessary you can manually define the number of threads that the executor uses. Once the executor has been initialized there is thread affinity between the discovered endpoints and the executor threads - all events for a single endpoint go through a single executor thread, preserving event order. It is important to determine which number of executor threads is a good value. Failure to do so can lead to performance issues - either contention on a too small number of threads or wasted threads that are not performing any work.
And the following is the equivalent programmatic configuration snippet:
Config config = new Config();
WanReplicationConfig wrConfig = new WanReplicationConfig();
wrConfig.setName("my-wan-cluster-batch");
WanPublisherConfig publisherConfig = new WanPublisherConfig();
publisherConfig.setGroupName("london");
publisherConfig.setClassName("com.hazelcast.enterprise.wan.replication.WanBatchReplication");
publisherConfig.setQueueFullBehavior(WANQueueFullBehavior.THROW_EXCEPTION);
publisherConfig.setQueueCapacity(1000);
Map<String, Comparable> props = publisherConfig.getProperties();
props.put("batch.size", 500);
props.put("batch.max.delay.millis", 1000);
props.put("snapshot.enabled", false);
props.put("response.timeout.millis", 60000);
props.put("ack.type", WanAcknowledgeType.ACK_ON_OPERATION_COMPLETE.toString());
props.put("endpoints", "10.3.5.1:5701,10.3.5.2:5701");
props.put("group.password", "london-pass");
props.put("discovery.period", "20");
props.put("executorThreadCount", "2");
wrConfig.addWanPublisherConfig(publisherConfig);
config.addWanReplicationConfig(wrConfig);
Using this configuration, the cluster running in New York replicates to Tokyo and London. The Tokyo and London clusters should have similar configurations if you want to run in Active-Active mode.
If the New York and London cluster configurations contain the wan-replication
element and the Tokyo cluster does not, it means
New York and London are active endpoints and Tokyo is a passive endpoint.
Defining WAN Replication Using Discovery SPI
In addition to defining target cluster endpoints with static IP addresses, you can configure WAN to work with the discovery SPI and determine the endpoint IP addresses at runtime. This allows you to use WAN with endpoints on various cloud infrastructures (such as Amazon EC2 or GCP Compute) where the IP address is not known in advance. Typically you use a readily available discovery SPI plugin such as Hazelcast AWS EC2 discovery plugin, Hazelcast GCP discovery plugin, or similar. For more advanced cases, you can provide your own discovery SPI implementation with custom logic for determining the WAN target endpoints such as looking up the endpoints in some service registry or even reading the endpoint addresses from a file.
When using the discovery SPI, WAN always connects to the public address of the members returned by the discovery SPI implementation. This is opposite to the cluster membership mechanism using the discovery SPI where a member connects to a different member in the same cluster through its private address. Should you prefer for WAN to use the private address of the discovered member as well, please use the discovery.useEndpointPrivateAddress publisher property (see below).
|
Following is an example of setting up the WAN replication with the EC2 discovery plugin. You must have the Hazelcast AWS EC2 discovery plugin on the classpath.
<hazelcast>
...
<wan-replication name="my-wan-cluster-batch">
<wan-publisher group-name="london">
<class-name>com.hazelcast.enterprise.wan.replication.WanBatchReplication</class-name>
<queue-full-behavior>THROW_EXCEPTION</queue-full-behavior>
<queue-capacity>1000</queue-capacity>
<properties>
<property name="batch.size">500</property>
<property name="batch.max.delay.millis">1000</property>
<property name="snapshot.enabled">false</property>
<property name="response.timeout.millis">60000</property>
<property name="ack.type">ACK_ON_OPERATION_COMPLETE</property>
<property name="group.password">london-pass</property>
<property name="discovery.period">20</property>
<property name="maxEndpoints">5</property>
<property name="executorThreadCount">5</property>
</properties>
<discovery-strategies>
<discovery-strategy enabled="true" class="com.hazelcast.aws.AwsDiscoveryStrategy">
<properties>
<property name="access-key">test-access-key</property>
<property name="secret-key">test-secret-key</property>
<property name="region">test-region</property>
<property name="iam-role">test-iam-role</property>
<property name="host-header">ec2.test-host-header</property>
<property name="security-group-name">test-security-group-name</property>
<property name="tag-key">test-tag-key</property>
<property name="tag-value">test-tag-value</property>
<property name="connection-timeout-seconds">10</property>
<property name="hz-port">5701</property>
</properties>
</discovery-strategy>
</discovery-strategies>
</wan-publisher>
</wan-replication>
...
</hazelcast>
The hz-port
property defines the port or the port range on which the target endpoint is running. The default port range 5701-5708 is used if this property is not defined. This is needed because the Amazon API which the AWS plugin uses does not provide the port on which Hazelcast is running, only the IP address. For some other discovery SPI implementations, this might not be necessary and it might discover the port as well, e.g., by looking up in a service registry.
The other properties are the same as when using the aws
element. In case of EC2 discovery you can configure the WAN replication using the aws
element. You may use either of these, but not both at the same time.
<hazelcast>
...
<wan-replication name="my-wan-cluster-batch">
<wan-publisher group-name="london">
<class-name>com.hazelcast.enterprise.wan.replication.WanBatchReplication</class-name>
<queue-full-behavior>THROW_EXCEPTION</queue-full-behavior>
<queue-capacity>1000</queue-capacity>
<properties>
<property name="batch.size">500</property>
<property name="batch.max.delay.millis">1000</property>
<property name="snapshot.enabled">false</property>
<property name="response.timeout.millis">60000</property>
<property name="ack.type">ACK_ON_OPERATION_COMPLETE</property>
<property name="group.password">london-pass</property>
<property name="discovery.period">20</property>
<property name="discovery.useEndpointPrivateAddress">false</property>
<property name="maxEndpoints">5</property>
<property name="executorThreadCount">5</property>
</properties>
<aws enabled="true">
<access-key>my-access-key</access-key>
<secret-key>my-secret-key</secret-key>
<region>us-west-1</region>
<security-group-name>hazelcast-sg</security-group-name>
<tag-key>type</tag-key>
<tag-value>hz-members</tag-value>
<hz-port>5701</hz-port>
</aws>
</wan-publisher>
</wan-replication>
...
</hazelcast>
See the aws element and the Configuring Client for AWS sections for the descriptions of above AWS configuration elements. The following are the definitions of additional configuration properties:
-
discovery.period
: Period in seconds in which WAN tries to discover new endpoints and reestablish connections to failed endpoints. Default is 10 seconds. -
maxEndpoints
: Maximum number of endpoints that WAN connects to when using a discovery mechanism to define endpoints. Default isInteger.MAX_VALUE
. This property has no effect when static endpoint IPs are defined using theendpoints
property. -
executorThreadCount
: Number of threads that theWanBatchReplication
executor has. The executor is used to send WAN events to the endpoints and ideally you want to have one thread per endpoint. If this property is omitted and you have specified theendpoints
property, this is the case. If, on the other hand, you are using WAN with the discovery SPI and you have not specified this property, the executor is sized to the initial number of discovered endpoints. This can lead to performance issues if the number of endpoints changes in the future - either contention on a too small number of threads or wasted threads that are not performing any work. To prevent this you can manually define the executor thread count. Once the executor has been initialized there is thread affinity between the discovered endpoints and the executor threads - all events for a single endpoint go through a single executor thread, preserving event order. -
discovery.useEndpointPrivateAddress
: Determines whether the WAN connection manager should connect to the endpoint on the private address returned by the discovery SPI. By default this property isfalse
which means the WAN connection manager always uses the public address.
You can also define the WAN publisher with discovery SPI using the programmatic configuration:
Config config = new Config();
WanReplicationConfig wrConfig = new WanReplicationConfig();
wrConfig.setName("my-wan-cluster-batch");
WanPublisherConfig publisherConfig = new WanPublisherConfig();
publisherConfig.setGroupName("london");
publisherConfig.setClassName("com.hazelcast.enterprise.wan.replication.WanBatchReplication");
publisherConfig.setQueueFullBehavior(WANQueueFullBehavior.THROW_EXCEPTION);
publisherConfig.setQueueCapacity(1000);
Map<String, Comparable> props = publisherConfig.getProperties();
props.put("batch.size", 500);
props.put("batch.max.delay.millis", 1000);
props.put("snapshot.enabled", false);
props.put("response.timeout.millis", 60000);
props.put("ack.type", WanAcknowledgeType.ACK_ON_OPERATION_COMPLETE.toString());
props.put("group.password", "london-pass");
props.put("discovery.period", "20");
props.put("executorThreadCount", "2");
DiscoveryConfig discoveryConfig = new DiscoveryConfig();
DiscoveryStrategyConfig discoveryStrategyConfig = new DiscoveryStrategyConfig("com.hazelcast.aws.AwsDiscoveryStrategy");
discoveryStrategyConfig.addProperty("access-key","test-access-key");
discoveryStrategyConfig.addProperty("secret-key","test-secret-key");
discoveryStrategyConfig.addProperty("region","test-region");
discoveryStrategyConfig.addProperty("iam-role","test-iam-role");
discoveryStrategyConfig.addProperty("host-header","ec2.test-host-header");
discoveryStrategyConfig.addProperty("security-group-name","test-security-group-name");
discoveryStrategyConfig.addProperty("tag-key","test-tag-key");
discoveryStrategyConfig.addProperty("tag-value","test-tag-value");
discoveryStrategyConfig.addProperty("hz-port",5702);
discoveryConfig.addDiscoveryStrategyConfig(discoveryStrategyConfig);
publisherConfig.setDiscoveryConfig(discoveryConfig);
wrConfig.addWanPublisherConfig(publisherConfig);
config.addWanReplicationConfig(wrConfig);