A newer version of IMDG is available.

View latest

Want to try Hazelcast Platform?

We’ve combined the in-memory storage of IMDG with the stream processing power of Jet to bring you the all new Hazelcast Platform.

Topic

Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subscribers. This is also known as a publish/subscribe (pub/sub) messaging model. Publishing and subscribing operations are cluster wide. When a member subscribes to a topic, it is actually registering for messages published by any member in the cluster, including the new members that joined after you add the listener.

Publish operation is async. It does not wait for operations to run in remote members; it works as fire and forget.

Getting a Topic and Publishing Messages

Use the HazelcastInstance’s getTopic method to get the topic, then use the topic’s publish method to publish your messages. The following is an example publisher:

public class TopicPublisher {

    public static void main(String[] args) {

        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Date> topic = hz.getTopic("topic");
        topic.publish(new Date());
    }
}

And here is an example subscriber:

public class TopicSubscriber {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Date> topic = hz.getTopic("topic");
        topic.addMessageListener(new MessageListenerImpl());
        System.out.println("Subscribed");
    }

    private static class MessageListenerImpl implements MessageListener<Date> {
        public void onMessage(Message<Date> m) {
            System.out.println("Received: " + m.getMessageObject());
        }
    }
}

Hazelcast Topic uses the MessageListener interface to listen for events that occur when a message is received. See the Listening for Topic Messages section for information on how to create a message listener class and register it.

Getting Topic Statistics

Topic has two statistic variables that you can query. These values are incremental and local to the member.

        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        ITopic<Object> myTopic = hazelcastInstance.getTopic( "myTopicName" );

        myTopic.getLocalTopicStats().getPublishOperationCount();
        myTopic.getLocalTopicStats().getReceiveOperationCount();

getPublishOperationCount and getReceiveOperationCount returns the total number of published and received messages since the start of this member, respectively. Note that these values are not backed up, so if the member goes down, these values will be lost.

You can disable this feature with topic configuration. See the Configuring Topic section.

These statistics values can be also viewed in Management Center. See the Monitoring Topics section in Hazelcast Management Center documentation.

Understanding Topic Behavior

Each cluster member has a list of all registrations in the cluster. When a new member is registered for a topic, it sends a registration message to all members in the cluster. Also, when a new member joins the cluster, it receives all registrations made so far in the cluster.

The behavior of a topic varies depending on the value of the configuration parameter globalOrderEnabled.

Ordering Messages as Published

If globalOrderEnabled is disabled, messages are not ordered and listeners (subscribers) process the messages in the order that the messages are published. If cluster member M publishes messages m1, m2, m3, …​, mn to a topic T, then Hazelcast makes sure that all of the subscribers of topic T receive and process m1, m2, m3, …​, mn in the given order.

Here is how it works: Let’s say that we have three members (member1, member2 and member3) and that member1 and member2 are registered to a topic named news. Note that all three members know that member1 and member2 are registered to news.

In this example, member1 publishes two messages: a1 and a2. Member3 publishes two messages: c1 and c2. When member1 and member3 publish a message, they check their local list for registered members, discover that member1 and member2 are in their lists, and then they fire messages to those members. One possible order of the messages received could be the following.

member1c1, a1, a2, c2

member2c1, c2, a1, a2

Ordering Messages for Members

If globalOrderEnabled is enabled, all members listening to the same topic get its messages in the same order.

Here is how it works. Let’s say that we have three members (member1, member2 and member3) and that member1 and member2 are registered to a topic named news. Note that all three members know that member1 and member2 are registered to news.

In this example, member1 publishes two messages: a1 and a2. Member3 publishes two messages: c1 and c2. When a member publishes messages over the topic news, it first calculates which partition the news ID corresponds to. Then it sends an operation to the owner of the partition for that member to publish messages. Let’s assume that news corresponds to a partition that member2 owns. member1 and member3 first sends all messages to member2. Assume that the messages are published in the following order:

member1a1, c1, a2, c2

member2 then publishes these messages by looking at registrations in its local list. It sends these messages to member1 and member2 (it makes a local dispatch for itself).

member1a1, c1, a2, c2

member2a1, c1, a2, c2

This way we guarantee that all members see the events in the same order.

Keeping Generated and Published Order the Same

In both cases, there is a StripedExecutor in EventService that is responsible for dispatching the received message. For all events in Hazelcast, the order that events are generated and the order they are published to the user are guaranteed to be the same via this StripedExecutor.

In StripedExecutor, there are as many threads as are specified in the property hazelcast.event.thread.count (default is five). For a specific event source (for a particular topic name), hash of that source’s name % 5 gives the ID of the responsible thread. Note that there can be another event source (entry listener of a map, item listener of a collection, etc.) corresponding to the same thread. In order not to make other messages to block, heavy processing should not be done in this thread. If there is time-consuming work that needs to be done, the work should be handed over to another thread. See the Getting a Topic and Publishing Messages section.

Configuring Topic

To configure a topic, set the topic name, decide on statistics and global ordering, and set the message listeners. The following are the default values:

  • global-ordering is false, meaning that by default, there is no guarantee of global order.

  • statistics is true, meaning that by default, statistics are calculated.

You can see the example configuration snippets below.

Declarative Configuration:

  • XML

  • YAML

<hazelcast>
    ...
    <topic name="yourTopicName">
        <global-ordering-enabled>true</global-ordering-enabled>
        <statistics-enabled>true</statistics-enabled>
        <message-listeners>
            <message-listener>MessageListenerImpl</message-listener>
        </message-listeners>
    </topic>
    ...
</hazelcast>
hazelcast:
  topic:
    yourTopicName:
      global-ordering-enabled: true
      statistics-enabled: true
      message-listeners:
        - MessageListenerImpl

Programmatic Configuration:

        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setGlobalOrderingEnabled( true );
        topicConfig.setStatisticsEnabled( true );
        topicConfig.setName( "yourTopicName" );
        MessageListener<String> implementation = new MessageListener<String>() {
            @Override
            public void onMessage( Message<String> message ) {
                // process the message
            }
        };
        topicConfig.addMessageListenerConfig( new ListenerConfig( implementation ) );
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();

Topic configuration has the following elements:

  • statistics-enabled: Specifies whether the statistics gathering is enabled for your topic. If set to false, you cannot collect statistics in your implementation (using getLocalTopicStats()) and also Hazelcast Management Center will not show them. Its default value is true.

  • global-ordering-enabled: Default is false, meaning there is no global order guarantee.

  • message-listeners: Lets you add listeners (listener classes) for the topic messages.

Besides the above elements, there are the following system properties that are topic related but not topic specific:

  • hazelcast.event.queue.capacity with a default value of 1,000,000

  • hazelcast.event.queue.timeout.millis with a default value of 250

  • hazelcast.event.thread.count with a default value of 5

For the descriptions of these parameters, see the Global Event Configuration section.