Queue

Hazelcast distributed queue enables all cluster members and clients to interact with it; you can add an item in one cluster member and remove it from another member or a client.

FIFO ordering applies to all queue operations across the cluster. The user objects that are enqueued or dequeued have to be Serializable.

Hazelcast distributed queue performs no batching while iterating over the queue. All items are copied locally and iteration occurs locally.

Creating a Queue

When you start a Hazelcast member with default configuration, it will have an empty Queue named default. See Start a Local Cluster in Docker for a quick cluster startup.

You can either use this default Queue or create your own using the Queue's getter methods, as shown in the examples below. If you pass a name other than default as the Queue name in these methods, a new Queue with the given name is created; otherwise, the existing default Queue is used.

The following examples illustrate a distributed queue that connects a producer and consumer.

Putting Items on the Queue

The following creates a queue producer that puts one integer on the queue every second, up to 100 integers in total.

  • Java

  • C++

  • C Sharp

  • Node.js

  • Python

  • Go

  1. Install the Java client library

  2. Add the following to your file:

    import com.hazelcast.collection.IQueue;
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    
    public class ProducerMember {
    
        public static void main( String[] args ) throws Exception {
            HazelcastInstance hz = Hazelcast.newHazelcastInstance();
            IQueue<Integer> queue = hz.getQueue( "queue" ); (1)
            
            (2)
            for ( int k = 1; k < 100; k++ ) {
                queue.put( k );
                System.out.println( "Producing: " + k );
                Thread.sleep(1000);
            }
            queue.put( -1 );
            System.out.println( "Producer Finished!" );
        }
    }
  1. Install the latest C++ client library

  2. Add the following to your file:

    #include <chrono>
    #include <iostream>
    #include <thread>
    
    #include <hazelcast/client/hazelcast_client.h>
    
    int main() {
        auto hz = hazelcast::new_client().get();
    
        auto queue = hz.get_queue("queue").get(); (1)
    
        (2)
        for (int k = 1; k < 100; k++) {
            queue->put(k).get();
            std::cout << "Producing: " << k << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        queue->put(-1).get();
        std::cout << "Producer Finished!" << std::endl;
    
        std::cout << "Finished" << std::endl;
    
        return 0;
    }
  1. Install the latest C Sharp client library

  2. Add the following to your file:

    using System;
    using System.Threading.Tasks;
    
    namespace Hazelcast.Examples.DistributedObjects
    {
        public class QueueExample
        {
            public static async Task Main(string[] args)
            {
                var options = new HazelcastOptionsBuilder()
                    .With(args)
                    .WithConsoleLogger()
                    .Build();
    
                await using var client = await HazelcastClientFactory.StartNewClientAsync(options);
    
                await using var queue = await client.GetQueueAsync<string>("queue"); (1)
    
                (2)
                var producer = Task.Run(async () =>
                {
                    for (var i = 0; i < 100; i++)
                    {
                        await queue.OfferAsync("value " + i);
                    }
                    Console.WriteLine("produced");
                });
    
                await producer;
            }
        }
    }
  1. Install the Node.js client library.

    npm install hazelcast-client
  2. Add the following to your file:

    const { Client } = require('hazelcast-client');
    
    (async () => {
        const client = await Client.newHazelcastClient();
        const queue = await client.getQueue('queue');
    
        for (let k = 0; k < 100; k++) {
            await queue.put(k);
            console.log('Producing: ' + k);
            await new Promise((resolve) => {
                setTimeout(resolve, 1000);
            });
        }
        await queue.put(-1);
    })();
  1. Install the Python client library.

    pip install hazelcast-python-client
  2. Add the following to your file:

    import hazelcast
    import threading
    
    client = hazelcast.HazelcastClient()
    
    queue = client.get_queue("queue") (1)
    
    (2)
    def produce():
        for i in range(100):
            queue.offer("value-" + str(i))
    
    
    producer_thread = threading.Thread(target=produce)
    
    producer_thread.start()
    
    producer_thread.join()
    
    client.shutdown()
  1. Install the Go client library.

    go get github.com/hazelcast/hazelcast-go-client
  2. Add the following to your file:

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	"github.com/hazelcast/hazelcast-go-client"
    )
    
    func main() {
    	ctx := context.Background()
    	hz, err := hazelcast.StartNewClient(ctx)
    	if err != nil {
    		panic(fmt.Errorf("starting hazelcast client %w", err))
    	}
    	queue, err := hz.GetQueue(ctx, "queue") (1)
    	if err != nil {
    		panic(fmt.Errorf("getting queue instance %w", err))
    	}
    
             (2)
    	for i := 1; i < 100; i++ {
    		err := queue.Put(ctx, int64(i))
    		fmt.Println("Producing: ", int64(i))
    		if err != nil {
    			panic(fmt.Errorf("putting item to queue %w", err))
    		}
    		time.Sleep(time.Second)
    	}
    	err = queue.Put(ctx, int64(-1))
    	if err != nil {
    		panic(fmt.Errorf("putting item to queue %w", err))
    	}
    	fmt.Println("Producer Finished!")
    }
1 Create the Queue called queue.
2 Add items to the queue.

The producer puts a -1 on the queue to signal that it has finished adding items.

Taking Items off the Queue

The following creates a queue consumer to take messages from this queue.

  • Java

  • C++

  • C Sharp

  • Node.js

  • Python

  • Go

  1. Install the Java client library

  2. Add the following to your file:

    import com.hazelcast.collection.IQueue;
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    
    public class ConsumerMember {
    
        public static void main( String[] args ) throws Exception {
            HazelcastInstance hz = Hazelcast.newHazelcastInstance();
            IQueue<Integer> queue = hz.getQueue( "queue" ); (1)
            while ( true ) {
                int item = queue.take(); (2)
                System.out.println( "Consumed: " + item );
                if ( item == -1 ) {
                    queue.put( -1 );
                    break;
                }
                Thread.sleep( 5000 ); (3)
            }
            System.out.println( "Consumer Finished!" );
        }
    }
    1 Access the queue.
    2 Start taking queue messages.
    3 Wait for 5 seconds until the next message is taken.
  1. Install the latest C++ client library

  2. Add the following to your file:

    #include <chrono>
    #include <iostream>
    #include <thread>
    
    #include <hazelcast/client/hazelcast_client.h>
    
    int main() {
        auto hz = hazelcast::new_client().get();
    
        auto queue = hz.get_queue("queue").get(); (1)
    
        while (true) {
            auto item = queue->take<int32_t>().get(); (2)
            if (item) {
                std::cout << "Consumed: " << item.value() << std::endl;
    
                if (item.value() == -1) {
                    queue->put(-1).get();
                    break;
                }
            } else {
                std::cout << "Retrieved item is null." << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::seconds(5)); (3)
        }
        std::cout << "Consumer Finished!" << std::endl;
    
        std::cout << "Finished" << std::endl;
    
        return 0;
    }
    1 Access the queue.
    2 Start taking queue messages.
    3 Wait for 5 seconds until the next message is taken.
  1. Install the latest C Sharp client library

  2. Add the following to your file:

    using System;
    using System.Threading.Tasks;
    
    namespace Hazelcast.Examples.DistributedObjects
    {
        public class QueueExample
        {
            public static async Task Main(string[] args)
            {
                var options = new HazelcastOptionsBuilder()
                    .With(args)
                    .WithConsoleLogger()
                    .Build();
    
                await using var client = await HazelcastClientFactory.StartNewClientAsync(options);
    
                await using var queue = await client.GetQueueAsync<string>("queue"); (1)
    
                var consumer = Task.Run(async () =>
                {
                    var nConsumed = 0;
                    string e;
                    while (nConsumed++ < 100 && (e = await queue.PollAsync()) != null) (2)
                    {
                        Console.WriteLine("Consuming " + e);
                    }
                    Console.WriteLine("consumed");
                });
    
                await consumer;
    
                await client.DestroyAsync(queue);
            }
        }
    }
    1 Access the queue.
    2 Start taking queue messages.
  1. Install the Node.js client library.

    npm install hazelcast-client
  2. Add the following to your file:

    const { Client } = require('hazelcast-client');
    
    (async () => {
        const client = await Client.newHazelcastClient();
        const queue = await client.getQueue('queue');
    
        while (true) {
            const item = await queue.take();
            console.log('Consumed item: ' + item);
            if (item === -1) {
                await queue.put(-1);
                break;
            }
            await new Promise((resolve) => {
                setTimeout(resolve, 5000);
            });
        }
    })();
  1. Install the Python client library.

    pip install hazelcast-python-client
  2. Add the following to your file:

    import hazelcast
    import threading
    
    client = hazelcast.HazelcastClient()
    
    queue = client.get_queue("queue") (1)
    
    
    def consume():
        consumed_count = 0
        while consumed_count < 100: (2)
            head = queue.take().result()
            print("Consuming {}".format(head))
            consumed_count += 1
    
    
    consumer_thread = threading.Thread(target=consume)
    
    consumer_thread.start()
    
    consumer_thread.join()
    
    client.shutdown()
    1 Access the queue.
    2 Start taking queue messages.
  1. Install the Go client library.

    go get github.com/hazelcast/hazelcast-go-client
  2. Add the following to your file:

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	"github.com/hazelcast/hazelcast-go-client"
    )
    
    func main() {
    	ctx := context.Background()
    	hz, err := hazelcast.StartNewClient(ctx)
    	if err != nil {
    		panic(fmt.Errorf("starting hazelcast client %w", err))
    	}
    	queue, err := hz.GetQueue(ctx, "queue") (1)
    	if err != nil {
    		panic(fmt.Errorf("getting queue %w", err))
    	}
    
             (2)
    	for {
    		item, err := queue.Take(ctx)
    		if err != nil {
    			panic(fmt.Errorf("taking item from the queue %w", err))
    		}
    		fmt.Println("Consuming: ", item.(int64))
    		// notice that converting item to int64 is required due to serialization
    		if item == int64(-1) {
    			err := queue.Put(ctx, -1)
    			if err != nil {
    				panic(fmt.Errorf("putting item to queue %w", err))
    			}
    			break
    		}
    		time.Sleep(5 * time.Second)
    	}
    	fmt.Println("Consumer Finished!")
    }
    1 Access the queue.
    2 Start taking queue messages.

As seen in the example above, the consumer waits five seconds before it takes the next message and stops once it receives -1. Note that the consumer puts -1 back on the queue before the loop ends, so that any other consumers also receive the termination signal.

When you first start the producer and then start the consumer, items produced on the queue will be consumed from the same queue.

Hazelcast distributed queue uses item listeners to listen to the events that occur when items are added to and removed from the queue. See the Listening for Item Events section for information about how to create an item listener class and register it.
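
For orientation, the following is a minimal member-side Java sketch of registering such a listener; the class name QueueItemListenerExample is illustrative, and the package locations assume a recent (5.x) Hazelcast release.

import com.hazelcast.collection.IQueue;
import com.hazelcast.collection.ItemEvent;
import com.hazelcast.collection.ItemListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class QueueItemListenerExample {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue("queue");

        // Register a listener; the boolean argument controls whether the
        // events carry the item values.
        queue.addItemListener(new ItemListener<Integer>() {
            @Override
            public void itemAdded(ItemEvent<Integer> event) {
                System.out.println("Item added: " + event.getItem());
            }

            @Override
            public void itemRemoved(ItemEvent<Integer> event) {
                System.out.println("Item removed: " + event.getItem());
            }
        }, true);
    }
}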

Balancing the Queue Operations

From the example code above, you can see that an item is produced every second and consumed every five seconds, so the queue keeps growing. To balance the produce/consume operations, you can start another consumer by running the consumer code again in a separate process. This way, consumption is distributed across the two consumers, as seen in the example outputs below.

Once the second consumer is started, here is the first consumer output:

...
Consumed 13
Consumed 15
Consumed 17
...

Here is the second consumer output:

...
Consumed 14
Consumed 16
Consumed 18
...

If there are many producers and consumers for the queue, using a list of queues may relieve the bottleneck. In this case, be aware that the order of messages sent to different queues is not guaranteed. Since strict ordering is not important in most cases, a list of queues is a good solution.
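
As a rough illustration of this approach, the following Java sketch spreads produced items across several queues in round-robin fashion; the queue names and the number of queues are illustrative, and each consumer would take from one of the queues.

import java.util.ArrayList;
import java.util.List;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class MultiQueueProducer {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();

        // A list of queues instead of a single queue.
        List<IQueue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            queues.add(hz.<Integer>getQueue("queue-" + i));
        }

        // Distribute items across the queues in round-robin fashion.
        // Ordering is only preserved within each individual queue.
        for (int k = 1; k <= 100; k++) {
            queues.get(k % queues.size()).put(k);
        }
    }
}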

The items are taken from the queue in the same order they were put on the queue. However, if there is more than one consumer, this order is not guaranteed.

Configuring Queue

The following are examples of queue configurations. It includes the QueueStore configuration, which is explained in the Queueing with Persistent Datastore section.

  • XML

  • YAML

  • Java

<hazelcast>
    ...
    <queue name="default">
        <max-size>0</max-size>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <empty-queue-ttl>-1</empty-queue-ttl>
        <item-listeners>
            <item-listener>com.hazelcast.examples.ItemListener</item-listener>
        </item-listeners>
        <statistics-enabled>true</statistics-enabled>
        <queue-store>
            <class-name>com.hazelcast.QueueStoreImpl</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">10000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </queue>
    ...
</hazelcast>
hazelcast:
  queue:
    default:
      max-size: 0
      backup-count: 1
      async-backup-count: 0
      empty-queue-ttl: -1
      item-listeners:
        - include-value: true
          class-name: com.hazelcast.examples.ItemListener
      statistics-enabled: true
      queue-store:
        class-name: com.hazelcast.QueueStoreImpl
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500
      split-brain-protection-ref: splitbrainprotection-name
Config config = new Config();
QueueConfig queueConfig = config.getQueueConfig("default");
queueConfig.setName("MyQueue")
        .setBackupCount(1)
        .setMaxSize(0)
        .setStatisticsEnabled(true)
        .setSplitBrainProtectionName("splitbrainprotectionname");
queueConfig.getQueueStoreConfig()
        .setEnabled(true)
        .setClassName("com.hazelcast.QueueStoreImpl")
        .setProperty("binary", "false");
config.addQueueConfig(queueConfig);

Hazelcast distributed queue has one synchronous backup by default. With this backup, if a cluster member holding a queue goes down, another member holding the backup of that queue takes over, so no items are lost. You can define the number of synchronous backups for a queue using the backup-count element in the declarative configuration. A queue can also have asynchronous backups; you define their number using the async-backup-count element.

To set the maximum size of the queue, use the max-size element. To purge unused or empty queues after a period of time, use the empty-queue-ttl element; if you define a value (time in seconds) for it, the queue is destroyed once it stays empty or unused for that many seconds.
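
If you prefer programmatic configuration, the equivalent Java settings look roughly like the following sketch; the queue name tasks is illustrative and the values mirror the declarative defaults.

import com.hazelcast.config.Config;
import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.Hazelcast;

public class QueueSizingConfigExample {

    public static void main(String[] args) {
        Config config = new Config();
        QueueConfig queueConfig = config.getQueueConfig("tasks");
        queueConfig.setMaxSize(0)           // 0 means Integer.MAX_VALUE
                .setBackupCount(1)          // one synchronous backup
                .setAsyncBackupCount(0)     // no asynchronous backups
                .setEmptyQueueTtl(-1);      // never destroy the queue when empty

        Hazelcast.newHazelcastInstance(config);
    }
}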

The following is the full list of queue configuration elements with their descriptions:

  • max-size: Maximum number of items in the queue. It is used to set an upper bound for the queue. You will not be able to put more items when the queue reaches this maximum size, whether you have a queue store configured or not. See Setting a Bounded Queue.

  • backup-count: Number of synchronous backups. Queue is a non-partitioned data structure, so all entries of a queue reside in one partition. When this parameter is '1', it means there will be one backup of that queue in another member in the cluster. When it is '2', two members will have the backup.

  • async-backup-count: Number of asynchronous backups.

  • empty-queue-ttl: Used to purge unused or empty queues. If you define a value (time in seconds) for this element, then your queue will be destroyed if it stays empty or unused for that time.

  • item-listeners: Adds listeners (listener classes) for the queue items. You can also set the attribute include-value to true if you want the item event to contain the item values. You can set local to true if you want to listen to the items on the local member.

  • queue-store: Includes the queue store factory class name and the properties binary, memory limit and bulk load. See Queueing with Persistent Datastore.

  • statistics-enabled: Specifies whether statistics gathering is enabled for your queue. If set to false, you cannot collect statistics in your implementation (using getLocalQueueStats(); see the sketch after this list) and Hazelcast Management Center will not show them. Its default value is true.

  • split-brain-protection-ref: Name of the split-brain protection configuration that you want this queue to use. See Split-Brain Protection for Queue.
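
As referenced in the statistics-enabled entry above, the following member-side Java sketch shows one way to read the local statistics; the printed fields are a small subset of what LocalQueueStats exposes, and the package locations assume a recent (5.x) release.

import com.hazelcast.collection.IQueue;
import com.hazelcast.collection.LocalQueueStats;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class QueueStatsExample {

    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue("queue");

        // Only available on members, and only when statistics-enabled is true (the default).
        LocalQueueStats stats = queue.getLocalQueueStats();
        System.out.println("Owned items : " + stats.getOwnedItemCount());
        System.out.println("Backup items: " + stats.getBackupItemCount());
        System.out.println("Offers      : " + stats.getOfferOperationCount());
        System.out.println("Polls       : " + stats.getPollOperationCount());
    }
}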

ItemIDs When Offering Items

Hazelcast assigns an itemId to each item you offer; this is an incrementing sequence identifier for the queue items. Consider the following to understand the itemId assignment behavior:

  • When a Hazelcast member has a queue and that queue is configured to have at least one backup, and that member is restarted, the itemId assignment resumes from the last known highest itemId before the restart; itemId assignment does not start from the beginning for the new items.

  • When the whole cluster is restarted, the same behavior applies if your queue has a persistent data store (QueueStore). In that case, the itemIds for new items are assigned starting from the highest itemId found among the IDs returned by the loadAllKeys method. If loadAllKeys returns nothing, the itemIds start from the beginning after a cluster restart.

  • The above two considerations mean there are no duplicate itemIds in memory or in the persistent data store.

Setting a Bounded Queue

A bounded queue is a queue with a limited capacity. When the bounded queue is full, no more items can be put into the queue until some items are taken out.

To turn a Hazelcast distributed queue into a bounded queue, set the capacity limit with the max-size element in the configuration, as shown below. Once the queue size reaches this value, put operations are blocked until the size drops below max-size, which happens when a consumer removes items from the queue.

The following is an example configuration where 10 is the maximum size of the queue:

  • XML

  • YAML

<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
    </queue>
    ...
</hazelcast>
hazelcast:
  queue:
    queue:
      max-size: 10

When the producer is started, ten items are put into the queue and then the queue will not allow more put operations. When the consumer is started, it will remove items from the queue. This means that the producer can put more items into the queue until there are ten items in the queue again, at which point the put operation again becomes blocked.

In the producer and consumer example code above, the producer is five times faster than the consumer, so it effectively always waits for the consumer to remove items before it can put more on the queue. If maximum throughput is the goal for this example, a good option is to start multiple consumers to prevent the queue from filling up.
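
If you do not want a producer to block indefinitely on a full bounded queue, you can use offer with a timeout instead of put. The following Java sketch waits up to one second for capacity and drops the item otherwise; the queue name and the drop policy are illustrative.

import java.util.concurrent.TimeUnit;

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class BoundedQueueProducer {

    public static void main(String[] args) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue("queue");

        for (int k = 1; k <= 100; k++) {
            // offer() returns false instead of blocking forever
            // when the queue stays full for the whole timeout.
            boolean added = queue.offer(k, 1, TimeUnit.SECONDS);
            if (!added) {
                System.out.println("Queue is full, dropped: " + k);
            }
        }
    }
}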

Queueing with Persistent Datastore

Hazelcast allows you to load and store the distributed queue items from/to a persistent datastore using the QueueStore interface. If a queue store is enabled, each item added to the queue is also stored in the configured queue store. When the number of items in the queue exceeds the memory limit, the subsequent items are persisted only in the queue store; they are not kept in the queue's memory.

A queue with a persistent datastore can be defined and configured only on the member side, so its implementation must be written in Java. Hazelcast clients can access and operate on such a queue as if it were a regular queue; the persistent datastore is transparent to them.

The QueueStore interface enables you to store, load and delete queue items with methods like store, storeAll, load and delete. The following example class includes all the QueueStore methods.

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import com.hazelcast.collection.QueueStore;

public class TheQueueStore implements QueueStore<Item> {

    @Override
    public void delete(Long key) {
        System.out.println("delete");
    }

    @Override
    public void store(Long key, Item value) {
        System.out.println("store");
    }

    @Override
    public void storeAll(Map<Long, Item> map) {
        System.out.println("store all");
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
        System.out.println("deleteAll");
    }

    @Override
    (1)
    public Item load(Long key) {
        System.out.println("load");
        return null;
    }

    @Override
    public Map<Long, Item> loadAll(Collection<Long> keys) {
        System.out.println("loadALl");
        return null;
    }

    @Override
    public Set<Long> loadAllKeys() {
        System.out.println("loadAllKeys");
        return null;
    }
}
1 Item must be serializable.
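
The Item type used in this example is not shown; a minimal version only needs to be serializable, for instance:

import java.io.Serializable;

public class Item implements Serializable {

    private final String payload;

    public Item(String payload) {
        this.payload = payload;
    }

    public String getPayload() {
        return payload;
    }
}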

The following is an example queue store configuration.

  • XML

  • YAML

<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
        <queue-store>
            <class-name>com.hazelcast.TheQueueStore</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">1000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
    </queue>
    ...
</hazelcast>
hazelcast:
  queue:
    queue:
      max-size: 10
      queue-store:
        class-name: com.hazelcast.TheQueueStore
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500

The following are the descriptions for each queue store property; a programmatic configuration sketch follows the list:

  • Binary: By default, Hazelcast stores the queue items in serialized form and deserializes them before inserting them into the queue store. If you do not reach the queue store from an external application, you might prefer that the items be inserted in binary form, which skips the deserialization step and is a performance optimization. To do this, set the binary property to true. The binary property is false by default.

  • Memory Limit: This is the number of items after which Hazelcast stores items only to the datastore. For example, if the memory limit is 1000, then the 1001st item is put only to the datastore. This feature is useful when you want to avoid out-of-memory conditions. If you want to always use memory, you can set it to Integer.MAX_VALUE. The default number for memory-limit is 1000.

  • Bulk Load: When the queue is initialized, items are loaded from the QueueStore in batches; bulk-load is the size of these batches. The default value of bulk-load is 250.
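
For reference, the same store settings can also be configured programmatically. The following Java sketch mirrors the declarative example above; the values are illustrative.

import com.hazelcast.config.Config;
import com.hazelcast.config.QueueStoreConfig;
import com.hazelcast.core.Hazelcast;

public class QueueStoreConfigExample {

    public static void main(String[] args) {
        QueueStoreConfig storeConfig = new QueueStoreConfig()
                .setEnabled(true)
                .setClassName("com.hazelcast.TheQueueStore")
                .setProperty("binary", "false")
                .setProperty("memory-limit", "1000")
                .setProperty("bulk-load", "500");

        Config config = new Config();
        config.getQueueConfig("queue")
                .setMaxSize(10)
                .setQueueStoreConfig(storeConfig);

        Hazelcast.newHazelcastInstance(config);
    }
}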

Split-Brain Protection for Queue

Queues can be configured to check for a minimum number of available members before applying queue operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.

The following is a list of methods, grouped by the protection types, that support split-brain protection checks:

  • WRITE, READ_WRITE

    • Collection.addAll()

    • Collection.removeAll(), Collection.retainAll()

    • BlockingQueue.offer(), BlockingQueue.add(), BlockingQueue.put()

    • BlockingQueue.drainTo()

    • IQueue.poll(), Queue.remove(), IQueue.take()

    • BlockingQueue.remove()

  • READ, READ_WRITE

    • Collection.clear()

    • Collection.containsAll(), BlockingQueue.contains()

    • Collection.isEmpty()

    • Collection.iterator(), Collection.toArray()

    • Queue.peek(), Queue.element()

    • Collection.size()

    • BlockingQueue.remainingCapacity()

The configuration is done on the member side and the following is an example.

  • XML

  • YAML

  • Java

<hazelcast>
    ...
    <queue name="default">
        <split-brain-protection-ref>splitBrainProtectionRuleWithFourMembers</split-brain-protection-ref>
    </queue>
    ...
</hazelcast>
hazelcast:
  queue:
    default:
      split-brain-protection-ref: splitBrainProtectionRuleWithFourMembers
SplitBrainProtectionConfig splitBrainProtectionConfig = new SplitBrainProtectionConfig();
splitBrainProtectionConfig.setName("splitBrainProtectionRuleWithFourMembers")
			 .setEnabled(true)
			 .setMinimumClusterSize(4);

QueueConfig queueConfig = new QueueConfig();
queueConfig.setSplitBrainProtectionName("splitBrainProtectionRuleWithFourMembers");

Config config = new Config();
config.addSplitBrainProtectionConfig(splitBrainProtectionConfig);
config.addQueueConfig(queueConfig);

The value of split-brain-protection-ref should be the split-brain protection configuration name which you configured under the split-brain-protection element as explained in the Split-Brain Protection section.