Queue

Hazelcast distributed queue enables all cluster members and clients to interact with it; you can add an item in one cluster member and remove it from another member or a client.

Getting a Queue and Putting Items

Use the Hazelcast instance’s getQueue method to get the queue, then use the queue’s put method to put items into the queue.

        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        BlockingQueue<MyTask> queue = hazelcastInstance.getQueue( "tasks" );
        queue.put( new MyTask() );
        MyTask task = queue.take();

        boolean offered = queue.offer( new MyTask(), 10, TimeUnit.SECONDS );
        task = queue.poll( 5, TimeUnit.SECONDS );
        if ( task != null ) {
            //process task
        }

FIFO ordering applies to all queue operations across the cluster. The user objects (such as MyTask in the example above) that are enqueued or dequeued have to be Serializable.

Hazelcast distributed queue performs no batching while iterating over the queue. All items are copied locally and iteration occurs locally.

Hazelcast distributed queue uses ItemListener to listen to the events that occur when items are added to and removed from the queue. See the Listening for Item Events section for information about how to create an item listener class and register it.

Creating an Example Queue

The following example code illustrates a distributed queue that connects a producer and consumer.

Putting Items on the Queue

Let’s put one integer on the queue every second, 100 integers total.

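import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ProducerMember {

    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue( "queue" );
        // Produce the integers 1 to 100, one per second.
        for ( int k = 1; k <= 100; k++ ) {
            queue.put( k );
            System.out.println( "Producing: " + k );
            Thread.sleep(1000);
        }
        // -1 marks the end of the stream for the consumers.
        queue.put( -1 );
        System.out.println( "Producer Finished!" );
    }
}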

Producer puts a -1 on the queue to show that the puts are finished.

Taking Items off the Queue

Now, let’s create a Consumer class to take a message from this queue, as shown below.

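import com.hazelcast.collection.IQueue;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ConsumerMember {

    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IQueue<Integer> queue = hz.getQueue( "queue" );
        while ( true ) {
            // Blocks until an item is available.
            int item = queue.take();
            System.out.println( "Consumed: " + item );
            if ( item == -1 ) {
                // Put the end marker back so that other consumers see it too.
                queue.put( -1 );
                break;
            }
            Thread.sleep( 5000 );
        }
        System.out.println( "Consumer Finished!" );
    }
}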

As seen in the above example code, Consumer waits five seconds before it consumes the next message and stops once it receives -1. Also note that Consumer puts -1 back on the queue before it leaves the loop, so that any other consumers on the same queue also receive it and stop.

When you first start Producer and then start Consumer, items produced on the queue will be consumed from the same queue.
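
The end-marker handling described above can be tried out in a single JVM. The following is a minimal sketch, not Hazelcast code: it assumes a local `LinkedBlockingQueue` as a stand-in for the distributed queue (both follow the `java.util.concurrent.BlockingQueue` contract), and the class name `EndMarkerSketch` and its methods are hypothetical. It shows that one -1 is enough to stop several consumers when each of them puts it back.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class EndMarkerSketch {

    static final int END_MARKER = -1;

    // Starts the given number of consumers on one shared queue, produces
    // the integers 1..items followed by the end marker, and returns how
    // many real items were consumed in total.
    static int run(int items, int consumers) {
        // Assumption: local stand-in for hz.getQueue( "queue" ).
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger consumed = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> consume(queue, consumed));
            threads[i].start();
        }
        try {
            for (int k = 1; k <= items; k++) {
                queue.put(k);
            }
            queue.put(END_MARKER);
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return consumed.get();
    }

    static void consume(BlockingQueue<Integer> queue, AtomicInteger consumed) {
        try {
            while (true) {
                int item = queue.take();
                if (item == END_MARKER) {
                    // Put the marker back so the next consumer also stops.
                    queue.put(END_MARKER);
                    break;
                }
                consumed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Items consumed: " + run(100, 2)); // prints "Items consumed: 100"
    }
}
```

Without the `queue.put(END_MARKER)` call inside the loop, only the consumer that happens to take the marker would stop, and the others would block in `take()` indefinitely.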

Balancing the Queue Operations

From the above example code, you can see that an item is produced every second and consumed every five seconds. Therefore, the queue keeps growing. To balance the produce/consume operation, let’s start another consumer. This way, consumption is distributed to these two consumers, as seen in the example outputs below.

The second consumer is started. After a while, here is the first consumer output:

...
Consumed: 13
Consumed: 15
Consumed: 17
...

Here is the second consumer output:

...
Consumed: 14
Consumed: 16
Consumed: 18
...

When a single queue serves a lot of producers and consumers, it can become a bottleneck; using a list of queues may solve this. In this case, be aware that the order of the messages sent to different queues is not guaranteed. Where strict ordering is not important, which is true in most cases, a list of queues is a good solution.

The items are taken from the queue in the same order they were put on the queue. However, if there is more than one consumer, this order is not guaranteed.
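
One way to use the list of queues mentioned above is to derive the queue name from a key. The sketch below is only an illustration under stated assumptions: the class `QueueSharding`, its methods, and the `tasks-N` naming scheme are hypothetical and not part of Hazelcast. The selected name would then be passed to the instance’s getQueue method.

```java
import java.util.ArrayList;
import java.util.List;

class QueueSharding {

    // Builds hypothetical queue names such as "tasks-0" .. "tasks-3".
    static List<String> queueNames(String prefix, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(prefix + "-" + i);
        }
        return names;
    }

    // Picks the queue name for a key. The same key always maps to the
    // same name, so items sharing a key stay in one queue and keep
    // their relative order there.
    static String queueFor(Object key, List<String> names) {
        int index = Math.floorMod(key.hashCode(), names.size());
        return names.get(index);
    }

    public static void main(String[] args) {
        List<String> names = queueNames("tasks", 4);
        for (String customer : List.of("alice", "bob", "carol")) {
            // With Hazelcast: hz.getQueue( queueFor( customer, names ) )
            System.out.println(customer + " -> " + queueFor(customer, names));
        }
    }
}
```

`Math.floorMod` is used instead of `%` so that keys with a negative hash code still map to a valid index.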

ItemIDs When Offering Items

Hazelcast gives an itemId for each item you offer, which is an incrementing sequence identification for the queue items. You should consider the following to understand the itemId assignment behavior:

  • When a Hazelcast member has a queue and that queue is configured to have at least one backup, and that member is restarted, the itemId assignment resumes from the last known highest itemId before the restart; itemId assignment does not start from the beginning for the new items.

  • When the whole cluster is restarted, the same behavior explained in the above consideration applies if your queue has a persistent data store (QueueStore). If the queue has a QueueStore, the itemIds for the new items are assigned starting from the highest itemId found in the IDs returned by the method loadAllKeys. If the method loadAllKeys does not return anything, the itemIds start from the beginning after a cluster restart.

  • The above two considerations mean there are no duplicated itemIds in the memory or in the persistent data store.
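
The considerations above can be pictured with a small helper. This is a sketch of the described behavior only, not Hazelcast’s internal code: the class `ItemIdResume`, the method `nextItemId`, and the starting value 0 are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Set;

class ItemIdResume {

    // Assumed first ID for a queue with no known items; the real
    // starting value is an internal detail of Hazelcast.
    static final long FIRST_ID = 0L;

    // Returns the ID for the next offered item, given the keys that
    // loadAllKeys returned after a restart.
    static long nextItemId(Set<Long> loadedKeys) {
        if (loadedKeys == null || loadedKeys.isEmpty()) {
            // Nothing was returned: IDs start from the beginning.
            return FIRST_ID;
        }
        // Resume after the highest known ID so that no ID is reused.
        return Collections.max(loadedKeys) + 1;
    }

    public static void main(String[] args) {
        System.out.println("Next itemId: " + nextItemId(Set.of(3L, 7L, 5L))); // prints "Next itemId: 8"
    }
}
```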

Setting a Bounded Queue

A bounded queue is a queue with a limited capacity. When the bounded queue is full, no more items can be put into the queue until some items are taken out.

To turn a Hazelcast distributed queue into a bounded queue, set its capacity limit with the max-size element in the configuration, as shown below. Once the queue size reaches this value, put operations are blocked until the queue size goes below max-size, which happens when a consumer removes items from the queue.

Let’s set 10 as the maximum size of our example queue in Creating an Example Queue.

XML:

<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
    </queue>
    ...
</hazelcast>

YAML:

hazelcast:
  queue:
    queue:
      max-size: 10

When the producer is started, ten items are put into the queue and then the queue will not allow more put operations. When the consumer is started, it will remove items from the queue. This means that the producer can put more items into the queue until there are ten items in the queue again, at which point the put operation again becomes blocked.

In this example code, the producer is five times faster than the consumer. It will effectively always be waiting for the consumer to remove items before it can put more on the queue. For this example code, if maximum throughput is the goal, it would be a good option to start multiple consumers to prevent the queue from filling up.
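
The blocking behavior of a full queue can be observed locally before trying it on a cluster. The following is a minimal sketch that assumes a `LinkedBlockingQueue` with a capacity of 10 as a stand-in for a Hazelcast queue whose max-size is 10; the class name `BoundedQueueSketch` is hypothetical. It uses the timed offer method so that the example does not block forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {

    // Fills a queue bounded at maxSize and returns two results:
    // [0] whether an offer succeeded while the queue was full,
    // [1] whether an offer succeeded after one item was taken.
    static boolean[] demo(int maxSize) {
        // Assumption: local stand-in for a queue with max-size set.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(maxSize);
        try {
            for (int k = 1; k <= maxSize; k++) {
                queue.put(k);
            }
            // The queue is full: the offer waits 100 ms and then gives up.
            boolean whileFull = queue.offer(maxSize + 1, 100, TimeUnit.MILLISECONDS);
            // A consumer removes one item, which frees one slot.
            queue.take();
            boolean afterTake = queue.offer(maxSize + 1, 100, TimeUnit.MILLISECONDS);
            return new boolean[] { whileFull, afterTake };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo(10);
        System.out.println("offer while full: " + result[0]);  // prints "offer while full: false"
        System.out.println("offer after take: " + result[1]);  // prints "offer after take: true"
    }
}
```

A producer that must not wait indefinitely can use the timed offer in the same way and decide what to do when it returns false.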

Queueing with Persistent Datastore

Hazelcast allows you to load and store the distributed queue items from/to a persistent datastore using the interface QueueStore. If queue store is enabled, each item added to the queue is also stored in the configured queue store. When the number of items in the queue exceeds the memory limit, subsequent items are persisted only in the queue store; they are not kept in the queue memory.

The QueueStore interface enables you to store, load and delete queue items with methods like store, storeAll, load and delete. The following example class includes all the QueueStore methods.

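import com.hazelcast.collection.QueueStore;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

// Item is the application's own serializable class.
public class TheQueueStore implements QueueStore<Item> {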

    @Override
    public void delete(Long key) {
        System.out.println("delete");
    }

    @Override
    public void store(Long key, Item value) {
        System.out.println("store");
    }

    @Override
    public void storeAll(Map<Long, Item> map) {
        System.out.println("store all");
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
        System.out.println("deleteAll");
    }

    @Override
    public Item load(Long key) {
        System.out.println("load");
        return null;
    }

    @Override
    public Map<Long, Item> loadAll(Collection<Long> keys) {
        System.out.println("loadAll");
        return null;
    }

    @Override
    public Set<Long> loadAllKeys() {
        System.out.println("loadAllKeys");
        return null;
    }
}
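
The example class above only prints the name of each method. To see what a working store might do, here is a minimal in-memory sketch. It is an assumption-laden illustration: the class `InMemoryQueueStore` is hypothetical, it keeps items in a map instead of a real datastore, and it deliberately does not declare `implements QueueStore<T>` so that it compiles without Hazelcast on the classpath. Its seven methods mirror the signatures shown above.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// To plug this into Hazelcast, the class would additionally declare
// "implements QueueStore<T>"; that part is left out of this sketch.
class InMemoryQueueStore<T> {

    // Stand-in for the persistent datastore, keyed by itemId.
    private final Map<Long, T> items = new ConcurrentHashMap<>();

    public void store(Long key, T value) {
        items.put(key, value);
    }

    public void storeAll(Map<Long, T> map) {
        items.putAll(map);
    }

    public void delete(Long key) {
        items.remove(key);
    }

    public void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            items.remove(key);
        }
    }

    public T load(Long key) {
        return items.get(key);
    }

    public Map<Long, T> loadAll(Collection<Long> keys) {
        Map<Long, T> result = new HashMap<>();
        for (Long key : keys) {
            T value = items.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    public Set<Long> loadAllKeys() {
        // Return a sorted copy so callers cannot modify the store.
        return new TreeSet<>(items.keySet());
    }

    public static void main(String[] args) {
        InMemoryQueueStore<String> store = new InMemoryQueueStore<>();
        store.store(1L, "first");
        store.store(2L, "second");
        store.delete(1L);
        System.out.println("Keys: " + store.loadAllKeys()); // prints "Keys: [2]"
    }
}
```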

Item must be serializable. The following is an example queue store configuration.

XML:

<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
        <queue-store>
            <class-name>com.hazelcast.QueueStoreImpl</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">1000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
    </queue>
    ...
</hazelcast>

YAML:

hazelcast:
  queue:
    queue:
      max-size: 10
      queue-store:
        class-name: com.hazelcast.QueueStoreImpl
        properties:
          binary: false
          memory-limit: 1000
          bulk-load: 500

The following are the descriptions for each queue store property:

  • Binary: By default, Hazelcast stores the queue items in serialized form, and before it inserts the queue items into the queue store, it deserializes them. If you are not reaching the queue store from an external application, you might prefer that the items be inserted in binary form. To do this, set the binary property to true; this skips the deserialization step, which improves performance. The binary property is false by default.

  • Memory Limit: This is the number of items after which Hazelcast stores items only to the datastore. For example, if the memory limit is 1000, then the 1001st item is put only to the datastore. This feature is useful when you want to avoid out-of-memory conditions. If you want to always use memory, you can set it to Integer.MAX_VALUE. The default number for memory-limit is 1000.

  • Bulk Load: When the queue is initialized, items are loaded from QueueStore in bulks. Bulk load is the size of these bulks. The default value of bulk-load is 250.
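
To give a feel for the bulk-load property, the sketch below splits a list of item keys into bulks of a given size. It is an illustration only: the class `BulkLoadBatches` and its method are hypothetical, and how Hazelcast actually groups keys when it initializes a queue is an internal detail not covered here.

```java
import java.util.ArrayList;
import java.util.List;

class BulkLoadBatches {

    // Splits the keys into consecutive bulks of at most bulkLoad keys.
    static List<List<Long>> batches(List<Long> keys, int bulkLoad) {
        if (bulkLoad <= 0) {
            throw new IllegalArgumentException("bulkLoad must be positive");
        }
        List<List<Long>> result = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += bulkLoad) {
            int to = Math.min(from + bulkLoad, keys.size());
            result.add(new ArrayList<>(keys.subList(from, to)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> keys = new ArrayList<>();
        for (long k = 1; k <= 600; k++) {
            keys.add(k);
        }
        List<Integer> sizes = new ArrayList<>();
        // 250 is the default value of bulk-load.
        for (List<Long> bulk : batches(keys, 250)) {
            sizes.add(bulk.size());
        }
        System.out.println("Bulk sizes: " + sizes); // prints "Bulk sizes: [250, 250, 100]"
    }
}
```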

Split-Brain Protection for Queue

Queues can be configured to check for a minimum number of available members before applying queue operations (see the Split-Brain Protection section). This is a check to avoid performing successful queue operations on all parts of a cluster during a network partition.

The following is a list of methods, grouped by the protection types, that support split-brain protection checks:

  • WRITE, READ_WRITE

    • Collection.addAll()

    • Collection.removeAll(), Collection.retainAll()

    • BlockingQueue.offer(), BlockingQueue.add(), BlockingQueue.put()

    • BlockingQueue.drainTo()

    • IQueue.poll(), Queue.remove(), IQueue.take()

    • BlockingQueue.remove()

  • READ, READ_WRITE

    • Collection.clear()

    • Collection.containsAll(), BlockingQueue.contains()

    • Collection.isEmpty()

    • Collection.iterator(), Collection.toArray()

    • Queue.peek(), Queue.element()

    • Collection.size()

    • BlockingQueue.remainingCapacity()
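
The grouping above can be read as a simple rule: a WRITE protection checks write operations, a READ protection checks read operations, and READ_WRITE checks both. The sketch below expresses that rule. Hazelcast performs this check internally, so the class `SplitBrainCheck`, its enums and its methods are hypothetical names used only for illustration; which queue method counts as a read or a write is defined by the list above.

```java
class SplitBrainCheck {

    enum ProtectionType { READ, WRITE, READ_WRITE }

    enum OperationKind { READ, WRITE }

    // Tells whether an operation of the given kind is subject to the
    // member-count check under the given protection type.
    static boolean isChecked(ProtectionType type, OperationKind op) {
        switch (type) {
            case READ:
                return op == OperationKind.READ;
            case WRITE:
                return op == OperationKind.WRITE;
            default:
                return true; // READ_WRITE checks both kinds
        }
    }

    // An operation proceeds if it is not checked at all, or if enough
    // members are visible to the member that runs it.
    static boolean isAllowed(ProtectionType type, OperationKind op,
                             int visibleMembers, int minimumMembers) {
        return !isChecked(type, op) || visibleMembers >= minimumMembers;
    }

    public static void main(String[] args) {
        // Two of three members are visible after a network partition.
        System.out.println(isAllowed(ProtectionType.WRITE, OperationKind.WRITE, 2, 3)); // prints "false"
        System.out.println(isAllowed(ProtectionType.WRITE, OperationKind.READ, 2, 3));  // prints "true"
    }
}
```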

Configuring Queue

The following are examples of queue configurations. They include the QueueStore configuration, which is explained in the Queueing with Persistent Datastore section.

Declarative Configuration:

XML:

<hazelcast>
    ...
    <queue name="default">
        <max-size>0</max-size>
        <backup-count>1</backup-count>
        <async-backup-count>0</async-backup-count>
        <empty-queue-ttl>-1</empty-queue-ttl>
        <item-listeners>
            <item-listener>com.hazelcast.examples.ItemListener</item-listener>
        </item-listeners>
        <statistics-enabled>true</statistics-enabled>
        <queue-store>
            <class-name>com.hazelcast.QueueStoreImpl</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">10000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </queue>
    ...
</hazelcast>

YAML:

hazelcast:
  queue:
    default:
      max-size: 0
      backup-count: 1
      async-backup-count: 0
      empty-queue-ttl: -1
      item-listeners:
        - include-value: true
          class-name: com.hazelcast.examples.ItemListener
      statistics-enabled: true
      queue-store:
        class-name: com.hazelcast.QueueStoreImpl
        properties:
          binary: false
          memory-limit: 10000
          bulk-load: 500
      split-brain-protection-ref: splitbrainprotection-name

Programmatic Configuration:

        Config config = new Config();
        QueueConfig queueConfig = config.getQueueConfig("default");
        queueConfig.setName("MyQueue")
                .setBackupCount(1)
                .setMaxSize(0)
                .setStatisticsEnabled(true)
                .setSplitBrainProtectionName("splitbrainprotectionname");
        queueConfig.getQueueStoreConfig()
                .setEnabled(true)
                .setClassName("com.hazelcast.QueueStoreImpl")
                .setProperty("binary", "false");
        config.addQueueConfig(queueConfig);

Hazelcast distributed queue has one synchronous backup by default. With this backup, when a cluster member with a queue goes down, another member that holds the backup of that queue takes over, so no items are lost. You can define the number of synchronous backups for a queue using the backup-count element in the declarative configuration. A queue can also have asynchronous backups: you can define the number of asynchronous backups using the async-backup-count element.

To set the maximum size of the queue, use the max-size element. To purge unused or empty queues after a period of time, use the empty-queue-ttl element. If you define a value (time in seconds) for the empty-queue-ttl element, then your queue will be destroyed if it stays empty or unused for the time in seconds that you give.

The following is the full list of queue configuration elements with their descriptions:

  • max-size: Maximum number of items in the queue. It is used to set an upper bound for the queue. You will not be able to put more items when the queue reaches this maximum size, whether you have a queue store configured or not.

  • backup-count: Number of synchronous backups. Queue is a non-partitioned data structure, so all entries of a queue reside in one partition. When this parameter is '1', it means there will be one backup of that queue in another member in the cluster. When it is '2', two members will have the backup.

  • async-backup-count: Number of asynchronous backups.

  • empty-queue-ttl: Used to purge unused or empty queues. If you define a value (time in seconds) for this element, then your queue will be destroyed if it stays empty or unused for that time.

  • item-listeners: Adds listeners (listener classes) for the queue items. You can also set the attribute include-value to true if you want the item event to contain the item values. You can set local to true if you want to listen to the items on the local member.

  • queue-store: Includes the queue store factory class name and the properties binary, memory limit and bulk load. See the Queueing with Persistent Datastore section.

  • statistics-enabled: Specifies whether the statistics gathering is enabled for your queue. If set to false, you cannot collect statistics in your implementation (using getLocalQueueStats()) and also Hazelcast Management Center will not show them. Its default value is true.

  • split-brain-protection-ref: Name of the split-brain protection configuration that you want this queue to use.