Reliable Topic
Reliable Topic uses the same ITopic interface as a regular topic. The main difference is that Reliable Topic is backed by the Ringbuffer data structure. This approach has the following advantages:
- Events are not lost since the Ringbuffer is configured with one synchronous backup by default.
- Each Reliable ITopic gets its own Ringbuffer; if a topic has a very fast producer, it does not cause problems for topics that run at a slower pace.
- Since the event system behind a regular ITopic is shared with other data structures, e.g., collection listeners, you can run into isolation problems. This does not happen with the Reliable ITopic.
Here is an example for a publisher using Reliable Topic:
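import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
// ITopic is in com.hazelcast.topic as of Hazelcast 4.0; in 3.x it is in com.hazelcast.core.
import com.hazelcast.topic.ITopic;

import java.util.Random;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class PublisherMember {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Random random = new Random();
        ITopic<Long> topic = hz.getReliableTopic("sometopic");
        long messageId = 0;
        // Publish an increasing ID forever, pausing 0-99 ms between messages.
        while (true) {
            topic.publish(messageId);
            // Log before incrementing so the printed ID is the one just published.
            System.out.println("Written: " + messageId);
            messageId++;
            sleepMillis(random.nextInt(100));
        }
    }

    // Sleeps for the given time; returns false if the thread was interrupted.
    public static boolean sleepMillis(int millis) {
        try {
            MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }
}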
And the following is an example for the subscriber:
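import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
// As of Hazelcast 4.0 these are in com.hazelcast.topic; in 3.x they are in com.hazelcast.core.
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;

public class SubscribedMember {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Long> topic = hz.getReliableTopic("sometopic");
        topic.addMessageListener(new MessageListenerImpl());
    }

    // Prints every message received on the topic.
    private static class MessageListenerImpl implements MessageListener<Long> {
        @Override
        public void onMessage(Message<Long> m) {
            System.out.println("Received: " + m.getMessageObject());
        }
    }
}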
When you create a Reliable Topic, Hazelcast automatically creates a Ringbuffer for it. You can configure this Ringbuffer by adding a Ringbuffer config with the same name as the Reliable Topic. For instance, if you have a Reliable Topic named "sometopic", add a Ringbuffer config named "sometopic" to configure the backing Ringbuffer. You can configure, among other things, the capacity and the time-to-live of the topic messages; you can even add a Ringbuffer store to make the topic persistent. By default, a Ringbuffer has no TTL (time-to-live) and a limited capacity; you may want to change that configuration. The following is an example configuration for the "sometopic" given above.
<hazelcast>
    ...
    <!-- This is the ringbuffer that is used by the 'sometopic' Reliable-topic. As you can see the
         ringbuffer has the same name as the topic. -->
    <ringbuffer name="sometopic">
        <capacity>1000</capacity>
        <time-to-live-seconds>5</time-to-live-seconds>
    </ringbuffer>
    <reliable-topic name="sometopic">
        <topic-overload-policy>BLOCK</topic-overload-policy>
    </reliable-topic>
    ...
</hazelcast>
hazelcast:
  ringbuffer:
    sometopic:
      capacity: 1000
      time-to-live-seconds: 5
  reliable-topic:
    sometopic:
      topic-overload-policy: BLOCK
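The same settings can also be applied programmatically. The following is a minimal sketch, assuming the `Config`, `RingbufferConfig` and `ReliableTopicConfig` classes of the Hazelcast Java API and a Hazelcast JAR on the classpath; the class name `SomeTopicConfigSketch` is hypothetical, and the setter names should be checked against the Javadoc of your Hazelcast version.

```java
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.TopicOverloadPolicy;

// Hypothetical class name; mirrors the declarative "sometopic" example.
public class SomeTopicConfigSketch {

    public static void main(String[] args) {
        Config config = new Config();

        // The Ringbuffer config must use the same name as the Reliable Topic.
        config.getRingbufferConfig("sometopic")
              .setCapacity(1000)
              .setTimeToLiveSeconds(5);

        config.getReliableTopicConfig("sometopic")
              .setTopicOverloadPolicy(TopicOverloadPolicy.BLOCK);

        // Start a member with this configuration.
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}
```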
See the Configuring Reliable Topic section below for the descriptions of all Reliable Topic configuration elements.
By default, the Reliable ITopic uses a shared thread pool. If you need better isolation, you can configure a custom executor on the ReliableTopicConfig.
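As an illustration, a dedicated executor could be set up as in the following minimal sketch. It assumes the `setExecutor` method of `ReliableTopicConfig` and a Hazelcast JAR on the classpath; the class name `DedicatedExecutorSketch` and the pool size of two threads are hypothetical choices, not recommendations.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.ReliableTopicConfig;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class name; shows only the configuration step.
public class DedicatedExecutorSketch {

    public static void main(String[] args) {
        // Assumed sizing: two threads reserved for this topic's listeners.
        ExecutorService topicExecutor = Executors.newFixedThreadPool(2);

        Config config = new Config();
        ReliableTopicConfig rtConfig = config.getReliableTopicConfig("sometopic");
        // Use the dedicated executor instead of the shared thread pool.
        rtConfig.setExecutor(topicExecutor);
    }
}
```

Since the application creates this executor, the application should also shut it down when it is no longer needed.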
Because the reads on a Ringbuffer are not destructive, batching is easy to apply. The Reliable ITopic uses read batching and reads ten items at a time (if available) by default. See Reading Batched Items for more information.
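The idea of non-destructive, batched reads can be illustrated with a small stand-alone model. This is a sketch only and not Hazelcast's implementation: the class `BatchReadSketch` and its methods are hypothetical, and an unbounded list stands in for the Ringbuffer. Each reader keeps its own position, and reading never removes items.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of non-destructive, batched reads; not Hazelcast code. */
public class BatchReadSketch {

    /** Append-only log standing in for the Ringbuffer (no capacity limit here). */
    private final List<String> items = new ArrayList<>();

    public void add(String item) {
        items.add(item);
    }

    /**
     * Returns up to maxBatch items starting at position 'from'.
     * Nothing is removed, so other readers can still read the same items.
     */
    public List<String> readBatch(int from, int maxBatch) {
        int to = Math.min(items.size(), from + maxBatch);
        if (from >= to) {
            return new ArrayList<>();
        }
        return new ArrayList<>(items.subList(from, to));
    }

    public static void main(String[] args) {
        BatchReadSketch log = new BatchReadSketch();
        for (int i = 0; i < 25; i++) {
            log.add("msg-" + i);
        }
        int cursor = 0; // each subscriber tracks its own read position
        List<String> batch;
        while (!(batch = log.readBatch(cursor, 10)).isEmpty()) {
            System.out.println("Read " + batch.size() + " items starting at " + cursor);
            cursor += batch.size();
        }
    }
}
```

With 25 items and a batch size of 10, the loop reads batches of 10, 10 and 5 items. A second reader starting at position 0 would see the same items again.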
Slow Consumers
The Reliable ITopic provides control and a way to deal with slow consumers. It is unwise to keep events for a slow consumer in memory indefinitely since you do not know when the slow consumer is going to catch up. You can control the size of the Ringbuffer by using its capacity. For the cases when a Ringbuffer runs out of its capacity, you can specify the following policies for the TopicOverloadPolicy configuration:
- DISCARD_OLDEST: Overwrite the oldest item, even if a TTL is set. In this case the fast producer supersedes a slow consumer.
- DISCARD_NEWEST: Discard the newest item.
- BLOCK: Wait until the items are expired in the Ringbuffer.
- ERROR: Immediately throw TopicOverloadException if there is no space in the Ringbuffer.
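To make the policies above concrete, the following minimal, self-contained sketch models three of them on a small in-memory buffer. It is an illustration only, not Hazelcast's implementation: the class `OverloadPolicySketch` and its `publish` and `contents` methods are hypothetical, `IllegalStateException` stands in for `TopicOverloadException`, every stored item is assumed to be unexpired, and BLOCK is left out because it depends on items expiring over time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of overload handling on a bounded buffer; not Hazelcast code. */
public class OverloadPolicySketch {

    /** BLOCK is omitted: it waits for items to expire, which this model does not track. */
    public enum Policy { DISCARD_OLDEST, DISCARD_NEWEST, ERROR }

    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final int capacity; // assumed to be greater than zero
    private final Policy policy;

    public OverloadPolicySketch(int capacity, Policy policy) {
        this.capacity = capacity;
        this.policy = policy;
    }

    /** Tries to store an item; returns false if the item was dropped. */
    public boolean publish(int item) {
        if (buffer.size() < capacity) {
            buffer.addLast(item);
            return true;
        }
        switch (policy) {
            case DISCARD_OLDEST:
                buffer.removeFirst(); // overwrite the oldest item
                buffer.addLast(item);
                return true;
            case DISCARD_NEWEST:
                return false;         // drop the item being published
            default:
                // ERROR: this exception stands in for TopicOverloadException.
                throw new IllegalStateException("no space left in the buffer");
        }
    }

    /** Returns a snapshot of the buffered items, oldest first. */
    public List<Integer> contents() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            OverloadPolicySketch topic = new OverloadPolicySketch(3, policy);
            try {
                for (int i = 1; i <= 5; i++) {
                    topic.publish(i);
                }
            } catch (IllegalStateException e) {
                System.out.println(policy + " threw: " + e.getMessage());
            }
            System.out.println(policy + " -> " + topic.contents());
        }
    }
}
```

With a capacity of 3 and five published items, the model keeps 3, 4, 5 under DISCARD_OLDEST and 1, 2, 3 under DISCARD_NEWEST, while ERROR throws on the fourth publish.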
Configuring Reliable Topic
The following are example Reliable Topic configurations.
Declarative Configuration:
<hazelcast>
    ...
    <reliable-topic name="default">
        <statistics-enabled>true</statistics-enabled>
        <message-listeners>
            <message-listener>
                ...
            </message-listener>
        </message-listeners>
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>BLOCK</topic-overload-policy>
    </reliable-topic>
    ...
</hazelcast>
hazelcast:
  reliable-topic:
    default:
      statistics-enabled: true
      message-listeners:
        - ...
      read-batch-size: 10
      topic-overload-policy: BLOCK
Programmatic Configuration:
Config config = new Config();
ReliableTopicConfig rtConfig = config.getReliableTopicConfig( "default" );
rtConfig.setTopicOverloadPolicy( TopicOverloadPolicy.BLOCK )
        .setReadBatchSize( 10 )
        .setStatisticsEnabled( true );
Reliable Topic configuration has the following elements:
- statistics-enabled: Specifies whether the statistics gathering is enabled for your Reliable Topic. If set to false, you cannot collect statistics in your implementation and Hazelcast Management Center does not show them. Its default value is true.
- message-listener: Message listener class that listens to the messages when they are added or removed.
- read-batch-size: Maximum number of messages that Reliable Topic tries to read in one batch. Its default value is 10.
- topic-overload-policy: Policy to handle an overloaded topic. Available values are DISCARD_OLDEST, DISCARD_NEWEST, BLOCK and ERROR. The default value is BLOCK. See Slow Consumers for definitions of these policies.