Queue
Hazelcast distributed queue enables all cluster members and clients to interact with it; you can add an item in one cluster member and remove it from another member or a client.
FIFO ordering applies to all queue operations across the cluster. The user objects that are enqueued or dequeued have to be Serializable.
Hazelcast distributed queue performs no batching while iterating over the queue. All items are copied locally and iteration occurs locally.
Creating a Queue
When you start a Hazelcast member with default configuration, it will have an empty Queue named default.
See Start a Local Cluster in Docker for a quick cluster startup.
You can either use this default Queue or create your own using the Queue's getter methods as shown in the examples below. If you pass a name other than default as the Queue name in these methods, a new Queue is created with the given name; otherwise, the existing default Queue is used.
The following examples illustrate a distributed queue that connects a producer and consumer.
Putting Items on the Queue
The following creates a queue producer which adds one integer on the queue every second, 100 integers total.
-
Add the following to your file:
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ProducerMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue( "queue" ); // (1)
        for ( int k = 1; k < 100; k++ ) { // (2)
            queue.put( k );
            System.out.println( "Producing: " + k );
            Thread.sleep(1000);
        }
        queue.put( -1 );
        System.out.println( "Producer Finished!" );
    }
}
-
Add the following to your file:
#include <chrono>
#include <iostream>
#include <thread>
#include <hazelcast/client/hazelcast_client.h>

int main() {
    auto hz = hazelcast::new_client().get();
    auto queue = hz.get_queue("queue").get(); // (1)
    for (int k = 1; k < 100; k++) { // (2)
        queue->put(k).get();
        std::cout << "Producing: " << k << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    queue->put(-1).get();
    std::cout << "Producer Finished!" << std::endl;
    std::cout << "Finished" << std::endl;
    return 0;
}
-
Add the following to your file:
using System;
using System.Threading.Tasks;

namespace Hazelcast.Examples.DistributedObjects
{
    public class QueueExample
    {
        public static async Task Main(string[] args)
        {
            var options = new HazelcastOptionsBuilder()
                .With(args)
                .WithConsoleLogger()
                .Build();
            await using var client = await HazelcastClientFactory.StartNewClientAsync(options);
            await using var queue = await client.GetQueueAsync<string>("queue"); // (1)
            var producer = Task.Run(async () => // (2)
            {
                for (var i = 0; i < 100; i++)
                {
                    await queue.OfferAsync("value " + i);
                }
                Console.WriteLine("produced");
            });
            // Wait for the producer to finish before the client is disposed.
            await producer;
        }
    }
}
-
Install the Node.js client library.
npm install hazelcast-client
-
Add the following to your file:
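const { Client } = require('hazelcast-client');

// Wrapped in an async function because await is not available at the top level of a CommonJS module.
(async () => {
    const client = await Client.newHazelcastClient();
    const queue = await client.getQueue('queue');
    for (let k = 0; k < 100; k++) {
        await queue.put(k);
        console.log('Producing: ' + k);
        await new Promise((resolve) => {
            setTimeout(resolve, 1000);
        });
    }
    await queue.put(-1);
    await client.shutdown();
})();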
-
Install the Python client library.
pip install hazelcast-python-client
-
Add the following to your file:
import hazelcast
import threading

client = hazelcast.HazelcastClient()
queue = client.get_queue("queue")  # (1)

def produce():  # (2)
    for i in range(100):
        # offer() returns a future; wait for it so no item is lost on shutdown
        queue.offer("value-" + str(i)).result()

producer_thread = threading.Thread(target=produce)
producer_thread.start()
producer_thread.join()
client.shutdown()
-
Install the Go client library.
go get github.com/hazelcast/hazelcast-go-client
-
Add the following to your file:
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hazelcast/hazelcast-go-client"
)

func main() {
	ctx := context.Background()
	hz, err := hazelcast.StartNewClient(ctx)
	if err != nil {
		panic(fmt.Errorf("starting hazelcast client %w", err))
	}
	queue, err := hz.GetQueue(ctx, "queue") // (1)
	if err != nil {
		panic(fmt.Errorf("getting queue instance %w", err))
	}
	// (2)
	for i := 1; i < 100; i++ {
		// Check the error before reporting the item as produced.
		if err := queue.Put(ctx, int64(i)); err != nil {
			panic(fmt.Errorf("putting item to queue %w", err))
		}
		fmt.Println("Producing: ", int64(i))
		time.Sleep(time.Second)
	}
	if err := queue.Put(ctx, int64(-1)); err != nil {
		panic(fmt.Errorf("putting item to queue %w", err))
	}
	fmt.Println("Producer Finished!")
}
1 | Create the Queue called queue. |
2 | Add items to queue. |
This producer puts a -1 on the queue to signal that it has finished adding items.
Taking Items off the Queue
The following creates a queue consumer to take messages from this queue.
-
Add the following to your file:
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ConsumerMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue( "queue" ); // (1)
        while ( true ) {
            int item = queue.take(); // (2)
            System.out.println( "Consumed: " + item );
            if ( item == -1 ) {
                queue.put( -1 );
                break;
            }
            Thread.sleep( 5000 ); // (3)
        }
        System.out.println( "Consumer Finished!" );
    }
}
1 | Access the queue. |
2 | Start taking queue messages. |
3 | Wait for 5 seconds until the next message is taken. |
-
Add the following to your file:
#include <chrono>
#include <iostream>
#include <thread>
#include <hazelcast/client/hazelcast_client.h>

int main() {
    auto hz = hazelcast::new_client().get();
    auto queue = hz.get_queue("queue").get(); // (1)
    while (true) {
        auto item = queue->take<int32_t>().get(); // (2)
        if (item) {
            std::cout << "Consumed: " << item.value() << std::endl;
            if (item.value() == -1) {
                queue->put(-1).get();
                break;
            }
        } else {
            std::cout << "Retrieved item is null." << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::seconds(5)); // (3)
    }
    std::cout << "Consumer Finished!" << std::endl;
    std::cout << "Finished" << std::endl;
    return 0;
}
1 | Access the queue. |
2 | Start taking queue messages. |
3 | Wait for 5 seconds until the next message is taken. |
-
Add the following to your file:
using System;
using System.Threading.Tasks;

namespace Hazelcast.Examples.DistributedObjects
{
    public class QueueExample
    {
        public static async Task Main(string[] args)
        {
            var options = new HazelcastOptionsBuilder()
                .With(args)
                .WithConsoleLogger()
                .Build();
            await using var client = await HazelcastClientFactory.StartNewClientAsync(options);
            await using var queue = await client.GetQueueAsync<string>("queue"); // (1)
            var consumer = Task.Run(async () =>
            {
                var nConsumed = 0;
                string e;
                while (nConsumed++ < 100 && (e = await queue.PollAsync()) != null) // (2)
                {
                    Console.WriteLine("Consuming " + e);
                }
                Console.WriteLine("consumed");
            });
            // Only the consumer task exists in this program; the producer runs separately.
            await consumer;
            await client.DestroyAsync(queue);
        }
    }
}
1 | Access the queue. |
2 | Start taking queue messages. |
-
Install the Node.js client library.
npm install hazelcast-client
-
Add the following to your file:
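const { Client } = require('hazelcast-client');

// Wrapped in an async function because await is not available at the top level of a CommonJS module.
(async () => {
    const client = await Client.newHazelcastClient();
    const queue = await client.getQueue('queue');
    while (true) {
        const item = await queue.take();
        console.log('Consumed item: ' + item);
        if (item === -1) {
            await queue.put(-1);
            break;
        }
        await new Promise((resolve) => {
            setTimeout(resolve, 5000);
        });
    }
    await client.shutdown();
})();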
-
Install the Python client library.
pip install hazelcast-python-client
-
Add the following to your file:
import hazelcast
import threading

client = hazelcast.HazelcastClient()
queue = client.get_queue("queue")  # (1)

def consume():
    consumed_count = 0
    while consumed_count < 100:  # (2)
        head = queue.take().result()
        print("Consuming {}".format(head))
        consumed_count += 1

consumer_thread = threading.Thread(target=consume)
consumer_thread.start()
consumer_thread.join()
client.shutdown()
1 | Access the queue. |
2 | Start taking queue messages. |
-
Install the Go client library.
go get github.com/hazelcast/hazelcast-go-client
-
Add the following to your file:
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hazelcast/hazelcast-go-client"
)

func main() {
	ctx := context.Background()
	hz, err := hazelcast.StartNewClient(ctx)
	if err != nil {
		panic(fmt.Errorf("starting hazelcast client %w", err))
	}
	queue, err := hz.GetQueue(ctx, "queue") // (1)
	if err != nil {
		panic(fmt.Errorf("getting queue %w", err))
	}
	// (2)
	for {
		item, err := queue.Take(ctx)
		if err != nil {
			panic(fmt.Errorf("taking item from the queue %w", err))
		}
		// Converting item to int64 is required due to serialization.
		fmt.Println("Consuming: ", item.(int64))
		if item == int64(-1) {
			// Put the marker back with the same type the producer used.
			if err := queue.Put(ctx, int64(-1)); err != nil {
				panic(fmt.Errorf("putting item to queue %w", err))
			}
			break
		}
		time.Sleep(5 * time.Second)
	}
	fmt.Println("Consumer Finished!")
}
1 | Access the queue. |
2 | Start taking queue messages. |
As seen in the above example, the consumer waits five seconds before it takes the next message and stops once it receives -1. Before leaving the loop, it puts -1 back on the queue so that any other consumer reading the same queue also receives the stop signal.
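Putting the -1 marker back matters when more than one consumer reads the same queue: each consumer sees the marker, re-queues it for the next consumer, and stops. The following is a minimal sketch of that pattern. It uses a local java.util.concurrent.LinkedBlockingQueue as a stand-in for the distributed queue (IQueue extends BlockingQueue, so take() and put() carry the same contracts), and the class and method names are illustrative only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillSketch {
    static final int STOP = -1;

    // Starts `consumers` threads on one queue that holds `items` integers
    // followed by a single STOP marker. Returns how many real items were consumed.
    static int run(int items, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int k = 1; k <= items; k++) {
            queue.add(k);
        }
        queue.add(STOP);

        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    while (true) {
                        int item = queue.take();
                        if (item == STOP) {
                            queue.put(STOP); // leave the marker for the next consumer
                            return;
                        }
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + run(20, 3) + " items");
    }
}
```

Without the put-back, only the consumer that happened to take the marker would stop, and the others would block in take() indefinitely.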
When you first start the producer and then start the consumer, items produced on the queue will be consumed from the same queue.
Hazelcast distributed queue uses item listeners to listen to the events that occur when items are added to and removed from the queue. See the Listening for Item Events section for information about how to create an item listener class and register it.
Balancing the Queue Operations
From the above example code, you can see that an item is produced every second and consumed every five seconds. Therefore, the number of items waiting in the queue keeps growing. To balance the produce/consume operations, you can start another consumer by running the consumer code again as a separate process. This way, consumption is distributed across the two consumers, as seen in the example outputs below.
Once the second consumer is started, here is the first consumer output:
...
Consumed 13
Consumed 15
Consumed 17
...
Here is the second consumer output:
...
Consumed 14
Consumed 16
Consumed 18
...
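The imbalance can be put in rough numbers. With one item produced per second and each consumer taking one item every five seconds, n consumers remove about n/5 items per second, so a second consumer slows the growth of the queue and it would take about five consumers to keep pace. The sketch below only does this arithmetic for the rates used in the example code; it is an estimate that ignores network and processing time, and the class and method names are made up for illustration.

```java
class BacklogSketch {
    // Estimated number of items waiting in the queue after `seconds`, for a
    // producer adding one item every `produceEverySec` seconds and `consumers`
    // consumers that each take one item every `consumeEverySec` seconds.
    static long backlog(long seconds, int produceEverySec, int consumeEverySec, int consumers) {
        long produced = seconds / produceEverySec;
        long consumeCapacity = (seconds / consumeEverySec) * consumers;
        return Math.max(0, produced - consumeCapacity);
    }

    public static void main(String[] args) {
        for (int consumers : new int[] {1, 2, 5}) {
            System.out.println(consumers + " consumer(s): backlog after 60s = "
                    + backlog(60, 1, 5, consumers));
        }
    }
}
```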
In the case of a lot of producers and consumers for the queue, using a list of queues may solve the queue bottlenecks. In this case, be aware that the order of the messages sent to different queues is not guaranteed. Since in most cases strict ordering is not important, a list of queues is a good solution.
The items are taken from the queue in the same order they were put on the queue. However, if there is more than one consumer, this order is not guaranteed.
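A list of queues can be sketched as follows: each item is routed to one of several queues by a key, so items that share a key stay in order relative to each other, while items in different queues have no defined order. The sketch uses local LinkedBlockingQueue instances in place of distributed queues. Routing by key hash and all names here are assumptions made for illustration, not a Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueListSketch {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    QueueListSketch(int count) {
        for (int i = 0; i < count; i++) {
            queues.add(new LinkedBlockingQueue<>());
        }
    }

    // Picks the queue for a key; the same key always maps to the same queue.
    int indexFor(String key) {
        return Math.floorMod(key.hashCode(), queues.size());
    }

    // Adds the item to the queue selected by its key.
    void put(String key, String item) {
        queues.get(indexFor(key)).add(item);
    }

    BlockingQueue<String> queue(int index) {
        return queues.get(index);
    }

    public static void main(String[] args) {
        QueueListSketch list = new QueueListSketch(4);
        list.put("order-7", "created");
        list.put("order-7", "paid");
        list.put("order-9", "created");
        int slot = list.indexFor("order-7");
        // Items for order-7 come out of their queue in the order they went in.
        System.out.println(list.queue(slot).poll());
    }
}
```

With Hazelcast, each slot would be a separately named queue obtained through getQueue, and each consumer would read from one or more of those queues.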
Configuring Queue
The following are examples of queue configurations. They include the QueueStore configuration, which is explained in the Queueing with Persistent Datastore section.
<hazelcast>
...
<queue name="default">
<max-size>0</max-size>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
<item-listeners>
<item-listener>com.hazelcast.examples.ItemListener</item-listener>
</item-listeners>
<statistics-enabled>true</statistics-enabled>
<queue-store>
<class-name>com.hazelcast.QueueStoreImpl</class-name>
<properties>
<property name="binary">false</property>
<property name="memory-limit">10000</property>
<property name="bulk-load">500</property>
</properties>
</queue-store>
<split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
</queue>
...
</hazelcast>
hazelcast:
  queue:
    default:
      max-size: 0
      backup-count: 1
      async-backup-count: 0
      empty-queue-ttl: -1
      item-listeners:
        - include-value: true
          class-name: com.hazelcast.examples.ItemListener
      statistics-enabled: true
      queue-store:
        class-name: com.hazelcast.QueueStoreImpl
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500
      split-brain-protection-ref: splitbrainprotection-name
Config config = new Config();
QueueConfig queueConfig = config.getQueueConfig("default");
queueConfig.setName("MyQueue")
.setBackupCount(1)
.setMaxSize(0)
.setStatisticsEnabled(true)
.setSplitBrainProtectionName("splitbrainprotectionname");
queueConfig.getQueueStoreConfig()
.setEnabled(true)
.setClassName("com.hazelcast.QueueStoreImpl")
.setProperty("binary", "false");
config.addQueueConfig(queueConfig);
Hazelcast distributed queue has one synchronous backup by default. With this backup, when a cluster member holding a queue goes down, another member that has the backup of that queue takes over. Therefore, no items are lost. You can define the number of synchronous backups for a queue using the backup-count element in the declarative configuration. A queue can also have asynchronous backups: you can define the number of asynchronous backups using the async-backup-count element.
To set the maximum size of the queue, use the max-size element.
To purge unused or empty queues after a period of time, use the empty-queue-ttl element. If you define a value (time in seconds) for the empty-queue-ttl element, then your queue will be destroyed if it stays empty or unused for that many seconds.
The following is the full list of queue configuration elements with their descriptions:
- max-size: Maximum number of items in the queue. It sets an upper bound for the queue. You will not be able to put more items when the queue reaches this maximum size, whether you have a queue store configured or not. See Setting a Bounded Queue.
- backup-count: Number of synchronous backups. Queue is a non-partitioned data structure, so all entries of a queue reside in one partition. When this parameter is '1', there will be one backup of that queue on another member in the cluster. When it is '2', two members will have the backup.
- async-backup-count: Number of asynchronous backups.
- empty-queue-ttl: Used to purge unused or empty queues. If you define a value (time in seconds) for this element, then your queue will be destroyed if it stays empty or unused for that time.
- item-listeners: Adds listeners (listener classes) for the queue items. You can also set the attribute include-value to true if you want the item event to contain the item values. You can set local to true if you want to listen to the items on the local member.
- queue-store: Includes the queue store factory class name and the properties binary, memory limit and bulk load. See Queueing with Persistent Datastore.
- statistics-enabled: Specifies whether the statistics gathering is enabled for your queue. If set to false, you cannot collect statistics in your implementation (using getLocalQueueStats()) and Hazelcast Management Center will not show them either. Its default value is true.
- split-brain-protection-ref: Name of the split-brain protection configuration that you want this queue to use. See Split-Brain Protection for Queue.
ItemIDs When Offering Items
Hazelcast gives an itemId for each item you offer, which is an incrementing sequence identification for the queue items. You should consider the following to understand the itemId assignment behavior:
- When a Hazelcast member has a queue and that queue is configured to have at least one backup, and that member is restarted, the itemId assignment resumes from the last known highest itemId before the restart; itemId assignment does not start from the beginning for the new items.
- When the whole cluster is restarted, the same behavior explained in the above consideration applies if your queue has a persistent data store (QueueStore). If the queue has QueueStore, the itemIds for the new items are assigned starting from the highest itemId found in the IDs returned by the method loadAllKeys. If the method loadAllKeys does not return anything, the itemIds start from the beginning after a cluster restart.
- The above two considerations mean there are no duplicated itemIds in the memory or in the persistent data store.
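The restart rule for a queue with a QueueStore amounts to continuing after the largest stored key. The following sketch shows only that rule. It is not Hazelcast's implementation: the class is hypothetical, and the starting value of 0 for an empty store is an assumption made for the example.

```java
import java.util.Collections;
import java.util.Set;

class ItemIdSequenceSketch {
    private long next;

    // `storedKeys` plays the role of the IDs returned by loadAllKeys();
    // null or empty means nothing was persisted before the restart.
    ItemIdSequenceSketch(Set<Long> storedKeys) {
        if (storedKeys == null || storedKeys.isEmpty()) {
            next = 0; // assumed starting point when no keys are known
        } else {
            next = Collections.max(storedKeys) + 1; // continue after the highest stored ID
        }
    }

    // Returns the ID for the next offered item and advances the sequence.
    long nextItemId() {
        return next++;
    }
}
```

Because new IDs always start above the highest stored key, an ID handed out after the restart cannot collide with one already in the datastore.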
Setting a Bounded Queue
A bounded queue is a queue with a limited capacity. When the bounded queue is full, no more items can be put into the queue until some items are taken out.
To turn a Hazelcast distributed queue into a bounded queue, set the capacity limit with the max-size property. You can set the max-size property in the configuration, as shown below. The max-size element specifies the maximum size of the queue. Once the queue size reaches this value, put operations are blocked until the queue size goes below max-size, which happens when a consumer removes items from the queue.
The following is an example configuration where 10 is the maximum size of the queue:
<hazelcast>
...
<queue name="queue">
<max-size>10</max-size>
</queue>
...
</hazelcast>
hazelcast:
  queue:
    queue:
      max-size: 10
When the producer is started, ten items are put into the queue and then the queue will not allow more put operations. When the consumer is started, it will remove items from the queue. This means that the producer can put more items into the queue until there are ten items in the queue again, at which point the put operation again becomes blocked.
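The behavior of a full queue can be tried without a cluster. This sketch uses java.util.concurrent.ArrayBlockingQueue with a capacity of 10 as a local stand-in for a queue configured with max-size 10. IQueue extends BlockingQueue, so offer, put and poll carry the same contracts, but the stand-in and the helper names are assumptions of the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {
    // Tries to add `attempts` items without blocking; returns how many were accepted.
    static int fill(BlockingQueue<Integer> queue, int attempts) {
        int accepted = 0;
        for (int k = 0; k < attempts; k++) {
            if (queue.offer(k)) { // offer() returns false instead of blocking when full
                accepted++;
            }
        }
        return accepted;
    }

    // Waits up to `timeoutMillis` for free space; returns false if none appeared.
    static boolean offerWithTimeout(BlockingQueue<Integer> queue, int item, long timeoutMillis) {
        try {
            return queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        System.out.println("accepted: " + fill(queue, 15));
        System.out.println("timed offer while full: " + offerWithTimeout(queue, 99, 100));
        queue.poll(); // a consumer removes one item
        System.out.println("timed offer after poll: " + offerWithTimeout(queue, 99, 100));
    }
}
```

A producer that must not wait indefinitely on a full queue can use the timed offer in place of put and decide what to do when it returns false.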
In the above producer and consumer example codes, the producer is five times faster than the consumer. It will effectively always be waiting for the consumer to remove items before it can put more on the queue. For this example code, if maximum throughput is the goal, it would be a good option to start multiple consumers to prevent the queue from filling up.
Queueing with Persistent Datastore
Hazelcast allows you to load and store the distributed queue items from/to a persistent datastore using the interface QueueStore. If queue store is enabled, each item added to the queue is also stored at the configured queue store. When the number of items in the queue exceeds the memory limit, the subsequent items are persisted only in the queue store; they are not kept in the queue memory.
A queue with a persistent datastore can be defined and configured only on the member side. Therefore, its implementation should be written in Java. All Hazelcast clients can access and operate on such a queue as if it is a regular one except that the queue is configured to have a persistent datastore.
The QueueStore interface enables you to store, load and delete queue items with methods like store, storeAll, load and delete. The following example class includes all the QueueStore methods.
import com.hazelcast.collection.QueueStore;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class TheQueueStore implements QueueStore<Item> {
    @Override
    public void delete(Long key) {
        System.out.println("delete");
    }

    @Override
    public void store(Long key, Item value) {
        System.out.println("store");
    }

    @Override
    public void storeAll(Map<Long, Item> map) {
        System.out.println("store all");
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
        System.out.println("deleteAll");
    }

    // (1)
    @Override
    public Item load(Long key) {
        System.out.println("load");
        return null;
    }

    @Override
    public Map<Long, Item> loadAll(Collection<Long> keys) {
        System.out.println("loadAll");
        return null;
    }

    @Override
    public Set<Long> loadAllKeys() {
        System.out.println("loadAllKeys");
        return null;
    }
}
1 | Item must be serializable. |
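The example class above only logs each call. For a sense of what a working store does, here is a sketch that keeps items in a ConcurrentHashMap. To stay runnable without Hazelcast on the classpath, it does not declare that it implements QueueStore; the method signatures mirror those of the example class, String stands in for the item type, and the map is merely a placeholder for a real datastore.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical map-backed store; add the QueueStore interface when compiling against Hazelcast.
class InMemoryQueueStoreSketch {
    private final Map<Long, String> store = new ConcurrentHashMap<>();

    public void store(Long key, String value) {
        store.put(key, value);
    }

    public void storeAll(Map<Long, String> map) {
        store.putAll(map);
    }

    public void delete(Long key) {
        store.remove(key);
    }

    public void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            store.remove(key);
        }
    }

    public String load(Long key) {
        return store.get(key);
    }

    // Returns only the entries that exist for the requested keys.
    public Map<Long, String> loadAll(Collection<Long> keys) {
        Map<Long, String> result = new HashMap<>();
        for (Long key : keys) {
            String value = store.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    // Returns a copy so callers cannot modify the store through the key set.
    public Set<Long> loadAllKeys() {
        return new HashSet<>(store.keySet());
    }
}
```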
The following is an example queue store configuration.
<hazelcast>
...
<queue name="queue">
<max-size>10</max-size>
<queue-store>
<class-name>com.hazelcast.TheQueueStore</class-name>
<properties>
<property name="binary">false</property>
<property name="memory-limit">1000</property>
<property name="bulk-load">500</property>
</properties>
</queue-store>
</queue>
...
</hazelcast>
hazelcast:
  queue:
    queue:
      max-size: 10
      queue-store:
        class-name: com.hazelcast.TheQueueStore
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500
The following are the descriptions for each queue store property:
- Binary: By default, Hazelcast stores the queue items in serialized form, and before it inserts the queue items into the queue store, it deserializes them. If you are not reaching the queue store from an external application, you might prefer that the items be inserted in binary form. Do this by setting the binary property to true: this skips the deserialization step, which is a performance optimization. The binary property is false by default.
- Memory Limit: This is the number of items after which Hazelcast stores items only to the datastore. For example, if the memory limit is 1000, then the 1001st item is put only to the datastore. This feature is useful when you want to avoid out-of-memory conditions. If you want to always use memory, you can set it to Integer.MAX_VALUE. The default value of memory-limit is 1000.
- Bulk Load: When the queue is initialized, items are loaded from QueueStore in bulks. Bulk load is the size of these bulks. The default value of bulk-load is 250.
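To make the two numeric properties concrete, the sketch below splits a list of stored keys into batches of bulk-load size and counts how many items out of a given total would go only to the datastore under a given memory-limit. It is arithmetic on the descriptions above only. It does not reproduce Hazelcast's loading code, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class QueueStorePropertiesSketch {
    // Splits `keys` into consecutive batches of at most `bulkLoad` keys each.
    static List<List<Long>> batches(List<Long> keys, int bulkLoad) {
        List<List<Long>> result = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += bulkLoad) {
            int to = Math.min(from + bulkLoad, keys.size());
            result.add(new ArrayList<>(keys.subList(from, to)));
        }
        return result;
    }

    // Number of items, out of `total` queued, that are put only to the datastore.
    static int storeOnly(int total, int memoryLimit) {
        return Math.max(0, total - memoryLimit);
    }

    public static void main(String[] args) {
        List<Long> keys = new ArrayList<>();
        for (long k = 0; k < 600; k++) {
            keys.add(k);
        }
        System.out.println("batches with bulk-load 250: " + batches(keys, 250).size());
        System.out.println("store-only items for 1001 queued, limit 1000: " + storeOnly(1001, 1000));
    }
}
```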
Split-Brain Protection for Queue
Queues can be configured to check for a minimum number of available members before applying queue operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.
The following is a list of methods, grouped by the protection types, that support split-brain protection checks:
- WRITE, READ_WRITE
  - Collection.addAll()
  - Collection.removeAll(), Collection.retainAll()
  - BlockingQueue.offer(), BlockingQueue.add(), BlockingQueue.put()
  - BlockingQueue.drainTo()
  - IQueue.poll(), Queue.remove(), IQueue.take()
  - BlockingQueue.remove()
- READ, READ_WRITE
  - Collection.clear()
  - Collection.containsAll(), BlockingQueue.contains()
  - Collection.isEmpty()
  - Collection.iterator(), Collection.toArray()
  - Queue.peek(), Queue.element()
  - Collection.size()
  - BlockingQueue.remainingCapacity()
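How the protection type and the minimum number of members combine can be sketched as a small decision function. This is a simplified model of the rule described above, not Hazelcast code: the enum mirrors the three type names in the list, while the class, method and parameter names are invented for the example.

```java
class SplitBrainCheckSketch {
    enum ProtectionType { READ, WRITE, READ_WRITE }

    // True when an operation of the given kind is subject to the membership check.
    static boolean isChecked(ProtectionType type, boolean isWriteOperation) {
        if (type == ProtectionType.READ_WRITE) {
            return true;
        }
        return isWriteOperation ? type == ProtectionType.WRITE : type == ProtectionType.READ;
    }

    // An operation proceeds if it is not checked, or if enough members are visible.
    static boolean isAllowed(ProtectionType type, boolean isWriteOperation,
                             int visibleMembers, int minimumClusterSize) {
        return !isChecked(type, isWriteOperation) || visibleMembers >= minimumClusterSize;
    }

    public static void main(String[] args) {
        // Three members visible, four required, WRITE protection: a put is rejected.
        System.out.println(isAllowed(ProtectionType.WRITE, true, 3, 4));
        // The same situation for a read such as peek: not checked, so it proceeds.
        System.out.println(isAllowed(ProtectionType.WRITE, false, 3, 4));
    }
}
```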
The configuration is done on the member side and the following is an example.
<hazelcast>
...
<queue name="default">
<split-brain-protection-ref>splitBrainProtectionRuleWithFourMembers</split-brain-protection-ref>
</queue>
...
</hazelcast>
hazelcast:
  queue:
    default:
      split-brain-protection-ref: splitBrainProtectionRuleWithFourMembers
SplitBrainProtectionConfig splitBrainProtectionConfig = new SplitBrainProtectionConfig();
splitBrainProtectionConfig.setName("splitBrainProtectionRuleWithFourMembers")
.setEnabled(true)
.setMinimumClusterSize(4);
QueueConfig queueConfig = new QueueConfig();
queueConfig.setSplitBrainProtectionName("splitBrainProtectionRuleWithFourMembers");
Config config = new Config();
config.addSplitBrainProtectionConfig(splitBrainProtectionConfig);
config.addQueueConfig(queueConfig);
The value of split-brain-protection-ref should be the name of the split-brain protection configuration that you configured under the split-brain-protection element, as explained in the Split-Brain Protection section.