User Defined Services
For special or custom needs, you can use Hazelcast’s SPI (Service Provider Interface) module to develop your own distributed data structures and services on top of Hazelcast. Hazelcast SPI is an internal, low-level API that is expected to change in every release except patch releases, so your structures and services may need to evolve as the SPI changes.
Currently, you can use the data structures and services you implement with the Hazelcast SPI only on Hazelcast members. We are in the process of improving the SPI for the clients, so that you will be able to implement SPI elements to be used by the Hazelcast Open Binary Client Protocol.
Throughout this section, we build an example distributed counter to illustrate how the Hazelcast SPI is used.
Here is our counter.
public interface Counter{
int inc(int amount);
}
This counter will have the following features:
- It is stored in Hazelcast.
- Different cluster members can call it.
- It is scalable, meaning that the capacity for the number of counters scales with the number of cluster members.
- It is highly available, meaning that if a member hosting this counter goes down, a backup will be available on a different member.
We implement these features in the steps below. Each step adds new functionality to the counter.
- Create the class.
- Enable the class.
- Add properties.
- Place a remote call.
- Create the containers.
- Enable partition migration.
- Create the backups.
Creating the Service Class
To have the counter as a functioning distributed object, we need a class. This class (named CounterService in the following example code) is the gateway between Hazelcast internals and the counter, allowing us to add features to the counter. The following example code creates the class CounterService, whose lifecycle is managed by Hazelcast.
CounterService should implement the interface com.hazelcast.spi.ManagedService as shown below. The com.hazelcast.spi.ManagedService source code is here.
CounterService implements the following methods:
- init: This is called when CounterService is initialized. NodeEngine enables access to Hazelcast internals such as HazelcastInstance and PartitionService. Also, the Properties object lets us define our own properties.
- shutdown: This is called when CounterService is shut down. It cleans up the resources.
- reset: This is called when cluster members face the split-brain issue, that is, when disconnected members that have formed their own cluster are merged back into the main cluster. Services can also implement the SplitBrainHandlerService interface to indicate that they can take part in the merge process. For CounterService, we implement reset as a no-op.
public class CounterService implements ManagedService {
private NodeEngine nodeEngine;
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
System.out.println("CounterService.init()");
this.nodeEngine = nodeEngine;
}
@Override
public void shutdown(boolean terminate) {
System.out.println("CounterService.shutdown()");
}
@Override
public void reset() {
}
public NodeEngine getNodeEngine() {
return nodeEngine;
}
}
Enabling the Service Class
Now, we need to enable the class CounterService. The declarative way of doing this is shown below.
<hazelcast>
...
<network>
<join><multicast enabled="true"/> </join>
</network>
<services>
<service enabled="true">
<name>CounterService</name>
<class-name>CounterService</class-name>
</service>
</services>
...
</hazelcast>
The CounterService is declared within the services configuration element.
- Set the enabled attribute to true to enable the service.
- Set the name element to the name of the service. It should be a unique name (CounterService in our case) since it is looked up when a remote call is made. Note that this name is sent with each request, so a longer name means more data to (de)serialize. A good practice is to choose an understandable name that is as short as possible.
- Set the class-name element to the class name of the service (CounterService in our case). The class must have a no-arg constructor; otherwise, the object cannot be instantiated.
Note that multicast is enabled as the join mechanism. We will see why in the later sections of the CounterService example.
Adding Properties to the Service
The init method for CounterService takes the Properties object as an argument. This means we can add properties to the service that are passed to the init method; see Creating the Service Class. You can add properties declaratively as shown below. (You likely want to name your properties something other than someproperty.)
<hazelcast>
...
<services>
<service enabled="true">
<name>CounterService</name>
<class-name>CounterService</class-name>
<properties>
<someproperty>10</someproperty>
</properties>
</service>
</services>
...
</hazelcast>
If you want to parse more complex XML, you can use the interface com.hazelcast.spi.ServiceConfigurationParser. It gives you access to the XML DOM tree.
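As a minimal sketch of how the init method might consume such a property, the helper below reads an integer from a Properties object and falls back to a default when the key is missing or malformed. The class CounterSettings and the method readInt are hypothetical names introduced for this illustration and are not part of the Hazelcast SPI. The sketch also assumes that property values arrive as strings, as they do when declared in XML.

```java
import java.util.Properties;

// Hypothetical helper for this illustration; not part of the Hazelcast SPI.
final class CounterSettings {

    private CounterSettings() {
    }

    // Returns the integer stored under the given key, or defaultValue when
    // the key is absent or its value is not a valid integer.
    static int readInt(Properties properties, String key, int defaultValue) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }
}
```

Inside CounterService.init, a call such as CounterSettings.readInt(properties, "someproperty", 0) could then be used to pick up the value declared in the configuration.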
Starting the Service
Now, let’s start a HazelcastInstance as shown below, which starts the CounterService.
public class Member {
public static void main(String[] args) {
Hazelcast.newHazelcastInstance();
Hazelcast.shutdownAll();
}
}
Once it starts, the CounterService init method prints the following output.
CounterService.init()
Once the HazelcastInstance is shut down (for example, with Ctrl+C), the CounterService shutdown method prints the following output.
CounterService.shutdown()
Placing a Remote Call via Proxy
In the previous sections for the CounterService example, we started CounterService as part of a HazelcastInstance startup.
Now, let’s connect the Counter interface to CounterService and perform a remote call to the cluster member hosting the counter data. For now, the call returns a dummy result.
Remote calls are performed via a proxy in Hazelcast. Proxies expose the methods on the client side. Once a method is called, the proxy creates an operation object, sends this object to the cluster member responsible for executing that operation, and then returns the result to the caller.
Making Counter a Distributed Object
First, we need to make the Counter interface a distributed object by extending the DistributedObject interface, as shown below.
public interface Counter extends DistributedObject {
int inc(int amount);
}
Implementing ManagedService and RemoteService
Now, we need to make the CounterService class implement not only the ManagedService interface, but also the interface com.hazelcast.spi.RemoteService. This way, a client can get a handle to a counter proxy. You can check the source code for RemoteService here.
public class CounterService implements ManagedService, RemoteService {
static final String NAME = "CounterService";
private NodeEngine nodeEngine;
@Override
public DistributedObject createDistributedObject(String objectName) {
return new CounterProxy(objectName, nodeEngine, this);
}
@Override
public void destroyDistributedObject(String objectName) {
// for the time being a no-op
}
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
this.nodeEngine = nodeEngine;
}
@Override
public void shutdown(boolean terminate) {
}
@Override
public void reset() {
}
}
The CounterProxy returned by the method createDistributedObject is a local representation of (potentially) remote managed data and logic.
Note that caching and removing the proxy instance are done outside of this service.
Implementing CounterProxy
Now, it is time to implement the CounterProxy as shown below. CounterProxy extends AbstractDistributedObject; its source code is here.
public class CounterProxy extends AbstractDistributedObject<CounterService> implements Counter {
private final String name;
CounterProxy(String name, NodeEngine nodeEngine, CounterService counterService) {
super(nodeEngine, counterService);
this.name = name;
}
@Override
public String getServiceName() {
return CounterService.NAME;
}
@Override
public String getName() {
return name;
}
@Override
public int inc(int amount) {
NodeEngine nodeEngine = getNodeEngine();
IncOperation operation = new IncOperation(name, amount);
int partitionId = nodeEngine.getPartitionService().getPartitionId(name);
InvocationBuilder builder = nodeEngine.getOperationService()
.createInvocationBuilder(CounterService.NAME, operation, partitionId);
try {
Future<Integer> future = builder.invoke();
return future.get();
} catch (Exception e) {
throw ExceptionUtil.rethrow(e);
}
}
}
CounterProxy is a local representation of remote data/functionality. It does not include the counter state. Therefore, the method inc should be invoked on the cluster member hosting the real counter. You can invoke it using the Hazelcast SPI, which sends the operation to the correct member and returns the result.
Let’s dig deeper into the method inc.
- First, we create IncOperation with a given name and amount.
- Then, we get the partition ID based on the name; this way, all operations for a given name always result in the same partition ID.
- Then, we create an InvocationBuilder, where the connection between the operation and the partition is made.
- Finally, we invoke the InvocationBuilder and wait for its result. This waiting is performed with future.get(). In our case, a timeout is not important. However, it is a good practice to use a timeout in a real system, since operations should complete within a certain amount of time.
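To make the timeout advice concrete, here is a minimal sketch that uses only java.util.concurrent, so it runs without a cluster. The class BoundedWait and its method await are hypothetical names chosen for this illustration. Turning a timeout into an IllegalStateException is likewise just one possible choice and not something the Hazelcast SPI prescribes.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for this illustration; not part of the Hazelcast SPI.
final class BoundedWait {

    private BoundedWait() {
    }

    // Waits at most timeoutMillis for the result. When no result arrives in
    // time, the future is cancelled and an unchecked exception is thrown.
    // Interruption and execution failures are left to the caller.
    static int await(Future<Integer> future, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException(
                    "No response within " + timeoutMillis + " ms", e);
        }
    }
}
```

In the inc method of CounterProxy, the future returned by the invocation could be passed to such a helper instead of calling future.get() without a bound.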
Dealing with Exceptions
Hazelcast’s ExceptionUtil is a good solution when it comes to dealing with execution exceptions. When the execution of the operation fails with an exception, an ExecutionException is thrown and handled with the method ExceptionUtil.rethrow(Throwable).
If it is an InterruptedException, we have two options: either propagate the exception or just use ExceptionUtil.rethrow for all exceptions. See the example code below.
try {
final Future<Integer> future = invocation.invoke();
return future.get();
} catch(InterruptedException e){
throw e;
} catch(Exception e){
throw ExceptionUtil.rethrow(e);
}
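As a rough illustration of what such a rethrow helper can look like, the sketch below unwraps an ExecutionException to its cause, restores the interrupt flag for an InterruptedException, and returns an unchecked exception so that callers can write "throw unchecked(e)". The class Rethrow and the method unchecked are hypothetical names. This is a simplified stand-in written for this section, and Hazelcast's actual ExceptionUtil may behave differently in its details.

```java
import java.util.concurrent.ExecutionException;

// Simplified, hypothetical stand-in; not Hazelcast's ExceptionUtil.
final class Rethrow {

    private Rethrow() {
    }

    // Returns (rather than throws) a RuntimeException so that the call site
    // can use "throw Rethrow.unchecked(e)" and the compiler knows that the
    // method ends there.
    static RuntimeException unchecked(Throwable t) {
        Throwable cause = t;
        if (t instanceof ExecutionException && t.getCause() != null) {
            // The interesting failure is the one raised by the operation.
            cause = t.getCause();
        }
        if (cause instanceof InterruptedException) {
            // Keep the interrupt visible to code further up the stack.
            Thread.currentThread().interrupt();
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        if (cause instanceof RuntimeException) {
            return (RuntimeException) cause;
        }
        return new RuntimeException(cause);
    }
}
```

With this shape, an IllegalArgumentException raised on the remote member surfaces to the caller with its original type instead of being buried inside an ExecutionException.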
Implementing the PartitionAwareOperation Interface
Now, let’s write the IncOperation. It implements the PartitionAwareOperation interface, meaning that it will be executed on the partition that hosts the counter. See the PartitionAwareOperation source code here.
The method run does the actual execution. Since IncOperation returns a response, the method returnsResponse returns true. If your operation is asynchronous and does not need to return a response, it is better to return false since it will be faster. The actual response is stored in the field returnValue; retrieve it with the method getResponse.
There are two more methods in this code: writeInternal and readInternal. Since IncOperation needs to be serialized, these two methods are overridden so that objectId and amount are serialized and available when the operation is executed.
For the deserialization, note that the operation must have a no-arg constructor.
class IncOperation extends Operation implements PartitionAwareOperation {
private String objectId;
private int amount;
private int returnValue;
public IncOperation() {
}
IncOperation(String objectId, int amount) {
this.amount = amount;
this.objectId = objectId;
}
@Override
public void run() throws Exception {
System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
returnValue = 0;
}
@Override
public Object getResponse() {
return returnValue;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeUTF(objectId);
out.writeInt(amount);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
objectId = in.readUTF();
amount = in.readInt();
}
}
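The symmetry between writeInternal and readInternal can be tried out without a cluster. The sketch below uses plain java.io streams as a stand-in for ObjectDataOutput and ObjectDataInput; the class IncPayload and its methods are hypothetical names introduced only for this illustration. It shows the two points made above: fields must be read in the same order they were written, and the receiving side first creates an empty instance through the no-arg constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the two serialized fields of IncOperation.
final class IncPayload {
    String objectId;
    int amount;

    // Needed on the receiving side: an empty instance is created first and
    // then populated from the stream.
    IncPayload() {
    }

    IncPayload(String objectId, int amount) {
        this.objectId = objectId;
        this.amount = amount;
    }

    // Writes objectId first, then amount.
    byte[] toBytes() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(objectId);
            out.writeInt(amount);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in exactly the order they were written.
    static IncPayload fromBytes(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            IncPayload payload = new IncPayload();
            payload.objectId = in.readUTF();
            payload.amount = in.readInt();
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the two read calls in fromBytes would corrupt the result or fail, which is why readInternal has to mirror writeInternal line by line.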
Running the Code
Now, let’s run our code.
HazelcastInstance[] instances = new HazelcastInstance[2];
for (int i = 0; i < instances.length; i++) {
instances[i] = Hazelcast.newHazelcastInstance();
}
Counter[] counters = new Counter[4];
for (int i = 0; i < counters.length; i++) {
counters[i] = instances[0].getDistributedObject(CounterService.NAME, i + "counter");
}
for (Counter counter : counters) {
System.out.println(counter.inc(1));
}
System.out.println("Finished");
Hazelcast.shutdownAll();
The output is similar to the following:
Executing 0counter.inc() on: Address[192.168.1.103]:5702
0
Executing 1counter.inc() on: Address[192.168.1.103]:5702
0
Executing 2counter.inc() on: Address[192.168.1.103]:5701
0
Executing 3counter.inc() on: Address[192.168.1.103]:5701
0
Finished
Note that counters are stored in different cluster members. Also note that increment is not active for now since the value remains as 0.
Until now, we have performed the basics to get this up and running. In the next section, we will make a real counter, cache the proxy instances and deal with proxy instance destruction.
Creating Containers
Let’s create a Container for every partition in the system. This container contains all counters and proxies.
class Container {
private final Map<String, Integer> values = new HashMap<String, Integer>();
int inc(String id, int amount) {
Integer counter = values.get(id);
if (counter == null) {
counter = 0;
}
counter += amount;
values.put(id, counter);
return counter;
}
public void init(String objectName) {
values.put(objectName,0);
}
public void destroy(String objectName) {
values.remove(objectName);
}
}
Hazelcast guarantees that a single thread is active in a single partition. Therefore, when accessing a container, concurrency control is not an issue.
The code in our example uses a Container instance per partition approach. With this approach, there is no mutable shared state between partitions. This approach also makes operations on partitions simpler since you do not need to filter out data that does not belong to a certain partition.
The code performs the following tasks:
- Creates a container for every partition with the method init.
- Creates the proxy with the method createDistributedObject.
- Removes the value of the object with the method destroyDistributedObject; otherwise, values would accumulate and we may eventually get an OutOfMemoryError.
Integrating the Container in the CounterService
Let’s integrate the Container in the CounterService, as shown below.
public class CounterService implements ManagedService, RemoteService {
public final static String NAME = "CounterService";
Container[] containers;
private NodeEngine nodeEngine;
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
this.nodeEngine = nodeEngine;
containers = new Container[nodeEngine.getPartitionService().getPartitionCount()];
for (int k = 0; k < containers.length; k++)
containers[k] = new Container();
}
@Override
public void shutdown(boolean terminate) {
}
@Override
public CounterProxy createDistributedObject(String objectName) {
int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
Container container = containers[partitionId];
container.init(objectName);
return new CounterProxy(objectName, nodeEngine, this);
}
@Override
public void destroyDistributedObject(String objectName) {
int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
Container container = containers[partitionId];
container.destroy(objectName);
}
@Override
public void reset() {
}
public static class Container {
final Map<String, Integer> values = new HashMap<String, Integer>();
private void init(String objectName) {
values.put(objectName, 0);
}
private void destroy(String objectName){
values.remove(objectName);
}
}
}
Connecting the IncOperation.run Method to the Container
As the last step in creating a Container, we connect the method IncOperation.run to the Container, as shown below.
partitionId ranges from 0 to partitionCount - 1 and can be used as an index into the container array. Therefore, you can use partitionId to retrieve the container and, once the container has been retrieved, access the value.
class IncOperation extends Operation implements PartitionAwareOperation {
private String objectId;
private int amount, returnValue;
public IncOperation() {
}
public IncOperation(String objectId, int amount) {
this.amount = amount;
this.objectId = objectId;
}
@Override
public void run() throws Exception {
System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
CounterService service = getService();
CounterService.Container container = service.containers[getPartitionId()];
Map<String, Integer> valuesMap = container.values;
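Integer counter = valuesMap.get(objectId);
if (counter == null) {
// guard against a counter that has no entry in this container yet
counter = 0;
}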
counter += amount;
valuesMap.put(objectId, counter);
returnValue = counter;
}
@Override
public Object getResponse() {
return returnValue;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeUTF(objectId);
out.writeInt(amount);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
objectId = in.readUTF();
amount = in.readInt();
}
}
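The container-per-partition idea can be sketched in plain Java, without a cluster. In the sketch below, the class PartitionedCounters and the method partitionIdOf are hypothetical names, and the mapping from a name to a partition ID is a deliberately simplified stand-in based on String.hashCode; Hazelcast computes partition IDs differently. What the sketch does show is that a partition ID always falls in the range 0 to partitionCount - 1 and can therefore index the container array directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, cluster-free model of one container per partition.
final class PartitionedCounters {
    private final Map<String, Integer>[] containers;

    @SuppressWarnings("unchecked")
    PartitionedCounters(int partitionCount) {
        containers = new Map[partitionCount];
        for (int k = 0; k < containers.length; k++) {
            containers[k] = new HashMap<String, Integer>();
        }
    }

    // Simplified stand-in for the partition lookup: the same name always
    // yields the same ID, and the ID is a valid index into the array.
    int partitionIdOf(String name) {
        return Math.floorMod(name.hashCode(), containers.length);
    }

    // Increments the named counter inside the container of its partition.
    int inc(String name, int amount) {
        Map<String, Integer> values = containers[partitionIdOf(name)];
        Integer current = values.get(name);
        int updated = (current == null ? 0 : current) + amount;
        values.put(name, updated);
        return updated;
    }
}
```

Because every counter lives in exactly one container, an operation that runs on a partition only ever touches that partition's map.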
Running the Example Code
Let’s run the following example code.
public class Member {
public static void main(String[] args) {
HazelcastInstance[] instances = new HazelcastInstance[2];
for (int k = 0; k < instances.length; k++)
instances[k] = Hazelcast.newHazelcastInstance();
Counter[] counters = new Counter[4];
for (int k = 0; k < counters.length; k++)
counters[k] = instances[0].getDistributedObject(CounterService.NAME, k+"counter");
System.out.println("Round 1");
for (Counter counter: counters)
System.out.println(counter.inc(1));
System.out.println("Round 2");
for (Counter counter: counters)
System.out.println(counter.inc(1));
System.out.println("Finished");
System.exit(0);
}
}
The output is as follows. It indicates that we now have a basic distributed counter up and running.
Round 1
Executing 0counter.inc() on: Address[192.168.1.103]:5702
1
Executing 1counter.inc() on: Address[192.168.1.103]:5702
1
Executing 2counter.inc() on: Address[192.168.1.103]:5701
1
Executing 3counter.inc() on: Address[192.168.1.103]:5701
1
Round 2
Executing 0counter.inc() on: Address[192.168.1.103]:5702
2
Executing 1counter.inc() on: Address[192.168.1.103]:5702
2
Executing 2counter.inc() on: Address[192.168.1.103]:5701
2
Executing 3counter.inc() on: Address[192.168.1.103]:5701
2
Finished
Partition Migration
In the previous section, we created a real distributed counter. Now, we need to make sure that the content of the partition containers is migrated to different cluster members when a member joins or leaves the cluster. To make this happen, first we need to add three new methods (applyMigrationData, toMigrationData and clear) to the Container as explained below:
- toMigrationData: This method is called when Hazelcast wants to start the partition migration from the member owning the partition. The result of the toMigrationData method is the partition data in a form that can be serialized to another member.
- applyMigrationData: This method is called when migrationData (created by the toMigrationData method) is applied to the member that will be the new partition owner.
- clear: This method is called when the partition migration is successfully completed and the old partition owner gets rid of all data in the partition. This method is also called when the partition migration operation fails and the would-be new partition owner needs to roll back its changes.
class Container {
private final Map<String, Integer> values = new HashMap<String, Integer>();
int inc(String id, int amount) {
Integer counter = values.get(id);
if (counter == null) {
counter = 0;
}
counter += amount;
values.put(id, counter);
return counter;
}
void clear() {
values.clear();
}
void applyMigrationData(Map<String, Integer> migrationData) {
values.putAll(migrationData);
}
Map<String, Integer> toMigrationData() {
return new HashMap<String, Integer>(values);
}
public void init(String objectName) {
values.put(objectName,0);
}
public void destroy(String objectName) {
values.remove(objectName);
}
}
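The interplay of the three methods can be followed in a small, cluster-free sketch. The class MigrationSketch, its nested Store class and the migrate method are hypothetical names introduced for this illustration; Store mirrors only the migration-related part of the Container. The sketch models the rule described above: the data is copied to the destination, and afterwards either the source is cleared (success) or the destination is cleared (failure).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, cluster-free model of a partition migration.
final class MigrationSketch {

    private MigrationSketch() {
    }

    // Mirrors the migration-related methods of the Container.
    static final class Store {
        final Map<String, Integer> values = new HashMap<String, Integer>();

        // Returns a copy, so later changes to this store do not leak into
        // the data that is already on its way to the new owner.
        Map<String, Integer> toMigrationData() {
            return new HashMap<String, Integer>(values);
        }

        void applyMigrationData(Map<String, Integer> migrationData) {
            values.putAll(migrationData);
        }

        void clear() {
            values.clear();
        }
    }

    // Copies the data from source to destination. On success the old owner
    // drops its data; on failure the would-be new owner rolls back.
    static void migrate(Store source, Store destination, boolean succeeds) {
        destination.applyMigrationData(source.toMigrationData());
        if (succeeds) {
            source.clear();
        } else {
            destination.clear();
        }
    }
}
```

In both outcomes exactly one of the two stores holds the counters afterwards, which is the property the migration has to preserve.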
Transferring migrationData
After you add these three methods to the Container, you need to create a CounterMigrationOperation class that transfers migrationData from one member to another and calls the method applyMigrationData on the correct partition of the new partition owner.
An example is shown below.
public class CounterMigrationOperation extends Operation {
Map<String, Integer> migrationData;
public CounterMigrationOperation() {
}
public CounterMigrationOperation(Map<String, Integer> migrationData) {
this.migrationData = migrationData;
}
@Override
public void run() throws Exception {
CounterService service = getService();
Container container = service.containers[getPartitionId()];
container.applyMigrationData(migrationData);
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
out.writeInt(migrationData.size());
for (Map.Entry<String, Integer> entry : migrationData.entrySet()) {
out.writeUTF(entry.getKey());
out.writeInt(entry.getValue());
}
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
int size = in.readInt();
migrationData = new HashMap<String, Integer>();
for (int i = 0; i < size; i++)
migrationData.put(in.readUTF(), in.readInt());
}
}
During a partition migration, no other operations are executed on the related partition.
Letting Hazelcast Know CounterService Can Do Partition Migrations
We need to make our CounterService class implement the MigrationAwareService interface, so that Hazelcast knows that the CounterService can perform partition migrations.
With the MigrationAwareService interface, some additional methods are exposed. For example, the method prepareReplicationOperation returns an operation carrying all the data of the partition that is going to be moved. You can check the MigrationAwareService source code here.
The method commitMigration commits the migration; in this case, it clears the partition container of the old owner.
public class CounterService implements ManagedService, RemoteService, MigrationAwareService {
public final static String NAME = "CounterService";
Container[] containers;
private NodeEngine nodeEngine;
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
this.nodeEngine = nodeEngine;
containers = new Container[nodeEngine.getPartitionService().getPartitionCount()];
for (int k = 0; k < containers.length; k++)
containers[k] = new Container();
}
@Override
public void shutdown(boolean terminate) {
}
@Override
public DistributedObject createDistributedObject(String objectName) {
int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
Container container = containers[partitionId];
container.init(objectName);
return new CounterProxy(objectName, nodeEngine,this);
}
@Override
public void destroyDistributedObject(String objectName) {
int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
Container container = containers[partitionId];
container.destroy(objectName);
}
@Override
public void beforeMigration(PartitionMigrationEvent e) {
//no-op
}
@Override
public void clearPartitionReplica(int partitionId) {
Container container = containers[partitionId];
container.clear();
}
@Override
public Operation prepareReplicationOperation(PartitionReplicationEvent e) {
if (e.getReplicaIndex() > 1) {
return null;
}
Container container = containers[e.getPartitionId()];
Map<String, Integer> data = container.toMigrationData();
return data.isEmpty() ? null : new CounterMigrationOperation(data);
}
@Override
public void commitMigration(PartitionMigrationEvent e) {
if (e.getMigrationEndpoint() == MigrationEndpoint.SOURCE) {
Container c = containers[e.getPartitionId()];
c.clear();
}
//todo
}
@Override
public void rollbackMigration(PartitionMigrationEvent e) {
if (e.getMigrationEndpoint() == MigrationEndpoint.DESTINATION) {
Container c = containers[e.getPartitionId()];
c.clear();
}
}
@Override
public void reset() {
}
}
Running the Example Code
We can run the following code.
public class Member {
public static void main(String[] args) throws Exception {
HazelcastInstance[] instances = new HazelcastInstance[3];
for (int k = 0; k < instances.length; k++)
instances[k] = Hazelcast.newHazelcastInstance();
Counter[] counters = new Counter[4];
for (int k = 0; k < counters.length; k++)
counters[k] = instances[0].getDistributedObject(CounterService.NAME, k + "counter");
for (Counter counter : counters)
System.out.println(counter.inc(1));
Thread.sleep(10000);
System.out.println("Creating new members");
for (int k = 0; k < 3; k++) {
Hazelcast.newHazelcastInstance();
}
Thread.sleep(10000);
for (Counter counter : counters)
System.out.println(counter.inc(1));
System.out.println("Finished");
System.exit(0);
}
}
And we get the following output.
Executing 0counter.inc() on: Address[192.168.1.103]:5702
Executing backup 0counter.inc() on: Address[192.168.1.103]:5703
1
Executing 1counter.inc() on: Address[192.168.1.103]:5703
Executing backup 1counter.inc() on: Address[192.168.1.103]:5701
1
Executing 2counter.inc() on: Address[192.168.1.103]:5701
Executing backup 2counter.inc() on: Address[192.168.1.103]:5703
1
Executing 3counter.inc() on: Address[192.168.1.103]:5701
Executing backup 3counter.inc() on: Address[192.168.1.103]:5703
1
Creating new members
Executing 0counter.inc() on: Address[192.168.1.103]:5705
Executing backup 0counter.inc() on: Address[192.168.1.103]:5703
2
Executing 1counter.inc() on: Address[192.168.1.103]:5703
Executing backup 1counter.inc() on: Address[192.168.1.103]:5704
2
Executing 2counter.inc() on: Address[192.168.1.103]:5705
Executing backup 2counter.inc() on: Address[192.168.1.103]:5704
2
Executing 3counter.inc() on: Address[192.168.1.103]:5704
Executing backup 3counter.inc() on: Address[192.168.1.103]:5705
2
Finished
You can see that the counters have moved. 0counter moved from 192.168.1.103:5702 to 192.168.1.103:5705 and it is incremented correctly. Our counters can now move around in the cluster: they will be redistributed once you add or remove a cluster member.
Creating Backups
Finally, we make sure that the counter data is available on another member when a member goes down. To do this, have the IncOperation class implement the BackupAwareOperation interface contained in the SPI package. See the following code.
class IncOperation extends Operation
implements PartitionAwareOperation, BackupAwareOperation {
...
@Override
public int getAsyncBackupCount() {
return 0;
}
@Override
public int getSyncBackupCount() {
return 1;
}
@Override
public boolean shouldBackup() {
return true;
}
@Override
public Operation getBackupOperation() {
return new IncBackupOperation(objectId, amount);
}
}
The methods getAsyncBackupCount and getSyncBackupCount specify the number of asynchronous and synchronous backups. Our example has one synchronous backup and no asynchronous backups. In the above code, the backup counts are hard-coded, but they can also be passed to IncOperation as parameters.
The method shouldBackup specifies whether our Operation needs a backup or not. For our example, it returns true, meaning the Operation always has a backup even if there are no changes. Of course, in real systems, we want to have backups only if there is a change. For IncOperation, for example, skipping the backup when amount is 0 would be a good practice.
The method getBackupOperation returns the operation (IncBackupOperation) that actually performs the backup creation; the backup itself is an operation and runs on the same infrastructure.
If a backup should be made and getSyncBackupCount returns 3, then three IncBackupOperation instances are created and sent to the three machines containing the backup partition. If fewer machines are available, then fewer backups are created: Hazelcast just sends a smaller number of operations.
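The two rules above, how many backup operations can actually be sent and when a backup is worth making, can be written down as a small sketch. The class BackupPlan and its methods are hypothetical names introduced for this illustration. The arithmetic is a simplified model of the behavior described in this section and is not taken from Hazelcast's internal code.

```java
// Hypothetical model of the backup rules described in this section.
final class BackupPlan {

    private BackupPlan() {
    }

    // One member holds the primary copy, so at most memberCount - 1 members
    // remain to receive a backup. Requests beyond that are reduced.
    static int effectiveBackups(int requestedBackups, int memberCount) {
        int availableMembers = Math.max(0, memberCount - 1);
        return Math.min(Math.max(0, requestedBackups), availableMembers);
    }

    // An increment by zero leaves the counter unchanged, so there is
    // nothing new to copy to the backup.
    static boolean shouldBackup(int amount) {
        return amount != 0;
    }
}
```

Under this model, a request for three backups in a two-member cluster results in a single backup operation, and a single-member cluster sends none.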
Performing the Backup with IncBackupOperation
Now, let’s have a look at the IncBackupOperation. It implements BackupOperation; you can see the source code for BackupOperation here.
public class IncBackupOperation
extends Operation implements BackupOperation {
private String objectId;
private int amount;
public IncBackupOperation() {
}
public IncBackupOperation(String objectId, int amount) {
this.amount = amount;
this.objectId = objectId;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeUTF(objectId);
out.writeInt(amount);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
objectId = in.readUTF();
amount = in.readInt();
}
@Override
public void run() throws Exception {
CounterService service = getService();
System.out.println("Executing backup " + objectId + ".inc() on: "
+ getNodeEngine().getThisAddress());
Container c = service.containers[getPartitionId()];
c.inc(objectId, amount);
}
}
Hazelcast also makes sure that a new IncOperation for that particular key will not be executed before the (synchronous) backup operation has completed.
Running the Example Code
Let’s see the backup functionality in action with the following code.
public class Member {
public static void main(String[] args) throws Exception {
HazelcastInstance[] instances = new HazelcastInstance[2];
for (int k = 0; k < instances.length; k++)
instances[k] = Hazelcast.newHazelcastInstance();
Counter counter = instances[0].getDistributedObject(CounterService.NAME, "counter");
counter.inc(1);
System.out.println("Finished");
System.exit(0);
}
}
The output is as follows:
Executing counter.inc() on: Address[192.168.1.103]:5702
Executing backup counter.inc() on: Address[192.168.1.103]:5701
Finished
As can be seen, both IncOperation and IncBackupOperation are executed. Notice that these operations have been executed on different cluster members to guarantee high availability.