Continuous Queries for Maps

If you repeatedly run the same query on a map, consider creating a continuous query cache (CQC).

A CQC attaches a continuous query to a map. Whenever an entry is added to the map, the query is evaluated against it, and if the entry matches, it is added to the query cache. With a CQC you no longer need to query the map repeatedly for the same data, and all the query results are kept locally.

Queries must be written using the Predicate API. CQC does not support SQL queries.

You can create a CQC either from a Java client or on a member.

You cannot create a CQC from any client other than Java.

Accessing a Continuous Query Cache from a Member

The following code snippet shows how you can access a continuous query cache from a member.

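        // Define the query cache and the predicate that filters its entries.
        QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
        queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());

        // Attach the query cache to the map configuration.
        MapConfig mapConfig = new MapConfig("map-name");
        mapConfig.addQueryCacheConfig(queryCacheConfig);

        Config config = new Config();
        config.addMapConfig(mapConfig);

        HazelcastInstance node = Hazelcast.newHazelcastInstance(config);
        IMap<Integer, String> map = node.getMap("map-name");

        // Obtain the continuous query cache by its configured name.
        QueryCache<Integer, String> cache = map.getQueryCache("cache-name");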
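
The snippet above uses `OddKeysPredicate` without defining it. A minimal sketch of such a predicate is given here, assuming Hazelcast's `com.hazelcast.query.Predicate` interface and integer keys. The class body is an illustration written for this page, not code taken from Hazelcast.

```java
import com.hazelcast.query.Predicate;

import java.util.Map;

// Hypothetical implementation of the OddKeysPredicate used above:
// it matches entries whose integer key is odd, whatever the value type.
class OddKeysPredicate implements Predicate<Integer, Object> {
    @Override
    public boolean apply(Map.Entry<Integer, Object> entry) {
        // "!= 0" rather than "== 1" so that negative odd keys also match.
        return entry.getKey() % 2 != 0;
    }
}
```

Because `Predicate` extends `Serializable`, an instance can be shipped to the members. When the cache is created from a client, the predicate class typically also needs to be available on the members' classpath.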

Accessing a Continuous Query Cache from a Java Client

The following code snippet shows how you can access a continuous query cache from the client side. The difference in this code from the member side code is that you configure and instantiate a client instance instead of a member instance.

        // Define the query cache and the predicate that filters its entries.
        QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
        queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());

        // On the client, the query cache is registered against the map name.
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addQueryCacheConfig("map-name", queryCacheConfig);

        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        IMap<Integer, Integer> clientMap = client.getMap("map-name");

        // Obtain the continuous query cache by its configured name.
        QueryCache<Integer, Integer> cache = clientMap.getQueryCache("cache-name");

Features of Continuous Query Cache

The following features of continuous query cache are valid for both the member and client:

  • The initial query, which runs the supplied predicate against the existing map entries while the continuous query cache is being constructed, can be enabled or disabled via QueryCacheConfig.setPopulate().

  • Continuous query cache allows you to run queries with indexes and perform event batching and coalescing.

  • A continuous query cache is evictable. Note that a continuous query cache has a default maximum capacity of 10000. If you need a non-evictable cache, you should configure the eviction via QueryCacheConfig.setEvictionConfig().

  • A listener can be added to a continuous query cache using QueryCache.addEntryListener().

  • IMap events are reflected in the continuous query cache in the same order as they were generated on the map entries. Since events are created on entries stored in partitions, event ordering is maintained per partition. You can capture lost events by adding an EventLostListener, and you can try to recover them with QueryCache.tryRecover(). Whether recovery succeeds largely depends on the size of the buffer on the Hazelcast members. The default buffer size is 16, meaning that up to 16 events per partition are kept in the buffer. If events are generated at a high rate, a larger buffer improves the chances of recovering lost events. You can set the buffer size with QueryCacheConfig.setBufferSize(). The following example code shows a recovery case.

    QueryCache queryCache = map.getQueryCache("cache-name", Predicates.sql("this > 20"), true);
    queryCache.addEntryListener(new EventLostListener() {
        @Override
        public void eventLost(EventLostEvent event) {
            queryCache.tryRecover();
        }
    }, false);

  • You can populate a continuous query cache with only the keys of its entries and retrieve the subsequent values directly via QueryCache.get() from the underlying IMap. This helps to decrease the initial population time when the values are very large.
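
The link between buffer size and lost-event recovery described in the list above can be pictured with a toy model. The sketch below is not Hazelcast's implementation. It only assumes that each partition keeps the most recent `capacity` events, identified by increasing sequence numbers, and that a subscriber can catch up only while every event it missed is still retained. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only; this is NOT how Hazelcast implements its event buffer.
class PartitionEventBuffer {
    private final int capacity;
    private final Deque<Long> retained = new ArrayDeque<>();
    private long nextSequence = 1;

    PartitionEventBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Records a new event, dropping the oldest one if the buffer is full.
    long publish() {
        if (retained.size() == capacity) {
            retained.removeFirst();
        }
        long sequence = nextSequence++;
        retained.addLast(sequence);
        return sequence;
    }

    // A subscriber that last saw sequence lastSeen can catch up only if
    // every later event is still retained in the buffer.
    boolean canRecover(long lastSeen) {
        if (retained.isEmpty() || lastSeen >= retained.peekLast()) {
            return true; // nothing was missed
        }
        return retained.peekFirst() <= lastSeen + 1;
    }
}
```

In this model, after 40 events a subscriber that last saw event 10 cannot recover from a buffer of 16, because events 11 to 24 have already been dropped, whereas a buffer of 32 still holds everything it missed.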

Configuring a Continuous Query Cache

You can configure a continuous query cache declaratively or programmatically; the latter is mostly explained in the previous section. The parent configuration element is <query-caches> which should be placed within your <map> configuration. You can create your query caches using the <query-cache> sub-element under <query-caches>.

The following is an example declarative configuration, shown first in XML and then in YAML.

<hazelcast>
    ...
    <map>
        <query-caches>
            <query-cache name="myContQueryCache">
                <include-value>true</include-value>
                <predicate type="class-name">com.hazelcast.examples.ExamplePredicate</predicate>
                <entry-listeners>
                    <entry-listener>...</entry-listener>
                </entry-listeners>
                <in-memory-format>BINARY</in-memory-format>
                <populate>true</populate>
                <coalesce>false</coalesce>
                <batch-size>2</batch-size>
                <delay-seconds>3</delay-seconds>
                <buffer-size>32</buffer-size>
                <eviction size="1000" max-size-policy="ENTRY_COUNT" eviction-policy="LFU"/>
                <indexes>
                    <index>
                        <attributes>
                            <attribute>age</attribute>
                        </attributes>
                    </index>
                </indexes>
            </query-cache>
        </query-caches>
    </map>
    ...
</hazelcast>

hazelcast:
  map:
    query-caches:
        myContQueryCache:
          include-value: true
          predicate:
            class-name: com.hazelcast.examples.ExamplePredicate
          entry-listeners:
            - class-name: "..."
          in-memory-format: BINARY
          populate: true
          coalesce: false
          batch-size: 2
          delay-seconds: 3
          buffer-size: 32
          eviction:
            size: 1000
            max-size-policy: ENTRY_COUNT
            eviction-policy: LFU
          indexes:
            - attributes:
              - "age"
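
For comparison, a programmatic counterpart of the declarative configuration above might look like the following sketch. It assumes the Hazelcast 4.x or later configuration API, where `MaxSizePolicy`, `IndexConfig`, and `IndexType` are top-level classes in `com.hazelcast.config`. The wrapper class `QueryCacheConfigSketch` is hypothetical, and the map name "map-name" is borrowed from the earlier snippets because the declarative example does not name the map. The entry listeners are left out since they are elided in the declarative example too.

```java
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.IndexConfig;
import com.hazelcast.config.IndexType;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.config.QueryCacheConfig;

// Hypothetical helper class; only the config calls mirror the XML/YAML above.
class QueryCacheConfigSketch {
    static MapConfig build() {
        QueryCacheConfig queryCacheConfig = new QueryCacheConfig("myContQueryCache");
        queryCacheConfig.setIncludeValue(true);
        queryCacheConfig.getPredicateConfig()
                .setClassName("com.hazelcast.examples.ExamplePredicate");
        queryCacheConfig.setInMemoryFormat(InMemoryFormat.BINARY);
        queryCacheConfig.setPopulate(true);
        queryCacheConfig.setCoalesce(false);
        queryCacheConfig.setBatchSize(2);
        queryCacheConfig.setDelaySeconds(3);
        queryCacheConfig.setBufferSize(32);
        queryCacheConfig.getEvictionConfig()
                .setSize(1000)
                .setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
                .setEvictionPolicy(EvictionPolicy.LFU);
        // Assumption: an <index> without a type attribute is a SORTED index.
        queryCacheConfig.addIndexConfig(new IndexConfig(IndexType.SORTED, "age"));

        // Assumed map name, taken from the earlier snippets.
        MapConfig mapConfig = new MapConfig("map-name");
        mapConfig.addQueryCacheConfig(queryCacheConfig);
        return mapConfig;
    }
}
```

The returned `MapConfig` can then be added to a `Config` with `addMapConfig()`, as in the member example earlier.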

Continuous query caches have the following configuration elements:

  • name: Name of your continuous query cache.

  • include-value: Specifies whether the value will be cached too. Its default value is true.

  • predicate: Predicate to filter events which are applied to the query cache.

  • entry-listeners: Adds listeners (listener classes) for your query cache entries. See the Registering Map Listeners section.

  • in-memory-format: Type of the data to be stored in your query cache. See the in-memory format section. Its default value is BINARY.

  • populate: Specifies whether the initial population of your query cache is enabled. Its default value is true.

  • coalesce: Specifies whether the coalescing of your query cache is enabled. Its default value is false.

  • delay-seconds: Minimum time in seconds that an event waits in the member’s buffer. Its default value is 0.

  • batch-size: Batch size used to determine the number of events sent in a batch to your query cache. Its default value is 1.

  • buffer-size: Maximum number of events which can be stored in a partition buffer. Its default value is 16.

  • eviction: Configuration for the eviction of your query cache. See the Configuring Map Eviction section.

  • indexes: Indexes for your query cache defined by using this element’s <index> sub-elements. See the Configuring IMap Indexes section.

Please take the following configuration considerations and publishing logic into account:

If delay-seconds is equal to or smaller than 0, then batch-size loses its function. Each time there is an event, all the entries in the buffer are pushed to the subscriber.

If delay-seconds is bigger than 0, the following logic applies:

  • If coalesce is set to true, the buffer is checked for an event with the same key; if one exists, it is overridden by the current event. Then:

    • The current size of the buffer is checked: if it is equal to or larger than batch-size, then batch-size events are pushed to the subscriber. Otherwise, no events are sent.

    • After the batch-size check, delay-seconds is checked. The buffer is scanned from the oldest to the youngest entry; all the entries that are older than delay-seconds are pushed to the subscriber.
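
The publishing rules above can be sketched as a small, self-contained model. This is an illustration of the described logic, not Hazelcast code: the class `PublishingSketch`, its `Event` type, and the use of caller-supplied timestamps in seconds are all hypothetical. It also assumes that a coalesced event replaces the older one and moves to the tail of the buffer, which the text above does not specify.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model of the publishing rules; not Hazelcast's implementation.
class PublishingSketch {
    static final class Event {
        final String key;
        final String value;
        final long createdAtSeconds;

        Event(String key, String value, long createdAtSeconds) {
            this.key = key;
            this.value = value;
            this.createdAtSeconds = createdAtSeconds;
        }
    }

    private final Deque<Event> buffer = new ArrayDeque<>();
    private final boolean coalesce;
    private final int batchSize;    // assumed to be at least 1
    private final int delaySeconds;

    PublishingSketch(boolean coalesce, int batchSize, int delaySeconds) {
        this.coalesce = coalesce;
        this.batchSize = batchSize;
        this.delaySeconds = delaySeconds;
    }

    // Buffers one event and returns the events pushed to the subscriber as a result.
    List<Event> onEvent(String key, String value, long nowSeconds) {
        List<Event> pushed = new ArrayList<>();
        if (delaySeconds <= 0) {
            // batch-size has no effect: everything is flushed on every event.
            buffer.addLast(new Event(key, value, nowSeconds));
            pushed.addAll(buffer);
            buffer.clear();
            return pushed;
        }
        if (coalesce) {
            // Assumption: the newer event replaces the older one with the same key.
            buffer.removeIf(e -> e.key.equals(key));
        }
        buffer.addLast(new Event(key, value, nowSeconds));

        // Step 1: batch-size check.
        if (buffer.size() >= batchSize) {
            for (int i = 0; i < batchSize; i++) {
                pushed.add(buffer.removeFirst());
            }
        }
        // Step 2: delay-seconds check, scanning from the oldest entry.
        while (!buffer.isEmpty()
                && nowSeconds - buffer.peekFirst().createdAtSeconds > delaySeconds) {
            pushed.add(buffer.removeFirst());
        }
        return pushed;
    }
}
```

For instance, with `new PublishingSketch(false, 3, 3)`, an event at second 0 is held back, and a second event arriving at second 10 pushes only the first one: the batch is not full, but the first event is older than delay-seconds.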