This is a prerelease version.

View latest

Ringbuffer

Hazelcast Ringbuffer is a replicated but not partitioned data structure that stores its data in a ring-like structure. You can think of it as a circular array with a given capacity. Each Ringbuffer has a tail and a head. The tail is where the items are added and the head is where the items are overwritten or expired. You can reach each element in a Ringbuffer using a sequence ID, which is mapped to the elements between the head and tail (inclusive) of the Ringbuffer.

Getting a Ringbuffer and Reading Items

Reading from Ringbuffer is simple: get the Ringbuffer with the HazelcastInstance getRingbuffer method, get its current head with the headSequence method and start reading. Use the method readOne to return the item at the given sequence; readOne blocks if no item is available. To read the next item, increment the sequence by one.

        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb");
        long sequence = ringbuffer.headSequence();
        while(true){
            String item = ringbuffer.readOne(sequence);
            sequence++;
            // process item
        }

By exposing the sequence, you can now move the item from the Ringbuffer as long as the item is still available. If the item is not available any longer, StaleSequenceException is thrown.

Adding Items to a Ringbuffer

Adding an item to a Ringbuffer is also easy with the Ringbuffer add method:

Ringbuffer<String> ringbuffer = hz.getRingbuffer("ExampleRB");
ringbuffer.add("someitem");

Use the method add to return the sequence of the inserted item; the sequence value is always unique. You can use this as a very cheap way of generating unique IDs if you are already using Ringbuffer.

IQueue vs. Ringbuffer

Hazelcast Ringbuffer can sometimes be a better alternative than a Hazelcast IQueue. Unlike IQueue, Ringbuffer does not remove the items, it only reads items using a certain position. There are many advantages to this approach as described below:

  • The same item can be read multiple times by the same thread. This is useful for realizing semantics of read-at-least-once or read-at-most-once.

  • The same item can be read by multiple threads. Normally you could use an IQueue per thread for the same semantic, but this is less efficient because of the increased remoting. A take from an IQueue is destructive, so the change needs to be applied for backup also, which is why a queue.take() is more expensive than a ringBuffer.read(…​).

  • Reads are extremely cheap since there is no change in the Ringbuffer. Therefore no replication is required.

  • Reads and writes can be batched to speed up performance. Batching can dramatically improve the performance of Ringbuffer.

Configuring Ringbuffer Capacity

By default, a Ringbuffer is configured with a capacity of 10000 items. This creates an array with a size of 10000. If a time-to-live is configured, then an array of longs is also created that stores the expiration time for every item. In a lot of cases you may want to change this capacity number to something that better fits your needs.

Below is a declarative configuration example of a Ringbuffer with a capacity of 2000 items.

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <capacity>2000</capacity>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      capacity: 2000

Currently, Hazelcast Ringbuffer is not a partitioned data structure; its data is stored in a single partition and the replicas are stored in another partition. Therefore, create a Ringbuffer that can safely fit in a single cluster member.

Backing Up Ringbuffer

Hazelcast Ringbuffer has a single synchronous backup by default. You can control the Ringbuffer backup just like most of the other Hazelcast distributed data structures by setting the synchronous and asynchronous backups: backup-count and async-backup-count. In the example below, a Ringbuffer is configured with no synchronous backups and one asynchronous backup:

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      backup-count: 0
      async-backup-count: 1

An asynchronous backup probably gives you better performance. However, there is a chance that the item added will be lost when the member owning the primary crashes before the backup could complete. You may want to consider batching methods if you need high performance but do not want to give up on consistency.

Configuring Ringbuffer Time-To-Live

You can configure Hazelcast Ringbuffer with a time-to-live in seconds. Using this setting, you can control how long the items remain in the Ringbuffer before they are expired. By default, the time-to-live is set to 0, meaning that unless the item is overwritten, it will remain in the Ringbuffer indefinitely. If you set a time-to-live and an item is added, then, depending on the Overflow Policy, either the oldest item is overwritten, or the call is rejected.

In the example below, a Ringbuffer is configured with a time-to-live of 180 seconds.

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <time-to-live-seconds>180</time-to-live-seconds>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      time-to-live-seconds: 180

Setting Ringbuffer Overflow Policy

Using the overflow policy, you can determine what to do if the oldest item in the Ringbuffer is not old enough to expire when more items than the configured Ringbuffer capacity are being added. The below options are currently available:

  • OverflowPolicy.OVERWRITE: The oldest item is overwritten.

  • OverflowPolicy.FAIL: The call is aborted. The methods that make use of the OverflowPolicy return -1 to indicate that adding the item has failed.

