Topic
Hazelcast provides a distribution mechanism for publishing messages that are delivered to multiple subscribers. This is also known as a publish/subscribe (pub/sub) messaging model. Publishing and subscribing operations are cluster wide. When a member subscribes to a topic, it is actually registering for messages published by any member in the cluster, including new members that join after you add the listener.
The publish operation is asynchronous. It does not wait for operations to run on remote members; it works as fire and forget.
Getting a Topic and Publishing Messages
Use the HazelcastInstance’s getTopic method to get the topic, then use the topic’s publish method to publish your messages. The following is an example publisher:
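// Package names as of Hazelcast 4.x and later; in 3.x, ITopic lives in com.hazelcast.core.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;
import java.util.Date;

public class TopicPublisher {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Date> topic = hz.getTopic("topic");
        topic.publish(new Date());
    }
}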
And here is an example subscriber:
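// Package names as of Hazelcast 4.x and later; in 3.x, the topic types live in com.hazelcast.core.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.MessageListener;
import java.util.Date;

public class TopicSubscriber {
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Date> topic = hz.getTopic("topic");
        topic.addMessageListener(new MessageListenerImpl());
        System.out.println("Subscribed");
    }

    private static class MessageListenerImpl implements MessageListener<Date> {
        @Override
        public void onMessage(Message<Date> m) {
            System.out.println("Received: " + m.getMessageObject());
        }
    }
}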
Hazelcast Topic uses the MessageListener interface to listen for events that occur when a message is received. See the Listening for Topic Messages section for information on how to create a message listener class and register it.
Getting Topic Statistics
Topic has two statistic variables that you can query. These values are incremental and local to the member.
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
ITopic<Object> myTopic = hazelcastInstance.getTopic( "myTopicName" );
myTopic.getLocalTopicStats().getPublishOperationCount();
myTopic.getLocalTopicStats().getReceiveOperationCount();
getPublishOperationCount and getReceiveOperationCount return the total number of published and received messages since the start of this member, respectively.
Note that these values are not backed up, so if the member goes down, these values will be lost.
You can disable this feature with topic configuration. See the Configuring Topic section.
These statistics values can also be viewed in Management Center. See the Monitoring Topics section in Hazelcast Management Center documentation.
Understanding Topic Behavior
Each cluster member has a list of all registrations in the cluster. When a member registers for a topic, it sends a registration message to all members in the cluster. Also, when a new member joins the cluster, it receives all registrations made so far in the cluster.
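The registration bookkeeping described above can be pictured with a small, self-contained simulation. This is only a sketch under stated assumptions: the class `RegistrationSimulation` and its methods are hypothetical and are not Hazelcast APIs, and real members exchange registrations over the network rather than through a shared object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical simulation, not Hazelcast code: every member keeps its own copy of all registrations. */
final class RegistrationSimulation {

    // member name -> that member's local view (topic name -> registered members)
    private final Map<String, Map<String, Set<String>>> localViews = new LinkedHashMap<>();

    /** A joining member receives a copy of all registrations made so far. */
    void join(String member) {
        Map<String, Set<String>> view = new HashMap<>();
        if (!localViews.isEmpty()) {
            Map<String, Set<String>> existing = localViews.values().iterator().next();
            for (Map.Entry<String, Set<String>> entry : existing.entrySet()) {
                view.put(entry.getKey(), new TreeSet<>(entry.getValue()));
            }
        }
        localViews.put(member, view);
    }

    /** A registration is sent to every member, including the registering one. */
    void register(String member, String topic) {
        for (Map<String, Set<String>> view : localViews.values()) {
            view.computeIfAbsent(topic, t -> new TreeSet<>()).add(member);
        }
    }

    /** Returns the subscribers of a topic as recorded in one member's local view. */
    Set<String> subscribersSeenBy(String member, String topic) {
        return localViews.get(member).getOrDefault(topic, Collections.emptySet());
    }

    public static void main(String[] args) {
        RegistrationSimulation cluster = new RegistrationSimulation();
        cluster.join("member1");
        cluster.join("member2");
        cluster.register("member1", "news");
        cluster.register("member2", "news");
        cluster.join("member3"); // joins after the registrations were made
        System.out.println(cluster.subscribersSeenBy("member3", "news")); // [member1, member2]
    }
}
```

In this sketch, member3 never registers for `news`, yet its local view still lists both subscribers, which is what lets it publish directly to them.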
The behavior of a topic varies depending on the value of the configuration parameter globalOrderEnabled.
Ordering Messages as Published
If globalOrderEnabled is disabled, messages are not ordered globally across publishers, but listeners (subscribers) process the messages of each publisher in the order that the publisher published them. If cluster member M publishes messages m1, m2, m3, …, mn to a topic T, then Hazelcast makes sure that all of the subscribers of topic T receive and process m1, m2, m3, …, mn in the given order.
Here is how it works: Let’s say that we have three members (member1, member2 and member3) and that member1 and member2 are registered to a topic named news. Note that all three members know that member1 and member2 are registered to news.
In this example, member1 publishes two messages: a1 and a2. Member3 publishes two messages: c1 and c2. When member1 and member3 publish a message, they check their local list for registered members, discover that member1 and member2 are in their lists, and then they fire messages to those members. One possible order of the messages received could be the following.
member1 → c1, a1, a2, c2
member2 → c1, c2, a1, a2
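To make the guarantee concrete, the following sketch checks the two delivery sequences from the example above. It is an illustration only: `PublisherOrderCheck` is a hypothetical helper written for this page, not part of Hazelcast, and it assumes message names are unique per publisher.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Hazelcast: verifies per-publisher ordering. */
final class PublisherOrderCheck {

    /**
     * Returns true if, for every publisher, its messages appear in
     * {@code received} in the same relative order in which they were published.
     * Messages from different publishers may interleave freely.
     */
    static boolean preservesPublisherOrder(List<String> received,
                                           Map<String, List<String>> publishedBy) {
        for (List<String> sent : publishedBy.values()) {
            int last = -1; // position (in the published list) of the last message seen
            for (String message : received) {
                int position = sent.indexOf(message);
                if (position < 0) {
                    continue; // message came from a different publisher
                }
                if (position <= last) {
                    return false; // an earlier message arrived after a later one
                }
                last = position;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> publishedBy = new HashMap<>();
        publishedBy.put("member1", Arrays.asList("a1", "a2"));
        publishedBy.put("member3", Arrays.asList("c1", "c2"));

        List<String> seenByMember1 = Arrays.asList("c1", "a1", "a2", "c2");
        List<String> seenByMember2 = Arrays.asList("c1", "c2", "a1", "a2");

        System.out.println(preservesPublisherOrder(seenByMember1, publishedBy)); // true
        System.out.println(preservesPublisherOrder(seenByMember2, publishedBy)); // true
        System.out.println(seenByMember1.equals(seenByMember2)); // false: no global order
    }
}
```

Both sequences keep a1 before a2 and c1 before c2, so both are valid outcomes even though the two subscribers disagree on the overall order.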
Ordering Messages for Members
If globalOrderEnabled is enabled, all members listening to the same topic get its messages in the same order.
Here is how it works. Let’s say that we have three members (member1, member2 and member3) and that member1 and member2 are registered to a topic named news. Note that all three members know that member1 and member2 are registered to news.
In this example, member1 publishes two messages: a1 and a2. Member3 publishes two messages: c1 and c2. When a member publishes messages over the topic news, it first calculates which partition the news ID corresponds to. Then it sends an operation to the owner of the partition for that member to publish messages. Let’s assume that news corresponds to a partition that member2 owns. member1 and member3 first send all messages to member2. Assume that the messages are published in the following order:
member1 → a1, c1, a2, c2
member2 then publishes these messages by looking at registrations in its local list. It sends these messages to member1 and member2 (it makes a local dispatch for itself).
member1 → a1, c1, a2, c2
member2 → a1, c1, a2, c2
This way we guarantee that all members see the events in the same order.
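The relay step can be sketched as a tiny simulation in which a single owner forwards its arrival sequence, unchanged, to every subscriber. `GlobalOrderSimulation` is a hypothetical class written for illustration; it does not use or reproduce Hazelcast internals such as partition lookup or network operations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation, not Hazelcast code: one owner relays every message. */
final class GlobalOrderSimulation {

    /** Delivers the owner's arrival sequence, in that order, to each subscriber. */
    static Map<String, List<String>> relay(List<String> arrivalOrderAtOwner,
                                           List<String> subscribers) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String subscriber : subscribers) {
            inbox.put(subscriber, new ArrayList<>());
        }
        for (String message : arrivalOrderAtOwner) {
            for (String subscriber : subscribers) {
                inbox.get(subscriber).add(message);
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        // Order in which the partition owner (member2 in the example) received the messages.
        List<String> arrival = Arrays.asList("a1", "c1", "a2", "c2");
        Map<String, List<String>> inbox = relay(arrival, Arrays.asList("member1", "member2"));
        System.out.println(inbox); // {member1=[a1, c1, a2, c2], member2=[a1, c1, a2, c2]}
    }
}
```

Because every message passes through the same owner, the subscribers cannot observe different interleavings; the cost is the extra hop to that owner for each publish.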
Keeping Generated and Published Order the Same
In both cases, there is a StripedExecutor in EventService that is responsible for dispatching the received message. For all events in Hazelcast, the order that events are generated and the order they are published to the user are guaranteed to be the same via this StripedExecutor.
In StripedExecutor, there are as many threads as are specified in the property hazelcast.event.thread.count (default is five). For a specific event source (for a particular topic name), the hash of that source’s name modulo the thread count (5 by default) gives the ID of the responsible thread. Note that another event source (entry listener of a map, item listener of a collection, etc.) can correspond to the same thread. To avoid blocking other messages, do not perform heavy processing in this thread. If there is time-consuming work that needs to be done, hand the work over to another thread. See the Getting a Topic and Publishing Messages section.
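The thread selection rule can be illustrated with a short sketch. The names below (`StripeSelection`, `stripeFor`, `anyShareStripe`, and the sample source names) are hypothetical, and `String.hashCode()` is used only as a stand-in because the text does not say which hash function Hazelcast applies.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration, not Hazelcast's implementation. */
final class StripeSelection {

    /** Maps an event source name to a thread index in the range [0, threadCount). */
    static int stripeFor(String sourceName, int threadCount) {
        // Assumption: String.hashCode() stands in for the real hash function.
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(sourceName.hashCode(), threadCount);
    }

    /** Returns true if at least two of the given sources map to the same thread. */
    static boolean anyShareStripe(List<String> sourceNames, int threadCount) {
        Set<Integer> used = new HashSet<>();
        for (String name : sourceNames) {
            if (!used.add(stripeFor(name, threadCount))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int threadCount = 5; // default value of hazelcast.event.thread.count
        List<String> sources = Arrays.asList(
                "news", "orders-map", "jobs-queue", "alerts", "audit", "metrics");
        for (String source : sources) {
            System.out.println(source + " -> thread " + stripeFor(source, threadCount));
        }
        // Six distinct sources on five threads: at least two must share a thread.
        System.out.println(anyShareStripe(sources, threadCount)); // true
    }
}
```

Since the same name always maps to the same thread, events from one source stay in order. The sharing shown in the last line is the reason a slow listener on one source can delay events from unrelated sources.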
Configuring Topic
To configure a topic, set the topic name, decide on statistics and global ordering, and set the message listeners. The following are the default values:
- global-ordering is false, meaning that by default, there is no guarantee of global order.
- statistics is true, meaning that by default, statistics are calculated.
You can see the example configuration snippets below.
Declarative Configuration:
<hazelcast>
    ...
    <topic name="yourTopicName">
        <global-ordering-enabled>true</global-ordering-enabled>
        <statistics-enabled>true</statistics-enabled>
        <message-listeners>
            <message-listener>MessageListenerImpl</message-listener>
        </message-listeners>
    </topic>
    ...
</hazelcast>
hazelcast:
  topic:
    yourTopicName:
      global-ordering-enabled: true
      statistics-enabled: true
      message-listeners:
        - MessageListenerImpl
Programmatic Configuration:
TopicConfig topicConfig = new TopicConfig();
topicConfig.setGlobalOrderingEnabled( true );
topicConfig.setStatisticsEnabled( true );
topicConfig.setName( "yourTopicName" );
MessageListener<String> implementation = new MessageListener<String>() {
    @Override
    public void onMessage( Message<String> message ) {
        // process the message
    }
};
topicConfig.addMessageListenerConfig( new ListenerConfig( implementation ) );
// Register the topic configuration with the member's Config before starting the member.
Config config = new Config();
config.addTopicConfig( topicConfig );
HazelcastInstance instance = Hazelcast.newHazelcastInstance( config );
Topic configuration has the following elements:
- statistics-enabled: Specifies whether statistics gathering is enabled for your topic. If set to false, you cannot collect statistics in your implementation (using getLocalTopicStats()), and Hazelcast Management Center will not show them. Its default value is true.
- global-ordering-enabled: Default is false, meaning there is no global order guarantee.
- message-listeners: Lets you add listeners (listener classes) for the topic messages.
Besides the above elements, there are the following system properties that are topic related but not topic specific:
- hazelcast.event.queue.capacity with a default value of 1,000,000
- hazelcast.event.queue.timeout.millis with a default value of 250
- hazelcast.event.thread.count with a default value of 5
For the descriptions of these parameters, see the Global Event Configuration section.