Ringbuffer
Hazelcast Ringbuffer is a replicated but not partitioned data structure that stores its data in a ring-like structure. You can think of it as a circular array with a given capacity. Each Ringbuffer has a tail and a head. The tail is where the items are added and the head is where the items are overwritten or expired. You can reach each element in a Ringbuffer using a sequence ID, which is mapped to the elements between the head and tail (inclusive) of the Ringbuffer.
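The relationship between head, tail and sequence can be sketched in plain Java. This is a simplified, single-threaded model written for illustration, not Hazelcast's implementation; `SequenceRing` and its methods are hypothetical names, and the model assumes that an empty buffer has a head of 0 and a tail of -1.

```java
import java.util.NoSuchElementException;

/** Illustrative model only: a fixed-capacity circular array addressed by sequence. */
final class SequenceRing<E> {
    private final Object[] slots;
    private long tail = -1; // sequence of the newest item; -1 while empty (assumption)

    SequenceRing(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        slots = new Object[capacity];
    }

    /** Appends at the tail, overwriting the oldest slot once full; returns the new sequence. */
    long add(E item) {
        tail++;
        slots[(int) (tail % slots.length)] = item;
        return tail;
    }

    long tailSequence() {
        return tail;
    }

    /** Oldest sequence still held; equals tail + 1 while the ring is empty. */
    long headSequence() {
        return Math.max(0, tail - slots.length + 1);
    }

    /** Reads without removing; fails if the sequence was overwritten or not yet written. */
    @SuppressWarnings("unchecked")
    E read(long sequence) {
        if (sequence < headSequence()) {
            throw new IllegalStateException("stale sequence " + sequence);
        }
        if (sequence > tail) {
            throw new NoSuchElementException("nothing stored at sequence " + sequence);
        }
        return (E) slots[(int) (sequence % slots.length)];
    }
}
```

With a capacity of 3, adding four items leaves sequences 1 to 3 readable, while sequence 0 has been overwritten and can no longer be read.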
Getting a Ringbuffer and Reading Items
Reading from Ringbuffer is simple: get the Ringbuffer with the HazelcastInstance getRingbuffer method, get its current head with the headSequence method and start reading. Use the method readOne to return the item at the given sequence; readOne blocks if no item is available. To read the next item, increment the sequence by one.
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb");
long sequence = ringbuffer.headSequence();
while (true) {
    String item = ringbuffer.readOne(sequence);
    sequence++;
    // process item
}
Because the sequence is exposed, you can read an item from the Ringbuffer for as long as that item is still available. If the item is no longer available, a StaleSequenceException is thrown.
Adding Items to a Ringbuffer
Adding an item to a Ringbuffer is also easy with the Ringbuffer add method:
Ringbuffer<String> ringbuffer = hz.getRingbuffer("ExampleRB");
ringbuffer.add("someitem");
The method add returns the sequence of the inserted item; the sequence value is always unique. You can use this as a very cheap way of generating unique IDs if you are already using Ringbuffer.
IQueue vs. Ringbuffer
Hazelcast Ringbuffer can sometimes be a better alternative to a Hazelcast IQueue. Unlike IQueue, Ringbuffer does not remove items when they are read; it only reads the item at a given position. This approach has several advantages:
- The same item can be read multiple times by the same thread. This is useful for realizing semantics of read-at-least-once or read-at-most-once.
- The same item can be read by multiple threads. Normally you could use an IQueue per thread for the same semantic, but this is less efficient because of the increased remoting. A take from an IQueue is destructive, so the change needs to be applied to the backup as well, which is why a queue.take() is more expensive than a ringBuffer.read(…).
- Reads are extremely cheap since they do not change the Ringbuffer. Therefore no replication is required.
- Reads and writes can be batched, which can dramatically improve the performance of Ringbuffer.
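The difference between destructive and positional reads can be illustrated with standard collections. In this sketch an `ArrayDeque` stands in for a queue and an immutable `List` for a buffer that is read by position; it involves no Hazelcast classes, and `DestructiveVsPositional` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class DestructiveVsPositional {
    /** Drains a queue: each poll removes the item, so a later consumer finds nothing. */
    static <E> List<E> drain(Queue<E> queue) {
        List<E> seen = new ArrayList<>();
        for (E e; (e = queue.poll()) != null; ) {
            seen.add(e);
        }
        return seen;
    }

    /** Reads by position from a start index; the backing log is left untouched. */
    static <E> List<E> readFrom(List<E> log, int start) {
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("a", "b", "c"));
        System.out.println(drain(queue)); // first consumer receives every item
        System.out.println(drain(queue)); // second consumer receives an empty list

        List<String> log = List.of("a", "b", "c");
        System.out.println(readFrom(log, 0)); // reader 1 starts at the head
        System.out.println(readFrom(log, 1)); // reader 2 reads from its own position
    }
}
```

Each positional reader keeps its own cursor, so no shared state changes when an item is read.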
Configuring Ringbuffer Capacity
By default, a Ringbuffer is configured with a capacity of 10000 items. This creates an array with a size of 10000. If a time-to-live is configured, then an array of longs is also created that stores the expiration time for every item. In many cases you may want to change this capacity to something that better fits your needs.
Below is a declarative configuration example of a Ringbuffer with a capacity of 2000 items.
<hazelcast>
    ...
    <ringbuffer name="rb">
        <capacity>2000</capacity>
    </ringbuffer>
    ...
</hazelcast>

hazelcast:
  ringbuffer:
    rb:
      capacity: 2000
Currently, Hazelcast Ringbuffer is not a partitioned data structure; its data is stored in a single partition and the replicas are stored in another partition. Therefore, create a Ringbuffer that can safely fit in a single cluster member.
Backing Up Ringbuffer
Hazelcast Ringbuffer has a single synchronous backup by default. You can control the Ringbuffer backup just like most of the other Hazelcast distributed data structures by setting the synchronous and asynchronous backups: backup-count and async-backup-count. In the example below, a Ringbuffer is configured with no synchronous backups and one asynchronous backup:
<hazelcast>
    ...
    <ringbuffer name="rb">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
    </ringbuffer>
    ...
</hazelcast>

hazelcast:
  ringbuffer:
    rb:
      backup-count: 0
      async-backup-count: 1
An asynchronous backup probably gives you better performance. However, there is a chance that the item added will be lost when the member owning the primary crashes before the backup could complete. You may want to consider batching methods if you need high performance but do not want to give up on consistency.
Configuring Ringbuffer Time-To-Live
You can configure Hazelcast Ringbuffer with a time-to-live in seconds. Using this setting, you can control how long the items remain in the Ringbuffer before they are expired. By default, the time-to-live is set to 0, meaning that unless the item is overwritten, it will remain in the Ringbuffer indefinitely. If you set a time-to-live and an item is added, then, depending on the Overflow Policy, either the oldest item is overwritten, or the call is rejected.
In the example below, a Ringbuffer is configured with a time-to-live of 180 seconds.
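A minimal declarative sketch of this setting, assuming a Ringbuffer named `rb` and the `time-to-live-seconds` element that also appears in the Ringbuffer store configuration later in this section:

```xml
<hazelcast>
    ...
    <ringbuffer name="rb">
        <time-to-live-seconds>180</time-to-live-seconds>
    </ringbuffer>
    ...
</hazelcast>
```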
Setting Ringbuffer Overflow Policy
Using the overflow policy, you can determine what to do if the oldest item in the Ringbuffer is not old enough to expire when more items than the configured Ringbuffer capacity are being added. The below options are currently available:
- OverflowPolicy.OVERWRITE: The oldest item is overwritten.
- OverflowPolicy.FAIL: The call is aborted. The methods that make use of the OverflowPolicy return -1 to indicate that adding the item has failed.
Overflow policy gives you fine control on what to do if the Ringbuffer is full. You can also use the overflow policy to apply a back pressure mechanism. The following example code shows the usage of an exponential backoff.
Random random = new Random();
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
Ringbuffer<Long> rb = hz.getRingbuffer("rb");

long i = 100;
while (true) {
    long sleepMs = 100;
    for (; ; ) {
        long result = rb.addAsync(i, OverflowPolicy.FAIL).toCompletableFuture().get();
        if (result != -1) {
            break;
        }
        // the Ringbuffer is full: wait, then double the delay up to 5 seconds
        TimeUnit.MILLISECONDS.sleep(sleepMs);
        sleepMs = Math.min(5000, sleepMs * 2);
    }

    // add a bit of random delay to make it look a bit more realistic
    Thread.sleep(random.nextInt(10));

    System.out.println("Written: " + i);
    i++;
}
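The retry loop above can be factored into a small reusable helper. The sketch below uses only the Java standard library; `BackoffRetry` and `retryUntilAccepted` are hypothetical names, and the `LongSupplier` stands in for a call such as `rb.addAsync(i, OverflowPolicy.FAIL)` followed by a blocking wait, which is assumed to return -1 when the item is rejected.

```java
import java.util.function.LongSupplier;

final class BackoffRetry {
    /**
     * Calls attempt until it returns something other than -1, sleeping between
     * tries and doubling the delay up to maxDelayMs. Returns the accepted result.
     */
    static long retryUntilAccepted(LongSupplier attempt, long initialDelayMs, long maxDelayMs) {
        long delayMs = initialDelayMs;
        for (;;) {
            long result = attempt.getAsLong();
            if (result != -1) {
                return result;
            }
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up instead of retrying forever
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while backing off", e);
            }
            delayMs = Math.min(maxDelayMs, delayMs * 2);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // hypothetical writer that rejects the first two attempts
        long sequence = retryUntilAccepted(() -> ++calls[0] <= 2 ? -1 : 42, 10, 5000);
        System.out.println("accepted with sequence " + sequence + " after " + calls[0] + " calls");
    }
}
```

Capping the delay keeps a long outage from pushing the wait between attempts beyond a known bound.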
Ringbuffer with Persistent Datastore
Hazelcast allows you to load and store the Ringbuffer items from/to a persistent datastore using the interface RingbufferStore. If a Ringbuffer store is enabled, each item added to the Ringbuffer will also be stored at the configured Ringbuffer store.
If the Ringbuffer store is configured, you can get items with sequences which are no longer in the actual Ringbuffer but are only in the Ringbuffer store. This is probably much slower but still allows you to continue consuming items from the Ringbuffer even if they are overwritten with newer items in the Ringbuffer.
When a Ringbuffer is being instantiated, it checks if the Ringbuffer store is configured and requests the latest sequence in the Ringbuffer store. This is to enable the Ringbuffer to start with sequences larger than the ones in the Ringbuffer store. In this case, the Ringbuffer is empty but you can still request older items from it (which will be loaded from the Ringbuffer store).
The Ringbuffer store stores items in the same format as the Ringbuffer. If the BINARY in-memory format is used, the Ringbuffer store must implement the interface RingbufferStore<byte[]>, meaning that the Ringbuffer store receives items in the binary format. If the OBJECT in-memory format is used, the Ringbuffer store must implement the interface RingbufferStore<K>, where K is the type of item being stored (meaning that the Ringbuffer store receives the deserialized object).
When adding items to the Ringbuffer, the method storeAll allows you to store items in batches.
The following example class includes all of the RingbufferStore methods.
public class TheRingbufferObjectStore implements RingbufferStore<Item> {

    @Override
    public void store(long sequence, Item data) {
        System.out.println("Object store");
    }

    @Override
    public void storeAll(long firstItemSequence, Item[] items) {
        System.out.println("Object store all");
    }

    @Override
    public Item load(long sequence) {
        System.out.println("Object load");
        return null;
    }

    @Override
    public long getLargestSequence() {
        System.out.println("Object get largest sequence");
        return -1;
    }
}
Item must be serializable. The following is an example of a Ringbuffer with the Ringbuffer store configured and enabled.
<hazelcast>
    ...
    <ringbuffer name="default">
        <capacity>10000</capacity>
        <time-to-live-seconds>30</time-to-live-seconds>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <in-memory-format>BINARY</in-memory-format>
        <ringbuffer-store>
            <class-name>com.hazelcast.RingbufferStoreImpl</class-name>
        </ringbuffer-store>
    </ringbuffer>
    ...
</hazelcast>

hazelcast:
  ringbuffer:
    default:
      capacity: 10000
      time-to-live-seconds: 30
      backup-count: 1
      async-backup-count: 0
      in-memory-format: BINARY
      ringbuffer-store:
        class-name: com.hazelcast.RingbufferStoreImpl
The following are the explanations for the Ringbuffer store configuration elements:
- class-name: Name of the class that implements the RingbufferStore interface.
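To make the four store methods more concrete, the following sketch keeps items in a sorted in-memory map keyed by sequence. It is written against the Java standard library only, so it deliberately does not declare `implements RingbufferStore<T>`; the class name `InMemorySequenceStore` is hypothetical, and returning -1 for an empty store follows the stub example above rather than a documented contract.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Illustrative in-memory stand-in with the same method shapes as the example store. */
final class InMemorySequenceStore<T> {
    private final ConcurrentSkipListMap<Long, T> bySequence = new ConcurrentSkipListMap<>();

    /** Stores a single item under its sequence. */
    public void store(long sequence, T data) {
        bySequence.put(sequence, data);
    }

    /** Stores a batch; item i is assumed to have sequence firstItemSequence + i. */
    public void storeAll(long firstItemSequence, T[] items) {
        for (int i = 0; i < items.length; i++) {
            bySequence.put(firstItemSequence + i, items[i]);
        }
    }

    /** Returns the item stored under the sequence, or null if there is none. */
    public T load(long sequence) {
        return bySequence.get(sequence);
    }

    /** Returns the highest stored sequence, or -1 when nothing has been stored. */
    public long getLargestSequence() {
        Map.Entry<Long, T> last = bySequence.lastEntry();
        return last == null ? -1 : last.getKey();
    }
}
```

A real store would write to a durable backend instead of a map; the map only shows how sequences relate to stored items.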
Configuring Ringbuffer In-Memory Format
You can configure Hazelcast Ringbuffer with an in-memory format that controls the format of the Ringbuffer’s stored items. By default, BINARY in-memory format is used, meaning that the object is stored in a serialized form. You can select the OBJECT in-memory format, which is useful when filtering is applied or when the OBJECT in-memory format has a smaller memory footprint than BINARY.
In the declarative configuration example below, a Ringbuffer is configured with the OBJECT in-memory format:
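A minimal declarative sketch, assuming a Ringbuffer named `rb` and the `in-memory-format` element shown in the Ringbuffer store configuration above:

```xml
<hazelcast>
    ...
    <ringbuffer name="rb">
        <in-memory-format>OBJECT</in-memory-format>
    </ringbuffer>
    ...
</hazelcast>
```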
Configuring Split-Brain Protection for Ringbuffer
Ringbuffer can be configured to check for a minimum number of available members before applying Ringbuffer operations. This check prevents Ringbuffer operations from succeeding on all parts of a cluster during a network partition. Configure it using the element split-brain-protection-ref. Set this element’s value to the name of the split-brain protection that you configured under the split-brain-protection element, as explained in the Split-Brain Protection section. The following is an example snippet:
<hazelcast>
    ...
    <ringbuffer name="rb">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </ringbuffer>
    ...
</hazelcast>

hazelcast:
  ringbuffer:
    rb:
      split-brain-protection-ref: splitbrainprotection-name
The following is a list of methods, grouped by the protection types, that support split-brain protection checks:
- WRITE, READ_WRITE:
  - add
  - addAllAsync
  - addAsync
- READ, READ_WRITE:
  - capacity
  - headSequence
  - readManyAsync
  - readOne
  - remainingCapacity
  - size
  - tailSequence
Adding Batched Items
In the previous examples, the method ringBuffer.add() is used to add an item to the Ringbuffer. This method has two drawbacks: it always overwrites, and it does not support batching. Batching can have a large impact on performance. You can use the method addAllAsync to add items in batches.
See the following example code.
List<String> items = Arrays.asList("1","2","3");
CompletionStage<Long> s = rb.addAllAsync(items, OverflowPolicy.OVERWRITE);
// block until all items are added
s.toCompletableFuture().join();
In the above case, three strings are added to the Ringbuffer using the policy OverflowPolicy.OVERWRITE. See the Overflow Policy section for more information.
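Large collections may need to be split before they are written in batches. The helper below is a plain-Java sketch of that splitting step; `Batches.partition` is a hypothetical name, and the batch size of 1000 in the usage note is an assumption borrowed from the `maxCount` limit stated for `readManyAsync`, so check which limit applies to `addAllAsync` before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

final class Batches {
    /** Splits items into consecutive sublists of at most batchSize elements, preserving order. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(items.size(), from + batchSize);
            // copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

Each sublist returned by `Batches.partition(items, 1000)` could then be passed in turn to `rb.addAllAsync(batch, OverflowPolicy.OVERWRITE)`.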
Reading Batched Items
In the previous example, the readOne method reads items from the Ringbuffer. readOne is simple but not very efficient for the following reasons:
- readOne does not use batching.
- readOne cannot filter items at the source; the items need to be retrieved before being filtered.
The method readManyAsync can read a batch of items and can filter items at the source.
See the following example code.
CompletionStage<ReadResultSet<E>> readManyAsync(
    long startSequence,
    int minCount,
    int maxCount,
    IFunction<E, Boolean> filter);
The meanings of the readManyAsync arguments are given below:
- startSequence: Sequence of the first item to read.
- minCount: Minimum number of items to read. If you do not want to block, set it to 0. If you want to block for at least one item, set it to 1.
- maxCount: Maximum number of the items to retrieve. Its value cannot exceed 1000.
- filter: A function that accepts an item and checks if it should be returned. If no filtering should be applied, set it to null.
A full example is given below.
long sequence = rb.headSequence();
for (;;) {
    CompletionStage<ReadResultSet<String>> f = rb.readManyAsync(sequence, 1, 10, null);
    CompletionStage<Integer> readCountStage = f.thenApplyAsync(rs -> {
        for (String s : rs) {
            System.out.println(s);
        }
        return rs.readCount();
    });
    sequence += readCountStage.toCompletableFuture().join();
}
Pay close attention to how the sequence is incremented. The example advances it by rs.readCount() rather than by the number of items returned, because the two can differ when items are filtered out.
No filtering is applied in the above example. The following example shows how you can apply a filter when reading batched items. First, let’s create our filter as shown below:
public class FruitFilter implements IFunction<String, Boolean> {

    public FruitFilter() {}

    public Boolean apply(String s) {
        return s.startsWith("a");
    }
}
So, the FruitFilter checks whether a String object starts with the letter "a". You can see this filter in action in the below example:
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
Ringbuffer<String> rb = hz.getRingbuffer("rb");
rb.add("apple");
rb.add("orange");
rb.add("pear");
rb.add("peach");
rb.add("avocado");
long sequence = rb.headSequence();
CompletableFuture<ReadResultSet<String>> f = rb.readManyAsync(sequence, 2, 5, new FruitFilter()).toCompletableFuture();
ReadResultSet<String> rs = f.join();
for (String s : rs) {
    System.out.println(s);
}
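The earlier advice about advancing the sequence matters once a filter is in place. The following plain-Java model, which is not Hazelcast code, separates the number of items examined from the number of items returned; `FilteredReadModel`, `Batch` and `readMany` are hypothetical names, and a `List` stands in for the Ringbuffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FilteredReadModel {
    /** Result of one modelled batch read. */
    static final class Batch<E> {
        final List<E> items;  // items that passed the filter
        final int readCount;  // items examined, whether or not they passed

        Batch(List<E> items, int readCount) {
            this.items = items;
            this.readCount = readCount;
        }
    }

    /** Examines up to maxCount items of log from position start, keeping those that match. */
    static <E> Batch<E> readMany(List<E> log, int start, int maxCount, Predicate<E> filter) {
        int from = Math.min(start, log.size());
        int to = Math.min(log.size(), from + maxCount);
        List<E> kept = new ArrayList<>();
        for (int i = from; i < to; i++) {
            E item = log.get(i);
            if (filter == null || filter.test(item)) {
                kept.add(item);
            }
        }
        return new Batch<>(kept, to - from);
    }

    public static void main(String[] args) {
        List<String> log = List.of("apple", "orange", "pear", "peach", "avocado");
        int position = 0;
        while (position < log.size()) {
            Batch<String> batch = readMany(log, position, 3, s -> s.startsWith("a"));
            System.out.println(batch.items + " returned, " + batch.readCount + " examined");
            position += batch.readCount; // advancing by batch.items.size() would re-read items
        }
    }
}
```

In this model the first batch returns one item but examines three, so advancing by the returned count would leave the reader two positions behind.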
Using Async Methods
Hazelcast Ringbuffer provides asynchronous methods for more powerful operations like batched writing or batched reading with filtering. To wait for the result of the operation in a blocking way, obtain a CompletableFuture from the returned CompletionStage by invoking the CompletionStage#toCompletableFuture() method, then use either CompletableFuture#get() or CompletableFuture#join().
See the following example code.
CompletionStage<Long> f = ringbuffer.addAsync(item, OverflowPolicy.FAIL);
f.toCompletableFuture().get();
However, you can also use the CompletionStage API to add dependent computation stages that are executed when the operation has completed. This way the calling thread is not blocked while it waits for the response.
The code below shows how to get notified when a batch of reads has completed.
CompletionStage<ReadResultSet<String>> stage = rb.readManyAsync(sequence, min, max, someFilter);
stage.whenCompleteAsync((response, throwable) -> {
    if (throwable == null) {
        for (String s : response) {
            System.out.println("Received:" + s);
        }
    } else {
        throwable.printStackTrace();
    }
});
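The two styles can be compared side by side using only `java.util.concurrent`. In this sketch a plain `CompletableFuture` stands in for the stage returned by a Ringbuffer async method; `StageHandling`, `await` and `report` are hypothetical helper names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;

final class StageHandling {
    /** Blocking style: waits on the calling thread until the stage completes. */
    static <T> T await(CompletionStage<T> stage) {
        return stage.toCompletableFuture().join();
    }

    /** Callback style: returns at once; the callback records the outcome on completion. */
    static <T> CompletionStage<T> report(CompletionStage<T> stage, AtomicReference<String> sink) {
        return stage.whenComplete((value, failure) ->
                sink.set(failure == null ? "ok: " + value : "failed: " + failure.getMessage()));
    }

    public static void main(String[] args) {
        // stand-in for a stage such as the one returned by an async add
        CompletableFuture<Long> pending = new CompletableFuture<>();
        AtomicReference<String> outcome = new AtomicReference<>("pending");

        report(pending, outcome);          // registers the callback without blocking
        System.out.println(outcome.get()); // nothing has completed yet

        pending.complete(17L);             // the operation finishes later
        System.out.println(outcome.get());
        System.out.println(await(pending));
    }
}
```

The callback style suits threads that must not stall, while the blocking style is simpler when the caller needs the result before it can continue.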
Ringbuffer Configuration Examples
The following shows the declarative configuration of a Ringbuffer called rb. The configuration is modeled after the Ringbuffer defaults.
<hazelcast>
    ...
    <ringbuffer name="rb">
        <capacity>10000</capacity>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <time-to-live-seconds>0</time-to-live-seconds>
        <in-memory-format>BINARY</in-memory-format>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </ringbuffer>
    ...
</hazelcast>

hazelcast:
  ringbuffer:
    rb:
      capacity: 10000
      backup-count: 1
      async-backup-count: 0
      time-to-live-seconds: 0
      in-memory-format: BINARY
      split-brain-protection-ref: splitbrainprotection-name
You can also configure a Ringbuffer programmatically. The following is a programmatic version of the above declarative configuration.
Config config = new Config();
// use the same Ringbuffer name and split-brain protection name as the declarative example
RingbufferConfig rbConfig = config.getRingbufferConfig("rb");
rbConfig.setCapacity(10000)
        .setBackupCount(1)
        .setAsyncBackupCount(0)
        .setTimeToLiveSeconds(0)
        .setInMemoryFormat(InMemoryFormat.BINARY)
        .setSplitBrainProtectionName("splitbrainprotection-name");