Overflow policy gives you fine control on what to do if the Ringbuffer is full. You can also use the overflow policy to apply a back pressure mechanism. The following example code shows the usage of an exponential backoff.

        Random random = new Random();
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Ringbuffer<Long> rb = hz.getRingbuffer("rb");

        long i = 100;
        while (true) {
            long sleepMs = 100;
            for (; ; ) {
                long result = rb.addAsync(i, OverflowPolicy.FAIL).toCompletableFuture().get();
                if (result != -1) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(sleepMs);
                sleepMs = min(5000, sleepMs * 2);
            }

            // add a bit of random delay to make it look a bit more realistic
            Thread.sleep(random.nextInt(10));

            System.out.println("Written: " + i);
            i++;
        }

Ringbuffer with Persistent Datastore

Hazelcast allows you to load and store the Ringbuffer items from/to a persistent datastore using the interface RingbufferStore. If a Ringbuffer store is enabled, each item added to the Ringbuffer will also be stored at the configured Ringbuffer store.

If the Ringbuffer store is configured, you can get items with sequences which are no longer in the actual Ringbuffer but are only in the Ringbuffer store. This is probably much slower but still allows you to continue consuming items from the Ringbuffer even if they are overwritten with newer items in the Ringbuffer.

When a Ringbuffer is being instantiated, it checks if the Ringbuffer store is configured and requests the latest sequence in the Ringbuffer store. This is to enable the Ringbuffer to start with sequences larger than the ones in the Ringbuffer store. In this case, the Ringbuffer is empty but you can still request older items from it (which will be loaded from the Ringbuffer store).

The Ringbuffer store stores items in the same format as the Ringbuffer. If the BINARY in-memory format is used, the Ringbuffer store must implement the interface RingbufferStore<byte[]> meaning that the Ringbuffer receives items in the binary format. If the OBJECT in-memory format is used, the Ringbuffer store must implement the interface RingbufferStore<K>, where K is the type of item being stored (meaning that the Ringbuffer store receives the deserialized object).

When adding items to the Ringbuffer, the method storeAll allows you to store items in batches.

The following example class includes all the RingbufferStore methods.

public class TheRingbufferObjectStore implements RingbufferStore<Item> {

    @Override
    public void store(long sequence, Item data) {
        System.out.println("Object store");
    }

    @Override
    public void storeAll(long firstItemSequence, Item[] items) {
        System.out.println("Object store all");
    }

    @Override
    public Item load(long sequence) {
        System.out.println("Object load");
        return null;
    }

    @Override
    public long getLargestSequence() {
        System.out.println("Object get largest sequence");
        return -1;
    }
}

Item must be serializable. The following is an example of a Ringbuffer with the Ringbuffer store configured and enabled.

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="default">
        <capacity>10000</capacity>
        <time-to-live-seconds>30</time-to-live-seconds>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <in-memory-format>BINARY</in-memory-format>
        <ringbuffer-store>
            <class-name>com.hazelcast.RingbufferStoreImpl</class-name>
        </ringbuffer-store>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    default:
      capacity: 10000
      time-to-live-seconds: 30
      backup-count: 1
      async-backup-count: 0
      in-memory-format: BINARY
      ringbuffer-store:
        class-name: com.hazelcast.RingbufferStoreImpl

The following are the configuration elements and their descriptions:

  • class-name: Name of the Ringbuffer store factory class.

Configuring Ringbuffer In-Memory Format

You can configure Hazelcast Ringbuffer with an in-memory format that controls the format of the Ringbuffer’s stored items. By default, BINARY in-memory format is used, meaning that the object is stored in a serialized form. You can select the OBJECT in-memory format, which is useful when filtering is applied or when the OBJECT in-memory format has a smaller memory footprint than BINARY.

In the declarative configuration example below, a Ringbuffer is configured with the OBJECT in-memory format:

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <in-memory-format>OBJECT</in-memory-format>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      in-memory-format: OBJECT

Configuring Split-Brain Protection for Ringbuffer

Ringbuffer can be configured to check for a minimum number of available members before applying Ringbuffer operations. This is a check to avoid performing successful Ringbuffer operations on all parts of a cluster during a network partition and can be configured using the element split-brain-protection-ref. You should set this element’s value as the quorum’s name, which you configured under the split-brain-protection element as explained in the Split-Brain Protection section. Following is an example snippet:

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      split-brain-protection-ref: splitbrainprotection-name

The following is a list of methods, grouped by the protection types, that support split-brain protection checks:

  • WRITE, READ_WRITE:

    • add

    • addAllAsync

    • addAsync

  • READ, READ_WRITE:

    • capacity

    • headSequence

    • readManyAsync

    • readOne

    • remainingCapacity

    • size

    • tailSequence

Adding Batched Items

In the previous examples, the method ringBuffer.add() is used to add an item to the Ringbuffer. The problems with this method are that it always overwrites and that it does not support batching. Batching can have a huge impact on the performance. You can use the method addAllAsync to support batching.

