User Defined Services

In the case of special/custom needs, you can use Hazelcast’s SPI (Service Provider Interface) module to develop your own distributed data structures and services on top of Hazelcast. Hazelcast SPI is an internal, low-level API which is expected to change in each release except for the patch releases. Your structures and services evolve as the SPI changes.

Currently, you can use the data structures or services you implement, by means of Hazelcast SPI, only on the Hazelcast members. We are in the process of improving the SPI for the clients, so that you will be able to implement SPI elements to be used by the Hazelcast Open Binary Client Protocol.

Throughout this section, we create an example distributed counter that will be the guide to reveal the Hazelcast SPI usage.

Here is our counter.

public interface Counter{
    int inc(int amount);
}

This counter will have the following features: * It is stored in Hazelcast. * Different cluster members can call it. * It is scalable, meaning that the capacity for the number of counters scales with the number of cluster members. * It is highly available, meaning that if a member hosting this counter goes down, a backup will be available on a different member.

All these features are done with the steps below. Each step adds a new functionality to this counter.

  1. Create the class.

  2. Enable the class.

  3. Add properties.

  4. Place a remote call.

  5. Create the containers.

  6. Enable partition migration.

  7. Create the backups.

Creating the Service Class

To have the counter as a functioning distributed object, we need a class. This class (named CounterService in the following example code) is the gateway between Hazelcast internals and the counter, allowing us to add features to the counter. The following example code creates the class CounterService. Its lifecycle is managed by Hazelcast.

CounterService should implement the interface com.hazelcast.spi.ManagedService as shown below. The com.hazelcast.spi.ManagedService source code is here.

CounterService implements the following methods:

  • init: This is called when CounterService is initialized. NodeEngine enables access to Hazelcast internals such as HazelcastInstance and PartitionService. Also, the Properties object provides us with the ability to create our own properties.

  • shutdown: This is called when CounterService is shutdown. It cleans up the resources.

  • reset: This is called when cluster members face the split-brain issue. This occurs when disconnected members that have created their own cluster are merged back into the main cluster. Services can also implement the SplitBrainHandleService to indicate that they can take part in the merge process. For CounterService we are going to implement reset as a no-op.

public class CounterService implements ManagedService {

    private NodeEngine nodeEngine;

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        System.out.println("CounterService.init()");
        this.nodeEngine = nodeEngine;
    }

    @Override
    public void shutdown(boolean terminate) {
        System.out.println("CounterService.shutdown()");
    }

    @Override
    public void reset() {
    }

    public NodeEngine getNodeEngine() {
        return nodeEngine;
    }
}

Enabling the Service Class

Now, we need to enable the class CounterService. The declarative way of doing this is shown below.

<hazelcast>
    ...
    <network>
        <join><multicast enabled="true"/> </join>
    </network>
    <services>
        <service enabled="true">
            <name>CounterService</name>
            <class-name>CounterService</class-name>
       </service>
    </services>
    ...
</hazelcast>

The CounterService is declared within the services configuration element.

  • Set the enabled attribute to true to enable the service.

  • Set the name attribute to the name of the service. It should be a unique name (CounterService in our case) since it is looked up when a remote call is made. Note that the value of this attribute is sent at each request and a longer name value means more data (de)serialization. A good practice is to give an understandable name with the shortest possible length.

  • Set the class-name attribute to the class name of the service (CounterService in our case). The class should have a no-arg constructor. Otherwise, the object cannot be initialized.

Note that multicast is enabled as the join mechanism. In the later sections for the CounterService example, we will see why.

Adding Properties to the Service

The init method for CounterService takes the Properties object as an argument. This means we can add properties to the service that are passed to the init method; see Creating the Service Class. You can add properties declaratively as shown below. (You likely want to name your properties something other than someproperty.)

<hazelcast>
    ...
    <service enabled="true">
        <name>CounterService</name>
        <class-name>CounterService</class-name>
        <properties>
            <someproperty>10</someproperty>
        </properties>
    </service>
    ...
</hazelcast>

If you want to parse a more complex XML, you can use the interface com.hazelcast.spi.ServiceConfigurationParser. It gives you access to the XML DOM tree.

Starting the Service

Now, let’s start a HazelcastInstance as shown below, which starts the CounterService.

public class Member {

    public static void main(String[] args) {
        Hazelcast.newHazelcastInstance();

        Hazelcast.shutdownAll();
    }
}

