Queue
Hazelcast distributed queue is an implementation of java.util.concurrent.BlockingQueue
.
Being distributed, Hazelcast distributed queue enables all cluster members to interact with it.
Using Hazelcast distributed queue, you can add an item in one cluster member and remove it from another one.
Getting a Queue and Putting Items
Use the Hazelcast instance’s getQueue
method to get the queue, then use the queue’s
put
method to put items into the queue.
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
BlockingQueue<MyTask> queue = hazelcastInstance.getQueue( "tasks" );
queue.put( new MyTask() );
MyTask task = queue.take();
boolean offered = queue.offer( new MyTask(), 10, TimeUnit.SECONDS );
task = queue.poll( 5, TimeUnit.SECONDS );
if ( task != null ) {
//process task
}
FIFO ordering applies to all queue operations across the cluster. The user objects
(such as MyTask
in the example above) that are enqueued or dequeued have to be Serializable
.
Hazelcast distributed queue performs no batching while iterating over the queue. All items are copied locally and iteration occurs locally.
Hazelcast distributed queue uses ItemListener
to listen to the events that occur
when items are added to and removed from the queue. See the Listening for Item Events section for information on how to create an item listener
class and register it.
Creating an Example Queue
The following example code illustrates a distributed queue that connects a producer and consumer.
Putting Items on the Queue
Let’s put
one integer on the queue every second, 100 integers total.
public class ProducerMember {
public static void main( String[] args ) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IQueue<Integer> queue = hz.getQueue( "queue" );
for ( int k = 1; k < 100; k++ ) {
queue.put( k );
System.out.println( "Producing: " + k );
Thread.sleep(1000);
}
queue.put( -1 );
System.out.println( "Producer Finished!" );
}
}
Producer
puts a -1 on the queue to show that the put
s are finished.
Taking Items off the Queue
Now, let’s create a Consumer
class to take
a message from this queue, as shown below.
public class ConsumerMember {
public static void main( String[] args ) throws Exception {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IQueue<Integer> queue = hz.getQueue( "queue" );
while ( true ) {
int item = queue.take();
System.out.println( "Consumed: " + item );
if ( item == -1 ) {
queue.put( -1 );
break;
}
Thread.sleep( 5000 );
}
System.out.println( "Consumer Finished!" );
}
}
As seen in the above example code, Consumer
waits five seconds before it consumes
the next message. It stops once it receives -1. Also note that Consumer
puts -1 back on the queue before the loop is ended.
When you first start Producer
and then start Consumer
, items produced on the
queue will be consumed from the same queue.
Balancing the Queue Operations
From the above example code, you can see that an item is produced every second and consumed every five seconds. Therefore, the consumer keeps growing. To balance the produce/consume operation, let’s start another consumer. This way, consumption is distributed to these two consumers, as seen in the example outputs below.
The second consumer is started. After a while, here is the first consumer output:
...
Consumed 13
Consumed 15
Consumer 17
...
Here is the second consumer output:
...
Consumed 14
Consumed 16
Consumer 18
...
In the case of a lot of producers and consumers for the queue, using a list of queues may solve the queue bottlenecks. In this case, be aware that the order of the messages sent to different queues is not guaranteed. Since in most cases strict ordering is not important, a list of queues is a good solution.
The items are taken from the queue in the same order they were put on the queue. However, if there is more than one consumer, this order is not guaranteed. |
ItemIDs When Offering Items
Hazelcast gives an itemId
for each item you offer, which is an incrementing sequence
identification for the queue items. You should consider the following to understand the
itemId
assignment behavior:
-
When a Hazelcast member has a queue and that queue is configured to have at least one backup, and that member is restarted, the
itemId
assignment resumes from the last known highestitemId
before the restart;itemId
assignment does not start from the beginning for the new items. -
When the whole cluster is restarted, the same behavior explained in the above consideration applies if your queue has a persistent data store (
QueueStore
). If the queue hasQueueStore
, theitemId
for the new items are given, starting from the highestitemId
found in the IDs returned by the methodloadAllKeys
. If the methodloadAllKeys
does not return anything, theitemId
s starts from the beginning after a cluster restart. -
The above two considerations mean there are no duplicated
itemId
s in the memory or in the persistent data store.
Setting a Bounded Queue
A bounded queue is a queue with a limited capacity. When the bounded queue is full, no more items can be put into the queue until some items are taken out.
To turn a Hazelcast distributed queue into a bounded queue, set the capacity limit
with the max-size
property. You can set the max-size
property in the configuration,
as shown below. The max-size
element specifies the maximum size of the queue.
Once the queue size reaches this value, put
operations are blocked until the
queue size goes below max-size
, which happens when a consumer removes items from the queue.
Let’s set 10 as the maximum size of our example queue in Creating an Example Queue.
<hazelcast>
...
<queue name="queue">
<max-size>10</max-size>
</queue>
...
</hazelcast>
hazelcast:
queue:
queue:
max-size: 10
When the producer is started, ten items are put into the queue and then the queue
will not allow more put
operations. When the consumer is started, it will remove
items from the queue. This means that the producer can put
more items into the
queue until there are ten items in the queue again, at which point the put
operation
again becomes blocked.
In this example code, the producer is five times faster than the consumer. It will effectively always be waiting for the consumer to remove items before it can put more on the queue. For this example code, if maximum throughput is the goal, it would be a good option to start multiple consumers to prevent the queue from filling up.
Queueing with Persistent Datastore
Hazelcast allows you to load and store the distributed queue items from/to a persistent
datastore using the interface QueueStore
. If queue store is enabled, each item added to
the queue is also stored at the configured queue store. When the number of items in the
queue exceeds the memory limit, the subsequent items are persisted in the queue store,
they are not stored in the queue memory.
The QueueStore
interface enables you to store, load and delete queue items with methods like
store
, storeAll
, load
and delete
. The following example class includes all of the QueueStore
methods.
public class TheQueueStore implements QueueStore<Item> {
@Override
public void delete(Long key) {
System.out.println("delete");
}
@Override
public void store(Long key, Item value) {
System.out.println("store");
}
@Override
public void storeAll(Map<Long, Item> map) {
System.out.println("store all");
}
@Override
public void deleteAll(Collection<Long> keys) {
System.out.println("deleteAll");
}
@Override
public Item load(Long key) {
System.out.println("load");
return null;
}
@Override
public Map<Long, Item> loadAll(Collection<Long> keys) {
System.out.println("loadALl");
return null;
}
@Override
public Set<Long> loadAllKeys() {
System.out.println("loadAllKeys");
return null;
}
}
Item
must be serializable. The following is an example queue store configuration.
<hazelcast>
...
<queue name="queue">
<max-size>10</max-size>
<queue-store>
<class-name>com.hazelcast.QueueStoreImpl</class-name>
<properties>
<property name="binary">false</property>
<property name="memory-limit">1000</property>
<property name="bulk-load">500</property>
</properties>
</queue-store>
</queue>
...
</hazelcast>
hazelcast:
queue:
queue:
max-size: 10
queue-store:
class-name: com.hazelcast.QueueStoreImpl
properties:
binary: false
memory-limit: 1000
bulk-load: 500
The following are the descriptions for each queue store property:
-
Binary: By default, Hazelcast stores the queue items in serialized form, and before it inserts the queue items into the queue store, it deserializes them. If you are not reaching the queue store from an external application, you might prefer that the items be inserted in binary form. Do this by setting the
binary
property to true: then you can get rid of the deserialization step, which is a performance optimization. Thebinary
property is false by default. -
Memory Limit: This is the number of items after which Hazelcast stores items only to the datastore. For example, if the memory limit is 1000, then the 1001st item is put only to the datastore. This feature is useful when you want to avoid out-of-memory conditions. If you want to always use memory, you can set it to
Integer.MAX_VALUE
. The default number formemory-limit
is 1000. -
Bulk Load: When the queue is initialized, items are loaded from
QueueStore
in bulks. Bulk load is the size of these bulks. The default value ofbulk-load
is 250.
Split-Brain Protection for Queue
Queues can be configured to check for a minimum number of available members before applying queue operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.
The following is a list of methods, grouped by the protection types, that support split-brain protection checks:
-
WRITE, READ_WRITE
-
Collection.addAll()
-
Collection.removeAll()
,Collection.retainAll()
-
BlockingQueue.offer()
,BlockingQueue.add()
,BlockingQueue.put()
-
BlockingQueue.drainTo()
-
IQueue.poll()
,Queue.remove()
,IQueue.take()
-
BlockingQueue.remove()
-
-
READ, READ_WRITE
-
Collection.clear()
-
Collection.containsAll()
,BlockingQueue.contains()
-
Collection.isEmpty()
-
Collection.iterator()
,Collection.toArray()
-
Queue.peek()
,Queue.element()
-
Collection.size()
-
BlockingQueue.remainingCapacity()
-
Configuring Queue
The following are examples of queue configurations. It includes the
QueueStore
configuration, which is explained in the Queueing with Persistent Datastore section.
Declarative Configuration:
<hazelcast>
...
<queue name="default">
<max-size>0</max-size>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
<item-listeners>
<item-listener>com.hazelcast.examples.ItemListener</item-listener>
</item-listeners>
<statistics-enabled>true</statistics-enabled>
<queue-store>
<class-name>com.hazelcast.QueueStoreImpl</class-name>
<properties>
<property name="binary">false</property>
<property name="memory-limit">10000</property>
<property name="bulk-load">500</property>
</properties>
</queue-store>
<split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
</queue>
...
</hazelcast>
hazelcast:
queue:
default:
max-size: 0
backup-count: 1
async-backup-count: 0
empty-queue-ttl: -1
item-listeners:
- include-value: true
class-name: com.hazelcast.examples.ItemListener
statistics-enabled: true
queue-store:
class-name: com.hazelcast.QueueStoreImpl
properties:
binary: false
memory-limit: 1000
bulk-load: 500
split-brain-protection-ref: splitbrainprotection-name
Programmatic Configuration:
Config config = new Config();
QueueConfig queueConfig = config.getQueueConfig("default");
queueConfig.setName("MyQueue")
.setBackupCount(1)
.setMaxSize(0)
.setStatisticsEnabled(true)
.setSplitBrainProtectionName("splitbrainprotectionname");
queueConfig.getQueueStoreConfig()
.setEnabled(true)
.setClassName("com.hazelcast.QueueStoreImpl")
.setProperty("binary", "false");
config.addQueueConfig(queueConfig);
Hazelcast distributed queue has one synchronous backup by default.
By having this backup, when a cluster member with a queue goes down,
another member having the backup of that queue will continue. Therefore,
no items are lost. You can define the number of synchronous backups for a
queue using the backup-count
element in the declarative configuration.
A queue can also have asynchronous backups: you can define the number of
asynchronous backups using the async-backup-count
element.
To set the maximum size of the queue, use the max-size
element.
To purge unused or empty queues after a period of time, use the empty-queue-ttl
element.
If you define a value (time in seconds) for the empty-queue-ttl
element,
then your queue will be destroyed if it stays empty or unused for the time in seconds that you give.
The following is the full list of queue configuration elements with their descriptions:
-
max-size
: Maximum number of items in the queue. It is used to set an upper bound for the queue. You will not be able to put more items when the queue reaches to this maximum size whether you have a queue store configured or not. -
backup-count
: Number of synchronous backups. Queue is a non-partitioned data structure, so all entries of a queue reside in one partition. When this parameter is '1', it means there will be one backup of that queue in another member in the cluster. When it is '2', two members will have the backup. -
async-backup-count
: Number of asynchronous backups. -
empty-queue-ttl
: Used to purge unused or empty queues. If you define a value (time in seconds) for this element, then your queue will be destroyed if it stays empty or unused for that time. -
item-listeners
: Adds listeners (listener classes) for the queue items. You can also set the attributeinclude-value
totrue
if you want the item event to contain the item values. You can setlocal
totrue
if you want to listen to the items on the local member. -
queue-store
: Includes the queue store factory class name and the properties binary, memory limit and bulk load. See the Queueing with Persistent Datastore section. -
statistics-enabled
: Specifies whether the statistics gathering is enabled for your queue. If set tofalse
, you cannot collect statistics in your implementation (usinggetLocalQueueStats()
) and also Hazelcast Management Center will not show them. Its default value istrue
. -
split-brain-protection-ref
: Name of the split-brain protection configuration that you want this queue to use.