See the following example code.

List<String> items = Arrays.asList("1","2","3");
CompletionStage<Long> s = rb.addAllAsync(items, OverflowPolicy.OVERWRITE);
// block until all items are added
s.toCompletableFuture().join();

In the above case, three strings are added to the Ringbuffer using the policy OverflowPolicy.OVERWRITE. See the Overflow Policy section for more information.

Reading Batched Items

In the previous example, the readOne method read items from the Ringbuffer. readOne is simple but not very efficient for the following reasons:

  • readOne does not use batching.

  • readOne cannot filter items at the source; the items need to be retrieved before being filtered.

The method readManyAsync can read a batch of items and can filter items at the source.

See the following example code.

CompletionStage<ReadResultSet<E>> readManyAsync(
    long startSequence,
    int minCount,
    int maxCount,
    IFunction<E, Boolean> filter);

The meanings of the readManyAsync arguments are given below:

  • startSequence: Sequence of the first item to read.

  • minCount: Minimum number of items to read. If you do not want to block, set it to 0. If you want to block for at least one item, set it to 1.

  • maxCount: Maximum number of the items to retrieve. Its value cannot exceed 1000.

  • filter: A function that accepts an item and checks if it should be returned. If no filtering should be applied, set it to null.

A full example is given below.

long sequence = rb.headSequence();
for(;;) {
    CompletionStage<ReadResultSet<String>> f = rb.readManyAsync(sequence, 1, 10, null);
    CompletionStage<Integer> readCountStage = f.thenApplyAsync(rs -> {
        for (String s : rs) {
            System.out.println(s);
        }
        return rs.readCount();
    });
    sequence += readCountStage.toCompletableFuture().join();
}

Please take a careful look at how your sequence is being incremented. You cannot always rely on the number of items being returned if the items are filtered out.

There is not any filtering applied in the above example. The following example shows how you can apply a filter when reading batched items. First, let’s create our filter as shown below:

public class FruitFilter implements IFunction<String, Boolean> {
    public FruitFilter() {}

    public Boolean apply(String s) {
        return s.startsWith("a");
    }
}

So, the FruitFilter checks whether a String object starts with the letter "a". You can see this filter in action in the below example:

HazelcastInstance hz = Hazelcast.newHazelcastInstance();
Ringbuffer<String> rb = hz.getRingbuffer("rb");

rb.add("apple");
rb.add("orange");
rb.add("pear");
rb.add("peach");
rb.add("avocado");

long sequence = rb.headSequence();
CompletableFuture<ReadResultSet<String>> f = rb.readManyAsync(sequence, 2, 5, new FruitFilter()).toCompletableFuture();

ReadResultSet<String> rs = f.join();
for (String s : rs) {
    System.out.println(s);
}

Using Async Methods

Hazelcast Ringbuffer provides asynchronous methods for more powerful operations like batched writing or batched reading with filtering. To wait for the result of the operation in a blocking way, obtain a CompletableFuture from the returned CompletionStage by invoking CompletionStage#toCompletableFuture() method, then use either CompletableFuture#get() or CompletableFuture#join().

See the following example code.

CompletionStage<Long> f = ringbuffer.addAsync(item, OverflowPolicy.FAIL);
f.toCompletableFuture().get();

However, you can also use CompletionStage API to add subsequent dependent computation stages which will be executed when the operation has completed. This way the thread used for the call is not blocked until the response is returned.

See the below code as an example of when you want to get notified when a batch of reads has completed.

CompletionStage<ReadResultSet<String>> stage = rb.readManyAsync(sequence, min, max, someFilter);
stage.whenCompleteAsync((response, throwable) -> {
    if (throwable == null) {
         for (String s : response) {
             System.out.println("Received:" + s);
         }
    } else {
        throwable.printStackTrace();
    }
});

Ringbuffer Configuration Examples

The following shows the declarative configuration of a Ringbuffer called rb. The configuration is modeled after the Ringbuffer defaults.

  • XML

  • YAML

<hazelcast>
    ...
    <ringbuffer name="rb">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </ringbuffer>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    rb:
      capacity: 10000
      backup-count: 1
      async-backup-count: 0
      time-to-live-seconds: 0
      in-memory-format: BINARY
      split-brain-protection-ref: splitbrainprotection-name

You can also configure a Ringbuffer programmatically. The following is a programmatic version of the above declarative configuration.

        Config config = new Config();
        RingbufferConfig rbConfig = config.getRingbufferConfig("myRB");
        rbConfig.setCapacity(10000)
                .setBackupCount(1)
                .setAsyncBackupCount(0)
                .setTimeToLiveSeconds(0)
                .setInMemoryFormat(InMemoryFormat.BINARY)
                .setSplitBrainProtectionName("splitbrainprotectionname");