Once it starts, the CounterService init method prints the following output.

CounterService.init

Once the HazelcastInstance is shutdown (for example, with Ctrl+C), the CounterService shutdown method prints the following output.

CounterService.shutdown

Placing a Remote Call via Proxy

In the previous sections for the CounterService example, we started CounterService as part of a HazelcastInstance startup.

Now, let’s connect the Counter interface to CounterService and perform a remote call to the cluster member hosting the counter data. Then, we return a dummy result.

Remote calls are performed via a proxy in Hazelcast. Proxies expose the methods on the client side. Once a method is called, proxy creates an operation object, sends this object to the cluster member responsible from executing that operation and then sends the result.

Making Counter a Distributed Object

First, we need to make the Counter interface a distributed object by extending the DistributedObject interface, as shown below.

public interface Counter extends DistributedObject {
    int inc(int amount);
}

Implementing ManagedService and RemoteService

Now, we need to make the CounterService class implement not only the ManagedService interface, but also the interface com.hazelcast.spi.RemoteService. This way, a client can get a handle of a counter proxy. You can check the source code for RemoteService the here.

public class CounterService implements ManagedService, RemoteService {

    static final String NAME = "CounterService";

    Container[] containers;
    private NodeEngine nodeEngine;

    @Override
    public DistributedObject createDistributedObject(String objectName) {
        return new CounterProxy(objectName, nodeEngine, this);
    }

    @Override
    public void destroyDistributedObject(String objectName) {
        // for the time being a no-op
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.nodeEngine = nodeEngine;
    }

    @Override
    public void shutdown(boolean terminate) {
    }

    @Override
    public void reset() {
    }
}

The CounterProxy returned by the method createDistributedObject is a local representation to (potentially) remote managed data and logic.

Note that caching and removing the proxy instance are done outside of this service.

Implementing CounterProxy

Now, it is time to implement the CounterProxy as shown below. CounterProxy extends AbstractDistributedObject, source code is here.

public class CounterProxy extends AbstractDistributedObject<CounterService> implements Counter {

    private final String name;

    CounterProxy(String name, NodeEngine nodeEngine, CounterService counterService) {
        super(nodeEngine, counterService);
        this.name = name;
    }

