Hazelcast distributed queue enables all cluster members and clients to interact with it; you can add an item in one cluster member and remove it from another member or a client.
The Hazelcast Queue provides standard FIFO queue semantics. Operations on the queue (including
those with a timeout) happen on a first-come, first-serve basis. The user objects
that are enqueued or dequeued have to be Serializable.
Hazelcast distributed queue performs no batching while iterating over the queue. All items are copied locally and iteration occurs locally.
Creating a Queue
When you start a Hazelcast member with default configuration, it will have an empty Queue named default.
See Start a Local Cluster in Docker for a quick cluster startup.
You can either use this default Queue or create your own using the Queue’s getter methods as shown in the below
examples. If you pass a name other than default as the Queue name in these methods, it creates a new Queue with
the given name; otherwise, it will use this existing Queue.
The following examples illustrate a distributed queue that connects a producer and consumer.
Putting Items on the Queue
The following creates a queue producer which adds one integer on the queue every second, 100 integers total.
- 
Add the following to your file: public class ProducerMember { public static void main( String[] args ) throws Exception { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); IQueue<Integer> queue = hz.getQueue( "queue" ); (1) (2) for ( int k = 1; k < 100; k++ ) { queue.put( k ); System.out.println( "Producing: " + k ); Thread.sleep(1000); } queue.put( -1 ); System.out.println( "Producer Finished!" ); } }
- 
Add the following to your file: #include <hazelcast/client/hazelcast_client.h> int main() { auto hz = hazelcast::new_client().get(); auto queue = hz.get_queue("queue").get(); (1) (2) for (int k = 1; k < 100; k++) { queue->put(k).get(); std::cout << "Producing: " << k << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } queue->put(-1).get(); std::cout << "Producer Finished!" << std::endl; std::cout << "Finished" << std::endl; return 0; }
- 
Add the following to your file: using System; using System.Threading.Tasks; namespace Hazelcast.Examples.DistributedObjects { public class QueueExample { public static async Task Main(string[] args) { var options = new HazelcastOptionsBuilder() .With(args) .WithConsoleLogger() .Build(); await using var client = await HazelcastClientFactory.StartNewClientAsync(options); await using var queue = await client.GetQueueAsync<string>("queue"); (1) (2) var producer = Task.Run(async () => { for (var i = 0; i < 100; i++) { await queue.OfferAsync("value " + i); } Console.WriteLine("produced"); }); } } }
- 
Install the Node.js client library. npm install hazelcast-client
- 
Add the following to your file: const client = await Client.newHazelcastClient(); const queue = await client.getQueue('queue'); for (let k = 0; k < 100; k++) { await queue.put(k); console.log('Producing: ' + k); await new Promise((resolve) => { setTimeout(resolve, 1000); }); } await queue.put(-1);
- 
Install the Python client library. pip install hazelcast-python-client
- 
Add the following to your file: import hazelcast import threading client = hazelcast.HazelcastClient() queue = client.get_queue("queue") (1) (2) def produce(): for i in range(100): queue.offer("value-" + str(i)) producer_thread = threading.Thread(target=produce) producer_thread.start() producer_thread.join() client.shutdown()
- 
Install the Go client library. go get github.com/hazelcast/hazelcast-go-client
- 
Add the following to your file: package main import ( "context" "fmt" "time" "github.com/hazelcast/hazelcast-go-client" ) func main() { ctx := context.Background() hz, err := hazelcast.StartNewClient(ctx) if err != nil { panic(fmt.Errorf("starting hazelcast client %w", err)) } queue, err := hz.GetQueue(ctx, "queue") (1) if err != nil { panic(fmt.Errorf("getting queue instance %w", err)) } (2) for i := 1; i < 100; i++ { err := queue.Put(ctx, int64(i)) fmt.Println("Producing: ", int64(i)) if err != nil { panic(fmt.Errorf("putting item to queue %w", err)) } time.Sleep(time.Second) } err = queue.Put(ctx, int64(-1)) if err != nil { panic(fmt.Errorf("putting item to queue %w", err)) } fmt.Println("Producer Finished!") }
| 1 | Create the Queue called queue. | 
| 2 | Add items to queue. | 
This producer puts a -1 on the queue to show that adding items operation is finished.
Taking Items off the Queue
The following creates a queue consumer to take messages from this queue.
- 
Add the following to your file: public class ConsumerMember { public static void main( String[] args ) throws Exception { HazelcastInstance hz = Hazelcast.newHazelcastInstance(); IQueue<Integer> queue = hz.getQueue( "queue" ); (1) while ( true ) { int item = queue.take(); (2) System.out.println( "Consumed: " + item ); if ( item == -1 ) { queue.put( -1 ); break; } Thread.sleep( 5000 ); (3) } System.out.println( "Consumer Finished!" ); } }1 Access the queue. 2 Start taking queue messages. 3 Wait for 5 seconds until the next message is taken. 
- 
Add the following to your file: #include <hazelcast/client/hazelcast_client.h> int main() { auto hz = hazelcast::new_client().get(); auto queue = hz.get_queue("queue").get(); (1) while (true) { auto item = queue->take<int32_t>().get(); (2) if (item) { std::cout << "Consumed: " << item.value() << std::endl; if (item.value() == -1) { queue->put(-1).get(); break; } } else { std::cout << "Retrieved item is null." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(5)); (3) } std::cout << "Consumer Finished!" << std::endl; std::cout << "Finished" << std::endl; return 0; }1 Access the queue. 2 Start taking queue messages. 3 Wait for 5 seconds until the next message is taken. 
- 
Add the following to your file: using System; using System.Threading.Tasks; namespace Hazelcast.Examples.DistributedObjects { public class QueueExample { public static async Task Main(string[] args) { var options = new HazelcastOptionsBuilder() .With(args) .WithConsoleLogger() .Build(); await using var client = await HazelcastClientFactory.StartNewClientAsync(options); await using var queue = await client.GetQueueAsync<string>("queue"); (1) var consumer = Task.Run(async () => { var nConsumed = 0; string e; while (nConsumed++ < 100 && (e = await queue.PollAsync()) != null) (2) { Console.WriteLine("Consuming " + e); } Console.WriteLine("consumed"); }); await Task.WhenAll(producer, consumer); await client.DestroyAsync(queue); } } }1 Access the queue. 2 Start taking queue messages. 
- 
Install the Node.js client library. npm install hazelcast-client
- 
Add the following to your file: const client = await Client.newHazelcastClient(); const queue = await client.getQueue('queue'); while (true) { const item = await queue.take(); console.log('Consumed item: ' + item); if (item === -1) { await queue.put(-1); break; } await new Promise((resolve) => { setTimeout(resolve, 5000); }); }
- 
Install the Python client library. pip install hazelcast-python-client
- 
Add the following to your file: import hazelcast import threading client = hazelcast.HazelcastClient() queue = client.get_queue("queue") (1) def consume(): consumed_count = 0 while consumed_count < 100: (2) head = queue.take().result() print("Consuming {}".format(head)) consumed_count += 1 consumer_thread = threading.Thread(target=consume) consumer_thread.start() consumer_thread.join() client.shutdown()1 Access the queue. 2 Start taking queue messages. 
- 
Install the Go client library. go get github.com/hazelcast/hazelcast-go-client
- 
Add the following to your file: package main import ( "context" "fmt" "time" "github.com/hazelcast/hazelcast-go-client" ) func main() { ctx := context.Background() hz, err := hazelcast.StartNewClient(ctx) if err != nil { panic(fmt.Errorf("starting hazelcast client %w", err)) } queue, err := hz.GetQueue(ctx, "queue") (1) if err != nil { panic(fmt.Errorf("getting queue %w", err)) } (2) for { item, err := queue.Take(ctx) if err != nil { panic(fmt.Errorf("taking item from the queue %w", err)) } fmt.Println("Consuming: ", item.(int64)) // notice that converting item to int64 is required due to serialization if item == int64(-1) { err := queue.Put(ctx, -1) if err != nil { panic(fmt.Errorf("putting item to queue %w", err)) } break } time.Sleep(5 * time.Second) } fmt.Println("Consumer Finished!") }1 Access the queue. 2 Start taking queue messages. 
As seen in the above example, the consumer waits five seconds before it takes the next message. It stops once it receives -1. Also note that the consumer puts -1 back on the queue before the loop is ended.
When you first start the producer and then start the consumer, items produced on the queue will be consumed from the same queue.
Hazelcast distributed queue uses item listeners to listen to the events that occur when items are added to and removed from the queue. See the Listening for Item Events section for information about how to create an item listener class and register it.
Balancing the Queue Operations
From the above example code, you can see that an item is produced every second and consumed every five seconds. Therefore, the consumer keeps growing. To balance the produce/consume operation, you can start another consumer by re-running the consumer code in a new file. This way, consumption is distributed to these two consumers, as seen in the example outputs below.
Once the second consumer is started, here is the first consumer output:
...
Consumed 13
Consumed 15
Consumed 17
...Here is the second consumer output:
...
Consumed 14
Consumed 16
Consumed 18
...In the case of a lot of producers and consumers for the queue, using a list of queues may solve the queue bottlenecks. In this case, be aware that the order of the messages sent to different queues is not guaranteed. Since in most cases strict ordering is not important, a list of queues is a good solution.
| The items are taken from the queue in the same order they were put on the queue. However, if there is more than one consumer, this order is not guaranteed. | 
Configuring Queue
The following are examples of queue configurations. It includes the
QueueStore configuration, which is explained in the Queueing with Persistent Datastore section.
<hazelcast>
    ...
    <queue name="default">
        <max-size>0</max-size>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <empty-queue-ttl>-1</empty-queue-ttl>
        <item-listeners>
            <item-listener>com.hazelcast.examples.ItemListener</item-listener>
        </item-listeners>
        <statistics-enabled>true</statistics-enabled>
        <queue-store>
            <class-name>com.hazelcast.QueueStoreImpl</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">10000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </queue>
    ...
</hazelcast>hazelcast:
  queue:
    default:
      max-size: 0
      backup-count: 1
      async-backup-count: 0
      empty-queue-ttl: -1
      item-listeners:
        - include-value: true
          class-name: com.hazelcast.examples.ItemListener
      statistics-enabled: true
      queue-store:
        class-name: com.hazelcast.QueueStoreImpl
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500
      split-brain-protection-ref: splitbrainprotection-name        Config config = new Config();
        QueueConfig queueConfig = config.getQueueConfig("default");
        queueConfig.setName("MyQueue")
                .setBackupCount(1)
                .setMaxSize(0)
                .setStatisticsEnabled(true)
                .setSplitBrainProtectionName("splitbrainprotectionname");
        queueConfig.getQueueStoreConfig()
                .setEnabled(true)
                .setClassName("com.hazelcast.QueueStoreImpl")
                .setProperty("binary", "false");
        config.addQueueConfig(queueConfig);Hazelcast distributed queue has one synchronous backup by default.
By having this backup, when a cluster member with a queue goes down,
another member having the backup of that queue will continue. Therefore,
no items are lost. You can define the number of synchronous backups for a
queue using the backup-count element in the declarative configuration.
A queue can also have asynchronous backups: you can define the number of
asynchronous backups using the async-backup-count element.
To set the maximum size of the queue, use the max-size element.
To destroy empty queues after a period of time, use the empty-queue-ttl element.
The following is the full list of queue configuration elements with their descriptions:
- 
max-size: Maximum number of items in the queue. It is used to set an upper bound for the queue. You will not be able to put more items when the queue reaches to this maximum size whether you have a queue store configured or not. See Setting a Bounded Queue.
- 
backup-count: Number of synchronous backups. Queue is a non-partitioned data structure, so all entries of a queue reside in one partition. When this parameter is '1', it means there will be one backup of that queue in another member in the cluster. When it is '2', two members will have the backup.
- 
async-backup-count: Number of asynchronous backups.
- 
empty-queue-ttl: Used to destroy the queue once items have been removed and it has been empty empty for a given amount of time (seconds). Has no effect until the queue has been populated.
- 
item-listeners: Adds listeners (listener classes) for the queue items. You can also set the attributeinclude-valuetotrueif you want the item event to contain the item values. You can setlocaltotrueif you want to listen to the items on the local member.
- 
queue-store: Includes the queue store factory class name and the properties binary, memory limit and bulk load. See Queueing with Persistent Datastore.
- 
statistics-enabled: Specifies whether the statistics gathering is enabled for your queue. If set tofalse, you cannot collect statistics in your implementation (usinggetLocalQueueStats()) and also Hazelcast Management Center will not show them. Its default value istrue.
- 
split-brain-protection-ref: Name of the split-brain protection configuration that you want this queue to use. See Split-Brain Protection for Queue.
ItemIDs When Offering Items
Hazelcast gives an itemId for each item you offer, which is an incrementing sequence
identification for the queue items. You should consider the following to understand the
itemId assignment behavior:
- 
When a Hazelcast member has a queue and that queue is configured to have at least one backup, and that member is restarted, the itemIdassignment resumes from the last known highestitemIdbefore the restart;itemIdassignment does not start from the beginning for the new items.
- 
When the whole cluster is restarted, the same behavior explained in the above consideration applies if your queue has a persistent data store ( QueueStore). If the queue hasQueueStore, theitemIdfor the new items are given, starting from the highestitemIdfound in the IDs returned by the methodloadAllKeys. If the methodloadAllKeysdoes not return anything, theitemIds starts from the beginning after a cluster restart.
- 
The above two considerations mean there are no duplicated itemIds in the memory or in the persistent data store.
Setting a Bounded Queue
A bounded queue is a queue with a limited capacity. When the bounded queue is full, no more items can be put into the queue until some items are taken out.
To turn a Hazelcast distributed queue into a bounded queue, set the capacity limit
with the max-size property. You can set the max-size property in the configuration,
as shown below. The max-size element specifies the maximum size of the queue.
Once the queue size reaches this value, put operations are blocked until the
queue size goes below max-size, which happens when a consumer removes items from the queue.
The following is an example configuration where 10 is the maximum size of the queue:
<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
    </queue>
    ...
</hazelcast>hazelcast:
  queue:
    queue:
      max-size: 10When the producer is started, ten items are put into the queue and then the queue
will not allow more put operations. When the consumer is started, it will remove
items from the queue. This means that the producer can put more items into the
queue until there are ten items in the queue again, at which point the put operation
again becomes blocked.
In the above producer and consumer example codes, the producer is five times faster than the consumer. It will effectively always be waiting for the consumer to remove items before it can put more on the queue. For this example code, if maximum throughput is the goal, it would be a good option to start multiple consumers to prevent the queue from filling up.
Queueing with Persistent Datastore
Hazelcast allows you to load and store the distributed queue items from/to a persistent
datastore using the interface QueueStore. If queue store is enabled, each item added to
the queue is also stored at the configured queue store. When the number of items in the
queue exceeds the memory limit, the subsequent items are persisted in the queue store,
they are not stored in the queue memory.
A queue with a persistent datastore can be defined and configured only on the member side. Therefore, its implementation should be written in Java. All Hazelcast clients can access and operate on such a queue as if it is a regular one except that the queue is configured to have a persistent datastore.
The QueueStore interface enables you to store, load and delete queue items with methods like
store, storeAll, load and delete. The following example class includes all the QueueStore methods.
public class TheQueueStore implements QueueStore<Item> {
    @Override
    public void delete(Long key) {
        System.out.println("delete");
    }
    @Override
    public void store(Long key, Item value) {
        System.out.println("store");
    }
    @Override
    public void storeAll(Map<Long, Item> map) {
        System.out.println("store all");
    }
    @Override
    public void deleteAll(Collection<Long> keys) {
        System.out.println("deleteAll");
    }
    @Override
    (1)
    public Item load(Long key) {
        System.out.println("load");
        return null;
    }
    @Override
    public Map<Long, Item> loadAll(Collection<Long> keys) {
        System.out.println("loadALl");
        return null;
    }
    @Override
    public Set<Long> loadAllKeys() {
        System.out.println("loadAllKeys");
        return null;
    }
}| 1 | Itemmust be serializable. | 
The following is an example queue store configuration.
<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
        <queue-store>
            <class-name>com.hazelcast.TheQueueStore</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">1000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
    </queue>
    ...
</hazelcast>hazelcast:
  queue:
    queue:
      max-size: 10
      queue-store:
        class-name: com.hazelcast.TheQueueStore
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500The following are the descriptions for each queue store property:
- 
Binary: By default, Hazelcast stores the queue items in serialized form, and before it inserts the queue items into the queue store, it deserializes them. If you are not reaching the queue store from an external application, you might prefer that the items be inserted in binary form. Do this by setting the binaryproperty to true: then you can get rid of the deserialization step, which is a performance optimization. Thebinaryproperty is false by default.
- 
Memory Limit: This is the number of items after which Hazelcast stores items only to the datastore. For example, if the memory limit is 1000, then the 1001st item is put only to the datastore. This feature is useful when you want to avoid out-of-memory conditions. If you want to always use memory, you can set it to Integer.MAX_VALUE. The default number formemory-limitis 1000.
- 
Bulk Load: When the queue is initialized, items are loaded from QueueStorein bulks. Bulk load is the size of these bulks. The default value ofbulk-loadis 250.
Split-Brain Protection for Queue
Queues can be configured to check for a minimum number of available members before applying queue operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.
The following is a list of methods, grouped by the protection types, that support split-brain protection checks:
- 
WRITE, READ_WRITE - 
Collection.addAll()
- 
Collection.removeAll(),Collection.retainAll()
- 
BlockingQueue.offer(),BlockingQueue.add(),BlockingQueue.put()
- 
BlockingQueue.drainTo()
- 
IQueue.poll(),Queue.remove(),IQueue.take()
- 
BlockingQueue.remove()
 
- 
- 
READ, READ_WRITE - 
Collection.clear()
- 
Collection.containsAll(),BlockingQueue.contains()
- 
Collection.isEmpty()
- 
Collection.iterator(),Collection.toArray()
- 
Queue.peek(),Queue.element()
- 
Collection.size()
- 
BlockingQueue.remainingCapacity()
 
- 
The configuration is done on the member side and the following is an example.
<hazelcast>
    ...
    <queue name="default">
        <split-brain-protection-ref>splitBrainProtectionRuleWithFourMembers</split-brain-protection-ref>
    </queue>
    ...
</hazelcast>hazelcast:
  queue:
    default:
      split-brain-protection-ref: splitBrainProtectionRuleWithFourMembersSplitBrainProtectionConfig splitBrainProtectionConfig = new SplitBrainProtectionConfig();
splitBrainProtectionConfig.setName("splitBrainProtectionRuleWithFourMembers")
			 .setEnabled(true)
			 .setMinimumClusterSize(4);
QueueConfig queueConfig = new QueueConfig();
queueConfig.setSplitBrainProtectionName("splitBrainProtectionRuleWithFourMembers");
Config config = new Config();
config.addSplitBrainProtectionConfig(splitBrainProtectionConfig);
config.addQueueConfig(queueConfig);The value of split-brain-protection-ref should be the split-brain protection configuration name which you
configured under the split-brain-protection element as explained in the Split-Brain Protection section.