Reliable Topic

Reliable Topic uses the same ITopic interface as a regular topic. The main difference is that Reliable Topic is backed by the Ringbuffer data structure. This approach has the following advantages:

  • Events are not lost since the Ringbuffer is configured with one synchronous backup by default.

  • Each Reliable ITopic gets its own Ringbuffer, so a topic with a very fast producer does not cause problems for topics that run at a slower pace.

  • Since the event system behind a regular ITopic is shared with other data structures, e.g., collection listeners, you can run into isolation problems. This does not happen with the Reliable ITopic.

Here is an example for a publisher using Reliable Topic:

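// Package names are for Hazelcast 4.x and later; in 3.x, ITopic lives in com.hazelcast.core.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;

import java.util.Random;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class PublisherMember {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Random random = new Random();
        ITopic<Long> topic = hz.getReliableTopic("sometopic");
        long messageId = 0;

        while (true) {
            topic.publish(messageId);
            // Log the ID that was just published, then move on to the next one.
            System.out.println("Written: " + messageId);
            messageId++;
            sleepMillis(random.nextInt(100));
        }
    }

    // Sleeps for the given time; returns false if the thread was interrupted.
    public static boolean sleepMillis(int millis) {
        try {
            MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }
}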

And the following is an example for the subscriber:

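// Package names are for Hazelcast 4.x and later; in 3.x, ITopic, Message
// and MessageListener live in com.hazelcast.core.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;

public class SubscribedMember {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Long> topic = hz.getReliableTopic("sometopic");
        topic.addMessageListener(new MessageListenerImpl());
    }

    // Prints every message received from the topic.
    private static class MessageListenerImpl implements MessageListener<Long> {
        @Override
        public void onMessage(Message<Long> m) {
            System.out.println("Received: " + m.getMessageObject());
        }
    }
}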

When you create a Reliable Topic, Hazelcast automatically creates a Ringbuffer for it. To configure this Ringbuffer, add a Ringbuffer config with the same name as the Reliable Topic. For instance, for a Reliable Topic named "sometopic", add a Ringbuffer config named "sometopic". You can configure, among other things, the capacity and the time-to-live for the topic messages, and you can add a Ringbuffer store to make the topic persistent. By default, a Ringbuffer has no TTL (time-to-live) and a limited capacity; you may want to change these settings. The following is an example configuration for the "sometopic" given above.

XML:

<hazelcast>
    ...
    <!-- This is the ringbuffer that is used by the 'sometopic' Reliable-topic. As you can see the
         ringbuffer has the same name as the topic. -->
    <ringbuffer name="sometopic">
        <capacity>1000</capacity>
        <time-to-live-seconds>5</time-to-live-seconds>
    </ringbuffer>
    <reliable-topic name="sometopic">
        <topic-overload-policy>BLOCK</topic-overload-policy>
    </reliable-topic>
    ...
</hazelcast>

YAML:

hazelcast:
  ringbuffer:
    sometopic:
      capacity: 1000
      time-to-live-seconds: 5
  reliable-topic:
    sometopic:
      topic-overload-policy: BLOCK
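
The same pairing of a Ringbuffer config and a Reliable Topic config under one name can also be set up programmatically. The following is a minimal sketch of that idea, not a definitive setup: the class name SomeTopicConfig and the build() method are made up for illustration, and the values mirror the declarative example above.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.ReliableTopicConfig;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.topic.TopicOverloadPolicy;

// Hypothetical helper class; only the Hazelcast config types are real API.
public class SomeTopicConfig {

    public static Config build() {
        Config config = new Config();

        // The Ringbuffer config must carry the same name as the Reliable Topic.
        RingbufferConfig ringbufferConfig = new RingbufferConfig("sometopic")
                .setCapacity(1000)
                .setTimeToLiveSeconds(5);
        config.addRingbufferConfig(ringbufferConfig);

        // The Reliable Topic config holds the topic-level settings.
        ReliableTopicConfig topicConfig = new ReliableTopicConfig("sometopic")
                .setTopicOverloadPolicy(TopicOverloadPolicy.BLOCK);
        config.addReliableTopicConfig(topicConfig);

        return config;
    }
}
```

A member started with Hazelcast.newHazelcastInstance(SomeTopicConfig.build()) would then use these settings for "sometopic".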

See the Configuring Reliable Topic section below for the descriptions of all Reliable Topic configuration elements.

By default, the Reliable ITopic uses a shared thread pool. If you need better isolation, you can configure a custom executor on the ReliableTopicConfig.
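
As a rough illustration of the custom executor option, the sketch below passes a dedicated thread pool to the topic's config through ReliableTopicConfig.setExecutor. The class name DedicatedExecutorConfig and the pool size of 2 are assumptions chosen for the example, not recommendations.

```java
import com.hazelcast.config.Config;
import com.hazelcast.config.ReliableTopicConfig;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class; the pool size is an arbitrary example value.
public class DedicatedExecutorConfig {

    public static Config build() {
        // A pool used only by this topic's listeners, separate from the shared pool.
        ExecutorService topicExecutor = Executors.newFixedThreadPool(2);

        Config config = new Config();
        ReliableTopicConfig topicConfig = config.getReliableTopicConfig("sometopic");
        topicConfig.setExecutor(topicExecutor);
        return config;
    }
}
```

Because the application creates the executor in this sketch, the application would also be responsible for shutting it down.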

Because the reads on a Ringbuffer are not destructive, batching is easy to apply. Reliable Topic uses read batching and by default reads up to ten items at a time (if available). See Reading Batched Items for more information.
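
To make the idea of non-destructive batched reads concrete, here is a small self-contained model. It does not use Hazelcast: ToyLog is a hypothetical stand-in for a Ringbuffer, and readMany is an illustrative method, not the Ringbuffer API. Each reader tracks its own sequence and asks for up to a batch size of items from that position.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchReadSketch {

    /** Hypothetical append-only log standing in for a Ringbuffer; not Hazelcast code. */
    static final class ToyLog<E> {
        private final List<E> items = new ArrayList<>();

        void add(E item) {
            items.add(item);
        }

        /** Returns up to maxCount items starting at sequence, without removing them. */
        List<E> readMany(int sequence, int maxCount) {
            int end = Math.min(items.size(), sequence + maxCount);
            if (sequence >= end) {
                return new ArrayList<>();
            }
            return new ArrayList<>(items.subList(sequence, end));
        }
    }

    /** Reads the whole log in batches and returns the size of each batch. */
    static List<Integer> batchSizes(ToyLog<Long> log, int readBatchSize) {
        List<Integer> sizes = new ArrayList<>();
        int sequence = 0; // each reader keeps its own position
        while (true) {
            List<Long> batch = log.readMany(sequence, readBatchSize);
            if (batch.isEmpty()) {
                break;
            }
            sizes.add(batch.size());
            sequence += batch.size();
        }
        return sizes;
    }

    public static void main(String[] args) {
        ToyLog<Long> log = new ToyLog<>();
        for (long i = 0; i < 25; i++) {
            log.add(i);
        }
        System.out.println(batchSizes(log, 10)); // prints [10, 10, 5]
        System.out.println(batchSizes(log, 10)); // prints [10, 10, 5] again: nothing was consumed
    }
}
```

Since reading never removes items, a second reader starting at sequence 0 sees the same 25 items, which is what allows several listeners to consume one topic independently.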

Slow Consumers

The Reliable ITopic lets you control how slow consumers are handled. It is unwise to keep events for a slow consumer in memory indefinitely since you do not know when the slow consumer is going to catch up. You can control the size of the Ringbuffer by using its capacity. For the cases when a Ringbuffer runs out of capacity, you can specify one of the following policies for the TopicOverloadPolicy configuration:

  • DISCARD_OLDEST: Overwrite the oldest item, even if a TTL is set. In this case the fast producer supersedes a slow consumer.

  • DISCARD_NEWEST: Discard the newest item.

  • BLOCK: Wait until items in the Ringbuffer expire and free up space.

  • ERROR: Immediately throw TopicOverloadException if there is no space in the Ringbuffer.
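
The four policies can be compared with a simplified, self-contained model. This is only a sketch under stated assumptions, not how Hazelcast implements them: ToyTopic is a hypothetical bounded buffer with a TTL and a logical clock, BLOCK is simulated by advancing that clock to the moment the oldest item expires instead of really waiting, and ERROR is modelled with IllegalStateException rather than TopicOverloadException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OverloadPolicySketch {

    enum Policy { DISCARD_OLDEST, DISCARD_NEWEST, BLOCK, ERROR }

    /** Hypothetical bounded buffer with a TTL and a logical clock; not Hazelcast code. */
    static final class ToyTopic {
        private final int capacity;
        private final long ttl;
        private final Policy policy;
        private final Deque<long[]> slots = new ArrayDeque<>(); // each entry: {message, expiresAt}
        long now = 0; // logical time; only the BLOCK branch advances it in this sketch

        ToyTopic(int capacity, long ttl, Policy policy) {
            this.capacity = capacity;
            this.ttl = ttl;
            this.policy = policy;
        }

        private void dropExpired() {
            while (!slots.isEmpty() && slots.peekFirst()[1] <= now) {
                slots.pollFirst();
            }
        }

        /** Returns true if the message was stored, false if it was discarded. */
        boolean publish(long message) {
            dropExpired();
            if (slots.size() >= capacity) {
                switch (policy) {
                    case DISCARD_OLDEST:
                        slots.pollFirst(); // overwrite the oldest item even though its TTL has not passed
                        break;
                    case DISCARD_NEWEST:
                        return false; // drop the incoming message
                    case BLOCK:
                        now = slots.peekFirst()[1]; // "wait" until the oldest item expires
                        dropExpired();
                        break;
                    default: // ERROR
                        throw new IllegalStateException("no space left for message " + message);
                }
            }
            slots.addLast(new long[] {message, now + ttl});
            return true;
        }

        List<Long> contents() {
            List<Long> result = new ArrayList<>();
            for (long[] slot : slots) {
                result.add(slot[0]);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Policy[] policies = {Policy.DISCARD_OLDEST, Policy.DISCARD_NEWEST, Policy.BLOCK};
        for (Policy policy : policies) {
            ToyTopic topic = new ToyTopic(2, 5, policy); // capacity 2, TTL of 5 time units
            for (long message = 1; message <= 3; message++) {
                topic.publish(message);
            }
            System.out.println(policy + " -> " + topic.contents());
        }
    }
}
```

Publishing messages 1, 2 and 3 into a buffer of capacity 2 leaves [2, 3] with DISCARD_OLDEST and [1, 2] with DISCARD_NEWEST. With BLOCK, the model leaves [3], because both earlier items expire by the time the publisher is allowed to continue. With ERROR, the third publish throws.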

Configuring Reliable Topic

The following are example Reliable Topic configurations.

Declarative Configuration:

XML:

<hazelcast>
    ...
    <reliable-topic name="default">
        <statistics-enabled>true</statistics-enabled>
        <message-listeners>
            <message-listener>
                ...
            </message-listener>
        </message-listeners>
        <read-batch-size>10</read-batch-size>
        <topic-overload-policy>BLOCK</topic-overload-policy>
    </reliable-topic>
    ...
</hazelcast>

YAML:

hazelcast:
  reliable-topic:
    default:
      statistics-enabled: true
      message-listeners:
        - ...
      read-batch-size: 10
      topic-overload-policy: BLOCK

Programmatic Configuration:

Config config = new Config();
ReliableTopicConfig rtConfig = config.getReliableTopicConfig( "default" );
rtConfig.setTopicOverloadPolicy( TopicOverloadPolicy.BLOCK )
    .setReadBatchSize( 10 )
    .setStatisticsEnabled( true );

Reliable Topic configuration has the following elements:

  • statistics-enabled: Specifies whether the statistics gathering is enabled for your Reliable Topic. If set to false, you cannot collect statistics in your implementation and also Hazelcast Management Center will not show them. Its default value is true.

  • message-listener: Message listener class that receives the messages published to the topic.

  • read-batch-size: Maximum number of messages that Reliable Topic tries to read in one batch. Its default value is 10.

  • topic-overload-policy: Policy to handle an overloaded topic. Available values are DISCARD_OLDEST, DISCARD_NEWEST, BLOCK and ERROR. The default value is BLOCK. See Slow Consumers for definitions of these policies.