    @Override
    public String getServiceName() {
        return CounterService.NAME;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public int inc(int amount) {
        NodeEngine nodeEngine = getNodeEngine();
        IncOperation operation = new IncOperation(name, amount);
        int partitionId = nodeEngine.getPartitionService().getPartitionId(name);
        InvocationBuilder builder = nodeEngine.getOperationService()
                .createInvocationBuilder(CounterService.NAME, operation, partitionId);
        try {
            Future<Integer> future = builder.invoke();
            return future.get();
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }
}

CounterProxy is a local representation of remote data/functionality. It does not include the counter state. Therefore, the method inc should be invoked on the cluster member hosting the real counter. You can invoke it using Hazelcast SPI; then it sends the operations to the correct member and return the results.

Let’s dig deeper into the method inc.

  • First, we create IncOperation with a given name and amount.

  • Then, we get the partition ID based on the name; by this way, all operations for a given name result in the same partition ID.

  • Then, we create an InvocationBuilder where the connection between operation and partition is made.

  • Finally, we invoke the InvocationBuilder and wait for its result. This waiting is performed with a future.get(). In our case, timeout is not important. However, it is a good practice to use a timeout for a real system since operations should complete in a certain amount of time.

Dealing with Exceptions

Hazelcast’s ExceptionUtil is a good solution when it comes to dealing with execution exceptions. When the execution of the operation fails with an exception, an ExecutionException is thrown and handled with the method ExceptionUtil.rethrow(Throwable).

If it is an InterruptedException, we have two options: either propagate the exception or just use the ExceptionUtil.rethrow for all exceptions. See the example code below.

  try {
     final Future<Integer> future = invocation.invoke();
     return future.get();
  } catch(InterruptedException e){
     throw e;
  } catch(Exception e){
     throw ExceptionUtil.rethrow(e);
  }

Implementing the PartitionAwareOperation Interface

Now, let’s write the IncOperation. It implements the PartitionAwareOperation interface, meaning that it will be executed on the partition that hosts the counter. See the PartitionAwareOperation source code here.

The method run does the actual execution. Since IncOperation returns a response, the method returnsResponse returns true. If your method is asynchronous and does not need to return a response, it is better to return false since it will be faster. The actual response is stored in the field returnValue; retrieve it with the method getResponse.

There are two more methods in this code: writeInternal and readInternal. Since IncOperation needs to be serialized, these two methods are overridden, and hence, objectId and amount are serialized and available when those operations are executed.

For the deserialization, note that the operation must have a no-arg constructor.

class IncOperation extends Operation implements PartitionAwareOperation {

    private String objectId;
    private int amount;
    private int returnValue;


    public IncOperation() {
    }

    IncOperation(String objectId, int amount) {
        this.amount = amount;
        this.objectId = objectId;
    }

    @Override
    public void run() throws Exception {
        System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
        returnValue = 0;
    }

    @Override
    public Object getResponse() {
        return returnValue;
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeUTF(objectId);
        out.writeInt(amount);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        objectId = in.readUTF();
        amount = in.readInt();
    }
}

Running the Code

Now, let’s run our code.

        HazelcastInstance[] instances = new HazelcastInstance[2];
        for (int i = 0; i < instances.length; i++) {
            instances[i] = Hazelcast.newHazelcastInstance();
        }

        Counter[] counters = new Counter[4];
        for (int i = 0; i < counters.length; i++) {
            counters[i] = instances[0].getDistributedObject(CounterService.NAME, i + "counter");
        }

        for (Counter counter : counters) {
            System.out.println(counter.inc(1));
        }

        System.out.println("Finished");
        Hazelcast.shutdownAll();

The output is similar to the following:

Executing 0counter.inc() on: Address[192.168.1.103]:5702

0

Executing 1counter.inc() on: Address[192.168.1.103]:5702

0

Executing 2counter.inc() on: Address[192.168.1.103]:5701

0

Executing 3counter.inc() on: Address[192.168.1.103]:5701

0

Finished

Note that counters are stored in different cluster members. Also note that increment is not active for now since the value remains as 0.

Until now, we have performed the basics to get this up and running. In the next section, we will make a real counter, cache the proxy instances and deal with proxy instance destruction.

Creating Containers

Let’s create a Container for every partition in the system. This container contains all counters and proxies.

class Container {
    private final Map<String, Integer> values = new HashMap();

    int inc(String id, int amount) {
        Integer counter = values.get(id);
        if (counter == null) {
            counter = 0;
        }
        counter += amount;
        values.put(id, counter);
        return counter;
    }

    public void init(String objectName) {
        values.put(objectName,0);
    }

    public void destroy(String objectName) {
        values.remove(objectName);
    }
}

Hazelcast guarantees that a single thread is active in a single partition. Therefore, when accessing a container, concurrency control is not an issue.

The code in our example uses a Container instance per partition approach. With this approach, there will not be any mutable shared state between partitions. This approach also makes operations on partitions simpler since you do not need to filter out data that does not belong to a certain partition.

The code performs the following tasks:

  • Creates a container for every partition with the method init.

  • Creates the proxy with the method createDistributedObject.

  • Removes the value of the object with the method destroyDistributedObject, otherwise we may get an OutOfMemory exception.

Integrating the Container in the CounterService

Let’s integrate the Container in the CounterService, as shown below.

public class CounterService implements ManagedService, RemoteService {
    public final static String NAME = "CounterService";
    Container[] containers;
    private NodeEngine nodeEngine;

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.nodeEngine = nodeEngine;
        containers = new Container[nodeEngine.getPartitionService().getPartitionCount()];
        for (int k = 0; k < containers.length; k++)
            containers[k] = new Container();
    }

    @Override
    public void shutdown(boolean terminate) {
    }

    @Override
    public CounterProxy createDistributedObject(String objectName) {
        int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
        Container container = containers[partitionId];
        container.init(objectName);
        return new CounterProxy(objectName, nodeEngine, this);
    }

    @Override
    public void destroyDistributedObject(String objectName) {
        int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
        Container container = containers[partitionId];
        container.destroy(objectName);
    }

    @Override
    public void reset() {
    }

    public static class Container {
        final Map<String, Integer> values = new HashMap<String, Integer>();

        private void init(String objectName) {
            values.put(objectName, 0);
        }

        private void destroy(String objectName){
            values.remove(objectName);
        }
    }
}

Connecting the IncOperation.run Method to the Container

As the last step in creating a Container, we connect the method IncOperation.run to the Container, as shown below.

partitionId has a range between 0 and partitionCount and can be used as an index for the container array. Therefore, you can use partitionId to retrieve the container and once the container has been retrieved, you can access the value.

class IncOperation extends Operation implements PartitionAwareOperation {
    private String objectId;
    private int amount, returnValue;

    public IncOperation() {
    }

    public IncOperation(String objectId, int amount) {
        this.amount = amount;
        this.objectId = objectId;
    }

    @Override
    public void run() throws Exception {
        System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
        CounterService service = getService();
        CounterService.Container container = service.containers[getPartitionId()];
        Map<String, Integer> valuesMap = container.values;

        Integer counter = valuesMap.get(objectId);
        counter += amount;
        valuesMap.put(objectId, counter);
        returnValue = counter;
    }

    @Override
    public Object getResponse() {
        return returnValue;
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeUTF(objectId);
        out.writeInt(amount);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        objectId = in.readUTF();
        amount = in.readInt();
    }
}

Running the Example Code

Let’s run the following example code.

public class Member {
    public static void main(String[] args) {
        HazelcastInstance[] instances = new HazelcastInstance[2];
        for (int k = 0; k < instances.length; k++)
            instances[k] = Hazelcast.newHazelcastInstance();

        Counter[] counters = new Counter[4];
        for (int k = 0; k < counters.length; k++)
            counters[k] = instances[0].getDistributedObject(CounterService.NAME, k+"counter");

        System.out.println("Round 1");
        for (Counter counter: counters)
            System.out.println(counter.inc(1));

        System.out.println("Round 2");
        for (Counter counter: counters)
            System.out.println(counter.inc(1));

        System.out.println("Finished");
        System.exit(0);
    }
}

The output is as follows. It indicates that we have now a basic distributed counter up and running.

Round 1
Executing 0counter.inc() on: Address[192.168.1.103]:5702
1
Executing 1counter.inc() on: Address[192.168.1.103]:5702
1
Executing 2counter.inc() on: Address[192.168.1.103]:5701
1
Executing 3counter.inc() on: Address[192.168.1.103]:5701
1
Round 2
Executing 0counter.inc() on: Address[192.168.1.103]:5702
2
Executing 1counter.inc() on: Address[192.168.1.103]:5702
2
Executing 2counter.inc() on: Address[192.168.1.103]:5701
2
Executing 3counter.inc() on: Address[192.168.1.103]:5701
2
Finished

Partition Migration

In the previous section, we created a real distributed counter. Now, we need to make sure that the content of the partition containers is migrated to different cluster members when a member joins or leaves the cluster. To make this happen, first we need to add three new methods (applyMigrationData, toMigrationData and clear) to the Container as explained below:

  • toMigrationData: This method is called when Hazelcast wants to start the partition migration from the member owning the partition. The result of the toMigrationData method is the partition data in a form that can be serialized to another member.

  • applyMigrationData: This method is called when migrationData (created by the toMigrationData object) is applied to the member that will be the new partition owner.

  • clear: This method is called when the partition migration is successfully completed and the old partition owner gets rid of all data in the partition. This method is also called when the partition migration operation fails and the to-be-the-new partition owner needs to roll back its changes.

class Container {
    private final Map<String, Integer> values = new HashMap();

    int inc(String id, int amount) {
        Integer counter = values.get(id);
        if (counter == null) {
            counter = 0;
        }
        counter += amount;
        values.put(id, counter);
        return counter;
    }

    void clear() {
        values.clear();
    }

    void applyMigrationData(Map<String, Integer> migrationData) {
        values.putAll(migrationData);
    }

    Map<String, Integer> toMigrationData() {
        return new HashMap(values);
    }

    public void init(String objectName) {
        values.put(objectName,0);
    }

    public void destroy(String objectName) {
        values.remove(objectName);
    }
}

Transferring migrationData

After you add these three methods to the Container, you need to create a CounterMigrationOperation class that transfers migrationData from one member to another and calls the method applyMigrationData on the correct partition of the new partition owner.

An example is shown below.

public class CounterMigrationOperation extends Operation {

    Map<String, Integer> migrationData;

    public CounterMigrationOperation() {
    }

    public CounterMigrationOperation(Map<String, Integer> migrationData) {
        this.migrationData = migrationData;
    }

    @Override
    public void run() throws Exception {
        CounterService service = getService();
        Container container = service.containers[getPartitionId()];
        container.applyMigrationData(migrationData);
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        out.writeInt(migrationData.size());
        for (Map.Entry<String, Integer> entry : migrationData.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeInt(entry.getValue());
        }
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        int size = in.readInt();
        migrationData = new HashMap<String, Integer>();
        for (int i = 0; i < size; i++)
            migrationData.put(in.readUTF(), in.readInt());
    }
}
During a partition migration, no other operations are executed on the related partition.

Letting Hazelcast Know CounterService Can Do Partition Migrations

We need to make our CounterService class implement the MigrationAwareService interface, so that Hazelcast knows that the CounterService can perform partition migrations.

With the MigrationAwareService interface, some additional methods are exposed. For example, the method prepareMigrationOperation returns all the data of the partition that is going to be moved. You can check the MigrationAwareService source code here.

The method commitMigration commits the data, meaning that in this case, it clears the partition container of the old owner.

public class CounterService implements ManagedService, RemoteService, MigrationAwareService {
    public final static String NAME = "CounterService";
    Container[] containers;
    private NodeEngine nodeEngine;

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.nodeEngine = nodeEngine;
        containers = new Container[nodeEngine.getPartitionService().getPartitionCount()];
        for (int k = 0; k < containers.length; k++)
            containers[k] = new Container();
    }

    @Override
    public void shutdown(boolean terminate) {
    }

    @Override
    public DistributedObject createDistributedObject(String objectName) {
        int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
        Container container = containers[partitionId];
        container.init(objectName);
        return new CounterProxy(objectName, nodeEngine,this);
    }

    @Override
    public void destroyDistributedObject(String objectName) {
        int partitionId = nodeEngine.getPartitionService().getPartitionId(objectName);
        Container container = containers[partitionId];
        container.destroy(objectName);
    }

    @Override
    public void beforeMigration(PartitionMigrationEvent e) {
        //no-op
    }

    @Override
    public void clearPartitionReplica(int partitionId) {
        Container container = containers[partitionId];
        container.clear();
    }

    @Override
    public Operation prepareReplicationOperation(PartitionReplicationEvent e) {
        if (e.getReplicaIndex() > 1) {
            return null;
        }
        Container container = containers[e.getPartitionId()];
        Map<String, Integer> data = container.toMigrationData();
        return data.isEmpty() ? null : new CounterMigrationOperation(data);
    }

    @Override
    public void commitMigration(PartitionMigrationEvent e) {
        if (e.getMigrationEndpoint() == MigrationEndpoint.SOURCE) {
            Container c = containers[e.getPartitionId()];
            c.clear();
        }

        //todo
    }

    @Override
    public void rollbackMigration(PartitionMigrationEvent e) {
        if (e.getMigrationEndpoint() == MigrationEndpoint.DESTINATION) {
            Container c = containers[e.getPartitionId()];
            c.clear();
        }
    }

    @Override
    public void reset() {
    }
}

Running the Example Code

We can run the following code.

public class Member {
    public static void main(String[] args) throws Exception {
        HazelcastInstance[] instances = new HazelcastInstance[3];
        for (int k = 0; k < instances.length; k++)
            instances[k] = Hazelcast.newHazelcastInstance();

        Counter[] counters = new Counter[4];
        for (int k = 0; k < counters.length; k++)
            counters[k] = instances[0].getDistributedObject(CounterService.NAME, k + "counter");

        for (Counter counter : counters)
            System.out.println(counter.inc(1));

        Thread.sleep(10000);

        System.out.println("Creating new members");

        for (int k = 0; k < 3; k++) {
            Hazelcast.newHazelcastInstance();
        }

        Thread.sleep(10000);

        for (Counter counter : counters)
            System.out.println(counter.inc(1));

        System.out.println("Finished");
        System.exit(0);
    }
}

And we get the following output.

Executing 0counter.inc() on: Address[192.168.1.103]:5702
Executing backup 0counter.inc() on: Address[192.168.1.103]:5703
1
Executing 1counter.inc() on: Address[192.168.1.103]:5703
Executing backup 1counter.inc() on: Address[192.168.1.103]:5701
1
Executing 2counter.inc() on: Address[192.168.1.103]:5701
Executing backup 2counter.inc() on: Address[192.168.1.103]:5703
1
Executing 3counter.inc() on: Address[192.168.1.103]:5701
Executing backup 3counter.inc() on: Address[192.168.1.103]:5703
1
Creating new members
Executing 0counter.inc() on: Address[192.168.1.103]:5705
Executing backup 0counter.inc() on: Address[192.168.1.103]:5703
2
Executing 1counter.inc() on: Address[192.168.1.103]:5703
Executing backup 1counter.inc() on: Address[192.168.1.103]:5704
2
Executing 2counter.inc() on: Address[192.168.1.103]:5705
Executing backup 2counter.inc() on: Address[192.168.1.103]:5704
2
Executing 3counter.inc() on: Address[192.168.1.103]:5704
Executing backup 3counter.inc() on: Address[192.168.1.103]:5705
2
Finished

You can see that the counters have moved. 0counter moved from 192.168.1.103:5702 to 192.168.1.103:5705 and it is incremented correctly. Our counters can now move around in the cluster: they will be redistributed once you add or remove a cluster member.

Creating Backups

Finally, we make sure that the counter data is available on another member when a member goes down. To do this, have the IncOperation class implement the BackupAwareOperation interface contained in the SPI package. See the following code.

class IncOperation extends Operation
	implements PartitionAwareOperation, BackupAwareOperation {
   ...

   @Override
   public int getAsyncBackupCount() {
      return 0;
   }

   @Override
   public int getSyncBackupCount() {
      return 1;
   }

   @Override
   public boolean shouldBackup() {
      return true;
   }

   @Override
   public Operation getBackupOperation() {
      return new IncBackupOperation(objectId, amount);
   }
}

The methods getAsyncBackupCount and getSyncBackupCount specify the count for asynchronous and synchronous backups. Our example has one synchronous backup and no asynchronous backups. In the above code, counts of the backups are hard-coded, but they can also be passed to IncOperation as parameters.

The method shouldBackup specifies whether our Operation needs a backup or not. For our example, it returns true, meaning the Operation always has a backup even if there are no changes. Of course, in real systems, we want to have backups if there is a change. For IncOperation for example, having a backup when amount is null would be a good practice.

The method getBackupOperation returns the operation (IncBackupOperation) that actually performs the backup creation; the backup itself is an operation and runs on the same infrastructure.

If a backup should be made and getSyncBackupCount returns 3, then three IncBackupOperation instances are created and sent to the three machines containing the backup partition. If fewer machines are available, then backups need to be created. Hazelcast will just send a smaller number of operations.

Performing the Backup with IncBackupOperation

Now, let’s have a look at the IncBackupOperation. It implements BackupOperation, you can see the source code for BackupOperation here.

public class IncBackupOperation
	extends Operation implements BackupOperation {
   private String objectId;
   private int amount;

   public IncBackupOperation() {
   }

   public IncBackupOperation(String objectId, int amount) {
      this.amount = amount;
      this.objectId = objectId;
   }

   @Override
   protected void writeInternal(ObjectDataOutput out) throws IOException {
      super.writeInternal(out);
      out.writeUTF(objectId);
      out.writeInt(amount);
   }

   @Override
   protected void readInternal(ObjectDataInput in) throws IOException {
      super.readInternal(in);
      objectId = in.readUTF();
      amount = in.readInt();
   }

   @Override
   public void run() throws Exception {
      CounterService service = getService();
      System.out.println("Executing backup " + objectId + ".inc() on: "
        + getNodeEngine().getThisAddress());
      Container c = service.containers[getPartitionId()];
      c.inc(objectId, amount);
   }
}
Hazelcast also makes sure that a new IncOperation for that particular key will not be executed before the (synchronous) backup operation has completed.

Running the Example Code

Let’s see the backup functionality in action with the following code.

public class Member {
   public static void main(String[] args) throws Exception {
      HazelcastInstance[] instances = new HazelcastInstance[2];
      for (int k = 0; k < instances.length; k++)
         instances[k] = Hazelcast.newHazelcastInstance();

      Counter counter = instances[0].getDistributedObject(CounterService.NAME, "counter");
      counter.inc(1);
      System.out.println("Finished");
      System.exit(0);
    }
}

The output is as follows:

Executing counter0.inc() on: Address[192.168.1.103]:5702
Executing backup counter0.inc() on: Address[192.168.1.103]:5701
Finished

As it can be seen, both IncOperation and IncBackupOperation are executed. Notice that these operations have been executed on different cluster members to guarantee high availability.