Predicates API

The Predicates API is a programming interface for querying data in Hazelcast that is similar to the Java Persistence Query Language (JPQL).

In addition to the Java-based Predicates API, Hazelcast also supports SQL statements and functions, and streaming SQL queries.

How the Predicates API Works

The predicate is sent to each member in the cluster. Each member looks at its own local entries and filters them according to the predicate. At this stage, key/value pairs of the entries are deserialized and then passed to the predicate. The predicate requester merges all the results coming from each member into a single set.

Distributed queries are highly scalable. If you add new members to the cluster, each member holds fewer partitions and thus spends less time iterating over its entries. In addition, the pool of partition threads evaluates the entries concurrently in each member, and the network traffic is also reduced since only filtered data is sent to the requester.

Querying Maps with the Predicates API

The Predicates API offers built-in methods for creating queries as well as an sql() method for SQL-like queries.

Assume that you have a map called employee that contains Employee objects as values:

public class Employee implements Serializable {
    private String name;
    private int age;
    private boolean active;
    private double salary;

    public Employee(String name, int age, boolean active, double salary) {
        this.name = name;
        this.age = age;
        this.active = active;
        this.salary = salary;
    }

    public Employee() {
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public double getSalary() {
        return salary;
    }

    public boolean isActive() {
        return active;
    }
}

Now, look for employees who are active and have an age less than 30.

When using Portable objects, if a field exists on one member but not on another, Hazelcast does not throw an unknown field exception. Instead, Hazelcast treats a predicate that queries an unknown field as an always-false predicate.

IMap<String, Employee> map = hazelcastInstance.getMap( "employee" );

EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
Predicate predicate = e.is( "active" ).and( e.get( "age" ).lessThan( 30 ) );

Collection<Employee> employees = map.values( predicate );

In the above example code, predicate verifies whether the entry is active and its age value is less than 30. This predicate is applied to the employee map using the map.values(predicate) method. This method sends the predicate to all cluster members and merges the results coming from them. Since the predicate is communicated between the members, it needs to be serializable.

Predicates can also be applied to keySet(), entrySet() and localKeySet() methods of distributed maps.
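
For example, the following sketch reuses the predicate built above to retrieve the matching keys and entries instead of the values:

// Filter keys and entries with the same predicate instead of values
Set<String> matchingKeys = map.keySet( predicate );
Set<Map.Entry<String, Employee>> matchingEntries = map.entrySet( predicate );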

Predicates Class Operators

The Predicates class includes many operators for your query requirements. The following are descriptions for some of them:

  • equal: Checks if the result of an expression is equal to a given value.

  • notEqual: Checks if the result of an expression is not equal to a given value.

  • instanceOf: Checks if the result of an expression has a certain type.

  • like: Checks if the result of an expression matches some string pattern. % (percentage sign) is the placeholder for multiple characters, _ (underscore) is the placeholder for a single character.

  • ilike: A case-insensitive variant of like.

  • greaterThan: Checks if the result of an expression is greater than a certain value.

  • greaterEqual: Checks if the result of an expression is greater than or equal to a certain value.

  • lessThan: Checks if the result of an expression is less than a certain value.

  • lessEqual: Checks if the result of an expression is less than or equal to a certain value.

  • between: Checks if the result of an expression is between two values (this is inclusive).

  • in: Checks if the result of an expression is an element of a certain collection.

  • isNot: Checks if the result of an expression is false.

  • regex: Checks if the result of an expression matches some regular expression.

  • alwaysTrue: The result of an expression always matches.

  • alwaysFalse: The result of an expression never matches.

See the Predicates Javadoc for all predicates provided.
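
As an illustration, here is a minimal sketch that combines a few of these operators on the employee map from the earlier example:

// Active employees aged between 25 and 35 whose name starts with "Jo"
Predicate agePredicate = Predicates.between( "age", 25, 35 );
Predicate namePredicate = Predicates.like( "name", "Jo%" );
Predicate activePredicate = Predicates.equal( "active", true );

Collection<Employee> result =
    map.values( Predicates.and( agePredicate, namePredicate, activePredicate ) );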

Combining Predicates with AND, OR, NOT

You can combine predicates using the and, or and not operators, as shown in the below examples.

public Collection<Employee> getWithNameAndAge( String name, int age ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate agePredicate = Predicates.equal( "age", age );
    Predicate predicate = Predicates.and( namePredicate, agePredicate );
    return employeeMap.values( predicate );
}
public Collection<Employee> getWithNameOrAge( String name, int age ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate agePredicate = Predicates.equal( "age", age );
    Predicate predicate = Predicates.or( namePredicate, agePredicate );
    return employeeMap.values( predicate );
}
public Collection<Employee> getNotWithName( String name ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate predicate = Predicates.not( namePredicate );
    return employeeMap.values( predicate );
}

Simplifying with PredicateBuilder

You can simplify predicate usage with the PredicateBuilder interface, which offers simpler predicate building. See the below example code which selects all people with a certain name and age.

public Collection<Employee> getWithNameAndAgeSimplified( String name, int age ) {
    EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
    Predicate agePredicate = e.get( "age" ).equal( age );
    Predicate predicate = e.get( "name" ).equal( name ).and( agePredicate );
    return employeeMap.values( predicate );
}

Querying with SQL-like Predicates

Predicates.sql() takes the regular SQL where clause. Here is an example:

IMap<String, Employee> map = hazelcastInstance.getMap( "employee" );
Collection<Employee> employees = map.values( Predicates.sql( "active AND age < 30" ) );

Hazelcast also offers a SQL service that lets you execute real SQL queries, as opposed to the SQL-like predicates of Predicates.sql(). See SQL for more information.

SQL-like predicates support the following syntax:

AND/OR: <expression> AND <expression> AND <expression>…​

  • active AND age>30

  • active=false OR age = 45 OR name = 'Joe'

  • active AND ( age > 20 OR salary < 60000 )

Equality: =, !=, <, <=, >, >=

  • <expression> = value

  • age <= 30

  • name = 'Joe'

  • salary != 50000

BETWEEN: <attribute> [NOT] BETWEEN <value1> AND <value2>

  • age BETWEEN 20 AND 33 ( same as age >= 20 AND age <= 33 )

  • age NOT BETWEEN 30 AND 40 ( same as age < 30 OR age > 40 )

IN: <attribute> [NOT] IN (val1, val2,…​)

  • age IN ( 20, 30, 40 )

  • age NOT IN ( 60, 70 )

  • active AND ( salary >= 50000 OR ( age NOT BETWEEN 20 AND 30 ) )

  • age IN ( 20, 30, 40 ) AND salary BETWEEN 50000 AND 80000

LIKE: <attribute> [NOT] LIKE "expression"

The % (percentage sign) is a placeholder for multiple characters, an _ (underscore) is a placeholder for a single character.

  • name LIKE 'Jo%' (true for 'Joe', 'Josh', 'Joseph' etc.)

  • name LIKE 'Jo_' (true for 'Joe'; false for 'Josh')

  • name NOT LIKE 'Jo_' (true for 'Josh'; false for 'Joe')

  • name LIKE 'J_s%' (true for 'Josh', 'Joseph'; false for 'John', 'Joe')

ILIKE: <attribute> [NOT] ILIKE 'expression'

Similar to LIKE predicate but in a case-insensitive manner.

  • name ILIKE 'Jo%' (true for 'Joe', 'joe', 'jOe','Josh','joSH', etc.)

  • name ILIKE 'Jo_' (true for 'Joe' or 'jOE'; false for 'Josh')

REGEX: <attribute> [NOT] REGEX 'expression'

  • name REGEX 'abc-.*' (true for 'abc-123'; false for 'abx-123')

You can escape the % and _ placeholder characters in SQL-like predicate queries using the backslash (\) character. The apostrophe (') can be escaped with another apostrophe, i.e., ''. If you use REGEX, you need to escape characters according to the normal Java escape syntax; see the Oracle API docs for details.
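
For example, here is a small sketch of escaping inside Java string literals (note that the backslash itself must be doubled at the Java level):

// "\\_" in the Java source becomes "\_" in the predicate string,
// so this matches the literal name "Jo_e" but not "Joke"
Predicate underscore = Predicates.sql( "name LIKE 'Jo\\_e'" );

// A doubled apostrophe matches a literal apostrophe, e.g. "O'Brien"
Predicate apostrophe = Predicates.sql( "name = 'O''Brien'" );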

Querying Entry Keys with Predicates

You can use the __key attribute to perform a predicated search for entry keys. See the following example:

IMap<String, Person> personMap = hazelcastInstance.getMap("persons");
personMap.put("Alice", new Person("Alice", 35, Gender.FEMALE));
personMap.put("Andy",  new Person("Andy",  37, Gender.MALE));
personMap.put("Bob",   new Person("Bob",   22, Gender.MALE));
[...]
Predicate predicate = Predicates.sql("__key LIKE 'A%'");
Collection<Person> startingWithA = personMap.values(predicate);

In this example, the code creates a collection with the entries whose keys start with the letter "A".

Querying JSON Strings

You can query JSON strings stored inside your Hazelcast clusters. To query a JSON string, you first need to create a HazelcastJsonValue from the JSON string. You can use HazelcastJsonValue objects both as keys and values in distributed data structures. Then, it is possible to query these objects using the Hazelcast query methods explained in this section.

String person1 = "{ \"name\": \"John\", \"age\": 35 }";
String person2 = "{ \"name\": \"Jane\", \"age\": 24 }";
String person3 = "{ \"name\": \"Trey\", \"age\": 17 }";

IMap<Integer, HazelcastJsonValue> idPersonMap = instance.getMap("jsonValues");

idPersonMap.put(1, new HazelcastJsonValue(person1));
idPersonMap.put(2, new HazelcastJsonValue(person2));
idPersonMap.put(3, new HazelcastJsonValue(person3));

Collection<HazelcastJsonValue> peopleUnder21 = idPersonMap.values(Predicates.lessThan("age", 21));

When running queries, Hazelcast treats values extracted from the JSON documents as Java types so that they can be compared with the query attribute. The JSON specification defines five primitive types to be used in JSON documents: number, string, true, false and null. The string, true/false and null types are treated as String, boolean and null, respectively. Extracted number values are treated as long where possible; otherwise, they are treated as double.
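
For example, reusing the map populated above, a string attribute of the JSON documents is compared as a Java String:

Collection<HazelcastJsonValue> namedJane = idPersonMap.values(Predicates.equal("name", "Jane"));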

It is possible to query nested attributes and arrays in JSON documents, using the Predicates API. The query syntax is the same as querying collections and arrays in other Hazelcast objects.

/**
 * Sample JSON object
 *
 * {
 *     "departmentId": 1,
 *     "room": "alpha",
 *     "people": [
 *         {
 *             "name": "Peter",
 *             "age": 26,
 *             "salary": 50000
 *         },
 *         {
 *             "name": "Jonah",
 *             "age": 50,
 *             "salary": 140000
 *         }
 *     ]
 * }
 *
 *
 * The following query finds all the departments that have a person named "Peter" working in them.
 */
Collection<HazelcastJsonValue> departmentWithPeter = departments.values(Predicates.equal("people[any].name", "Peter"));

Filtering with Paging Predicates

Hazelcast provides paging for defined predicates. With its PagingPredicate interface, you can get a collection of keys, values, or entries page by page by filtering them with predicates and giving the size of the pages. Also, you can sort the entries by specifying comparators. In this case, the comparator should be Serializable and the serialization factory implementations you use, e.g., PortableFactory and DataSerializableFactory, should be registered. See the Serialization chapter on how to register these factories.

Paging predicates require the objects to be deserialized both on the calling side (either a member or client) and the member side from which the collection is retrieved. Therefore, you need to register the serialization factories you use on all the members and clients on which the paging predicates are used. See the Serialization chapter on how to register these factories.

In the example code below:

  • The greaterEqual predicate gets values from the "students" map. This predicate has a filter to retrieve the objects with an "age" greater than or equal to 18.

  • Then a PagingPredicate is constructed in which the page size is 5, so that there are five objects in each page. The first time the values() method is called, the first page is fetched.

  • Finally, the subsequent page is fetched by calling the nextPage() method of PagingPredicate and querying the map again with the updated PagingPredicate.

IMap<Integer, Student> map = hazelcastInstance.getMap( "students" );
Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
PagingPredicate pagingPredicate = Predicates.pagingPredicate( greaterEqual, 5 );
// Retrieve the first page
Collection<Student> values = map.values( pagingPredicate );
...
// Set up next page
pagingPredicate.nextPage();
// Retrieve next page
values = map.values( pagingPredicate );
...

If a comparator is not specified for PagingPredicate but you want to get a collection of keys or values page by page, the keys or values must be instances of Comparable (i.e., they must implement java.lang.Comparable). Otherwise, a java.lang.IllegalArgumentException is thrown.
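
For example, the following sketch sorts the pages by age; it assumes the Student class from the example above exposes a getAge() getter:

// The comparator travels to the members, so it must be Serializable
public class AgeComparator
        implements Comparator<Map.Entry<Integer, Student>>, Serializable {
    @Override
    public int compare( Map.Entry<Integer, Student> e1, Map.Entry<Integer, Student> e2 ) {
        return Integer.compare( e1.getValue().getAge(), e2.getValue().getAge() );
    }
}

PagingPredicate<Integer, Student> sortedPagingPredicate =
    Predicates.pagingPredicate( greaterEqual, new AgeComparator(), 5 );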

You can also access a specific page directly with the setPage() method. This way, if you make a query for the hundredth page, for example, it fetches all 100 pages at once instead of advancing to the hundredth page one by one using the nextPage() method. Note that this feature is memory-intensive; see the PagingPredicate Javadoc.
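
A minimal sketch of jumping to a specific page:

// Pages are zero-indexed, so this fetches the hundredth page
pagingPredicate.setPage( 99 );
Collection<Student> hundredthPage = map.values( pagingPredicate );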

The paging predicate, also known as Order & Limit, is not supported in a transactional context.

Filtering with Partition Predicate

You can run queries on a single partition in your cluster using the partition predicate (PartitionPredicate).

The Predicates.partitionPredicate() method takes a predicate and partition key as parameters, gets the partition ID using the key and runs that predicate only on the partition where that key belongs.

See the following code snippet:

...
Predicate predicate = Predicates.partitionPredicate(partitionKey, Predicates.alwaysTrue());

Collection<Integer> values = map.values(predicate);
Collection<String> keys = map.keySet(predicate);
...

By default there are 271 partitions, and a regular predicate must access all of them. Since the partition predicate accesses only a single partition, it can lead to a big performance gain.

For the partition predicate to work correctly, you need to know which partition your data belongs to so that you can send the request to the correct partition. One of the ways of doing it is to make use of the PartitionAware interface when data is inserted, thereby controlling the owning partition. See the PartitionAware section for more information and examples.

A concrete example may be a web shop that sells phones and accessories. To find all the accessories of a phone, a query could be executed that selects all accessories for that phone. This query is executed on all members in the cluster and therefore can generate quite a lot of load. However, if we store the accessories in the same partition as the phone, the partition predicate can use the partitionKey of the phone to select the right partition and then query for the accessories of that phone; this reduces the load on the system and returns the query results faster, as the sketch below illustrates.
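
Below is a hedged sketch of that idea; the AccessoryKey and Accessory classes and the attribute names are illustrative assumptions, not a prescribed schema:

// The key declares the phone id as its partition key, so a phone and its
// accessories land in the same partition (equals()/hashCode() omitted for brevity)
public class AccessoryKey implements PartitionAware<String>, Serializable {
    private final String accessoryId;
    private final String phoneId;

    public AccessoryKey(String accessoryId, String phoneId) {
        this.accessoryId = accessoryId;
        this.phoneId = phoneId;
    }

    @Override
    public String getPartitionKey() {
        return phoneId;
    }
}

// Query only the partition that owns the phone's key
Predicate accessoriesOfPhone = Predicates.partitionPredicate(
        "phone-42", Predicates.equal("phoneId", "phone-42"));
Collection<Accessory> accessories = accessoryMap.values(accessoriesOfPhone);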

Configuring the Query Thread Pool

You can change the size of thread pool dedicated to query operations using the pool-size property. Each query consumes a single thread from a Generic Operations ThreadPool on each Hazelcast member - let’s call it the query-orchestrating thread. That thread is blocked throughout the whole execution-span of a query on the member.

The query-orchestrating thread uses the threads from the query-thread pool in the following cases:

  • if you run a PagingPredicate (since each page runs as a separate task)

  • if you set the system property hazelcast.query.predicate.parallel.evaluation to true (since the predicates are evaluated in parallel)

See the Filtering with Paging Predicates section and System Properties appendix for information about paging predicates and for description of the above system property.

Below is an example of that declarative configuration.

  • XML

  • YAML

<hazelcast>
    ...
    <executor-service name="hz:query">
        <pool-size>100</pool-size>
    </executor-service>
    ...
</hazelcast>
hazelcast:
  ...
  executor-service:
    "hz:query":
      pool-size: 100

Below is the equivalent programmatic configuration.

Config cfg = new Config();
cfg.getExecutorConfig("hz:query").setPoolSize(100);

When dealing with the query requests coming from the clients to your members, Hazelcast offers the following system properties to tune your thread pools:

  • hazelcast.clientengine.thread.count which is the number of threads to process non-partition-aware client requests, like map.size() and executor tasks. Its default value is the number of cores multiplied by 20.

  • hazelcast.clientengine.query.thread.count which is the number of threads to process query requests coming from the clients. Its default value is the number of cores.

If there are a lot of query requests from the clients, you may want to increase the value of hazelcast.clientengine.query.thread.count. In addition to this tuning, you may also consider increasing the value of hazelcast.clientengine.thread.count if the CPU load in your system is not high and there is plenty of free memory.

Creating Custom Query Attributes

Custom attributes allow you to use the Predicates API to query fields, using complex calculations.

For example, imagine a map whose values are Person objects that contain a list of children in a children field.

class Person {
  String name;
  String[] children;
}

You may want to query for Person objects that have more than one child. But you neither have a field with the total number of children, nor does the Predicates API have a count() method that you can call.

In this case, you can create a custom attribute called childrenCount that returns the number of children for each Person object.

Implementing a ValueExtractor

A custom attribute is a "synthetic" attribute that does not exist as a field or a getter in the object that it is extracted from. Thus, it is necessary to define the policy on how the attribute is supposed to be extracted.

To implement a ValueExtractor, implement the com.hazelcast.query.extractor.ValueExtractor interface and its extract() method. This method does not return any value since the extracted value is collected by the ValueCollector. To return multiple results from a single extraction, invoke the ValueCollector.addObject() method multiple times, so that the collector collects all results.
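
As a sketch, the childrenCount attribute from the earlier example could be extracted as follows (the ValueExtractor<T, A> signature of recent Hazelcast versions is assumed):

// Emits the number of children for each Person value; the second type
// parameter is the (unused here) type of the extraction argument
public class ChildrenCountExtractor implements ValueExtractor<Person, String> {
    @Override
    public void extract(Person person, String argument, ValueCollector<Object> collector) {
        collector.addObject(person.children == null ? 0 : person.children.length);
    }
}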

See the ValueExtractor and ValueCollector Javadocs.

Custom attributes are compatible with all Hazelcast serialization methods.

ValueExtractor with Portable Serialization

Portable serialization is a special kind of serialization where there is no need to have the class of the serialized object on the classpath of a member in order to read its attributes. That is the reason why the target object passed to the ValueExtractor.extract() method is not of the exact type that has been stored. Instead, an instance of a com.hazelcast.query.extractor.ValueReader is passed. ValueReader enables reading the attributes of a Portable object in a generic and type-agnostic way. It contains two methods:

  • read(String path, ValueCollector<T> collector) - enables passing all results directly to the ValueCollector.

  • read(String path, ValueCallback<T> callback) - enables filtering, transforming and grouping the result of the read operation and manually passing it to the ValueCollector.

See the ValueReader Javadoc.

Returning Multiple Values from a Single Extraction

It sounds counter-intuitive, but a single extraction may return multiple values when arrays or collections are involved. Let’s have a look at the following data structure in pseudo-code:

class Motorbike {
    Wheel[] wheel;
}

class Wheel {
    String name;
}

Let’s assume that we want to extract the names of all wheels from a single motorbike object. Each motorbike has two wheels so there are two names for each bike. In order to return both values from the extraction operation, collect them separately using the ValueCollector. Collecting multiple values in this way allows you to operate on these multiple values as if they were single values during the evaluation of the predicates.
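
A sketch of such a multi-value extractor for the wheel names (same assumed signature as above):

// Emits one collected value per wheel, so a single Motorbike can
// match on any of its wheel names
public class WheelNameExtractor implements ValueExtractor<Motorbike, String> {
    @Override
    public void extract(Motorbike motorbike, String argument, ValueCollector<Object> collector) {
        for (Wheel wheel : motorbike.wheel) {
            collector.addObject(wheel.name);
        }
    }
}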

Let’s assume that we registered a custom extractor with the name wheelName and executed the following query: wheelName = front-wheel.

The extraction may return up to two wheel names for each Motorbike since each Motorbike has up to two wheels. In such a case, it is enough if a single value evaluates the predicate’s condition to true to return a match, so it returns a Motorbike if "any" of the wheels matches the expression.

Extraction Arguments

A ValueExtractor may use a custom argument if it is specified in the query. The custom argument may be passed within the square brackets located after the name of the custom attribute, e.g., customAttribute[argument].

Let’s have a look at the following query: currency[incoming] == EUR. Here, currency is a custom attribute that uses a com.test.CurrencyExtractor for extraction.

The string incoming is an argument that is passed to the ArgumentParser during the extraction. The parser parses the string according to its custom logic and it returns a parsed object. The parsed object may be a single object, array, collection, or any arbitrary object. It is up to the ValueExtractor implementation to understand the semantics of the parsed argument object.

For now it is not possible to register a custom ArgumentParser, thus a default parser is used. It follows a pass-through semantic, which means that the string located in the square brackets is passed "as is" to the ValueExtractor.extract() method.
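
For illustration, a hedged sketch of how an extractor might consume the pass-through argument; the Transaction class and its getters are assumptions:

// For the query currency[incoming] == EUR, the argument arrives
// as the raw string "incoming"
public class CurrencyExtractor implements ValueExtractor<Transaction, String> {
    @Override
    public void extract(Transaction transaction, String argument, ValueCollector<Object> collector) {
        collector.addObject("incoming".equals(argument)
                ? transaction.getIncomingCurrency()
                : transaction.getOutgoingCurrency());
    }
}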

Please note that using square brackets within the argument string is not allowed.

Configuring a Custom Attribute Programmatically

The following snippet demonstrates how to define a custom attribute using a ValueExtractor.

AttributeConfig attributeConfig = new AttributeConfig();
attributeConfig.setName("currency");
attributeConfig.setExtractorClassName("com.bank.CurrencyExtractor");

MapConfig mapConfig = new MapConfig();
mapConfig.addAttributeConfig(attributeConfig);

currency is the name of the custom attribute that will be extracted using the CurrencyExtractor class.

Keep in mind that an extractor may not be added after the map has been instantiated. All extractors have to be defined upfront in the map’s initial configuration.
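
For completeness, here is a sketch of registering that map configuration before the member is started; the map name trades matches the declarative example below:

// The attribute must be registered before the map is first used
Config config = new Config();
mapConfig.setName("trades");
config.addMapConfig(mapConfig);
HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);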

Configuring a Custom Attribute Declaratively

The following snippet demonstrates how to define a custom attribute in the Hazelcast XML Configuration.

  • XML

  • YAML

<hazelcast>
    ...
    <map name="trades">
        <attributes>
            <attribute extractor-class-name="com.bank.CurrencyExtractor">currency</attribute>
        </attributes>
    </map>
    ...
</hazelcast>
hazelcast:
  map:
    trades:
      attributes:
        currency:
          extractor-class-name: com.bank.CurrencyExtractor

Analogous to the example above, currency is the name of the custom attribute that will be extracted using the CurrencyExtractor class.

Please note that an attribute name may begin with an ASCII letter [A-Za-z] or digit [0-9] and may contain ASCII letters [A-Za-z], digits [0-9] or underscores later on.

Indexing Custom Attributes

You can create an index using a custom attribute.

The name of the attribute used in the index definition has to match the one used in the attributes configuration.

Defining indexes with extraction arguments is allowed, as shown in the example below:

  • XML

  • YAML

<hazelcast>
    ...
    <indexes>
        <!-- custom attribute without an extraction argument -->
        <index>
            <attributes>
                <attribute>currency</attribute>
            </attributes>
        </index>
        <!-- custom attribute using an extraction argument -->
        <index>
            <attributes>
                <attribute>currency[incoming]</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>
hazelcast:
  ...
  indexes:
    attributes:
      - "currency"
      - "currency[incoming]"

Querying in Collections and Arrays

Hazelcast allows querying in collections and arrays. Querying in collections and arrays is compatible with all Hazelcast serialization methods, including the Portable serialization.

Let’s have a look at the following data structure expressed in pseudo-code:

class Motorbike {
    Wheel[] wheels;
}

class Wheel {
    String name;
}

In order to query a single element of a collection/array, you can execute the following query:

// it matches all motorbikes where the zero wheel's name is 'front-wheel'
Predicate p = Predicates.equal("wheels[0].name", "front-wheel");
Collection<Motorbike> result = map.values(p);

It is also possible to query a collection/array using the any semantic as shown below:

// it matches all motorbikes where any wheel's name is 'front-wheel'
Predicate p = Predicates.equal("wheels[any].name", "front-wheel");
Collection<Motorbike> result = map.values(p);

The exact same query may be executed using the SQL predicate as shown below:

Predicate p = Predicates.sql("wheels[any].name = 'front-wheel'");
Collection<Motorbike> result = map.values(p);

[] notation applies to both collections and arrays.

Hazelcast requires all elements of a collection to have the same type. Considering and expanding the above example:

  • If you have a wheels collection attribute, all its elements must be of the Wheel type; subclasses of Wheel are not allowed.

  • Let’s say you have added a seats collection attribute whose elements are Seat objects. Then all its elements must be of this concrete Seat type.

So, you may have collections of different types in your map. However, each collection’s elements must be of the same concrete type within that collection attribute.

Consider custom attribute extractors if it is impossible or undesirable to reduce the variety of types to a single type. See the Custom Attributes section for information about them.

Indexing in Collections and Arrays

You can also create an index using a query in collections and arrays.

Please note that in order to leverage the index, the attribute name used in the query has to be the same as the one used in the index definition.

Let’s assume you have the following index definition:

  • XML

  • YAML

<hazelcast>
    ...
    <indexes>
        <index type="HASH">
            <attributes>
                <attribute>wheels[any].name</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>
hazelcast:
  ...
  indexes:
    - type: HASH
      attributes:
        - wheels[any].name

The following query uses the index:

Predicate p = Predicates.equal("wheels[any].name", "front-wheel");

The following query, however, does NOT leverage the index, since it does not use exactly the same attribute name that was used in the index:

Predicates.equal("wheels[0].name", "front-wheel")

In order to use the index in the case mentioned above, you have to create another index, as shown below:

  • XML

  • YAML

<hazelcast>
    ...
    <indexes>
        <index type="HASH">
            <attributes>
                <attribute>wheels[0].name</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>
hazelcast:
  ...
  indexes:
    - type: HASH
      attributes:
        - wheels[0].name

Corner Cases

Handling of corner cases may be a bit different than in a programming language like Java.

Let’s have a look at the following examples in order to understand the differences. To make the analysis simpler, let’s assume that there is only one Motorbike object stored in a Hazelcast Map.

| Id | Query                                              | Data State              | Extraction Result | Match |
|----|----------------------------------------------------|-------------------------|-------------------|-------|
| 1  | Predicates.equal("wheels[7].name", "front-wheel")  | wheels.size() == 1      | null              | No    |
| 2  | Predicates.equal("wheels[7].name", null)           | wheels.size() == 1      | null              | Yes   |
| 3  | Predicates.equal("wheels[0].name", "front-wheel")  | wheels[0].name == null  | null              | No    |
| 4  | Predicates.equal("wheels[0].name", null)           | wheels[0].name == null  | null              | Yes   |
| 5  | Predicates.equal("wheels[0].name", "front-wheel")  | wheels[0] == null       | null              | No    |
| 6  | Predicates.equal("wheels[0].name", null)           | wheels[0] == null       | null              | Yes   |
| 7  | Predicates.equal("wheels[0].name", "front-wheel")  | wheels == null          | null              | No    |
| 8  | Predicates.equal("wheels[0].name", null)           | wheels == null          | null              | Yes   |

As you can see, no NullPointerExceptions or IndexOutOfBoundsExceptions are thrown during the extraction process, even though parts of the expression are null.

Looking at examples 4, 6 and 8, we can also easily notice that it is impossible to distinguish which part of the expression was null. If we execute the query wheels[1].name = null, it may evaluate to true because:

  • wheels collection/array is null

  • index == 1 is out of bound

  • name attribute of the wheels[1] object is null.

In order to make the query unambiguous, extra conditions would have to be added, e.g., wheels != null AND wheels[1].name = null.

Aggregations

Aggregations allow you to compute the value of some function (e.g., sum or max) over the stored map entries. The computation is performed in a fully distributed manner, so no data other than the computed function value is transferred to the caller, making the computation fast.

Aggregations are available on com.hazelcast.map.IMap only. IMap offers the method aggregate to apply the aggregation logic on the map entries. This method can be called with or without a predicate. You can refer to its Javadoc to see the method details.

If the in-memory format of your data is NATIVE, aggregations always run on the partition threads. If the data is of type BINARY or OBJECT, they also mostly run on the partition threads; however, they may run on separate query threads to avoid blocking the partition threads (if there are no ongoing migrations).

Aggregator API

The aggregation is split into three phases represented by three methods:

  1. accumulate()

  2. combine()

  3. aggregate()

There are also the following callbacks:

  • onAccumulationFinished() called when the accumulation phase finishes

  • onCombinationFinished() called when the combination phase finishes

These callbacks enable releasing the state that might have been initialized and stored in the Aggregator - to reduce the network traffic.

Each phase is described below. See also the Aggregator Javadoc for the API’s details.

Accumulation:

During the accumulation phase each Aggregator accumulates all entries passed to it by the query engine. It accumulates only those pieces of information that are required to calculate the aggregation result in the last phase - that’s implementation specific.

In case of the DoubleAverage aggregation the Aggregator would accumulate:

  • the sum of the elements it accumulated

  • the count of the elements it accumulated

Combination:

Since aggregation is executed in parallel on each partition of the cluster, the results need to be combined after the accumulation phase in order to be able to calculate the final result.

In case of the DoubleAverage aggregation, the aggregator would sum up all the sums of the elements and all the counts.

Aggregation:

Aggregation is the last phase that calculates the final result from the results accumulated and combined in the preceding phases.

In case of the DoubleAverage aggregation, the Aggregator would just divide the sum of the elements by their count (if non-zero).

Example Implementation

Here’s an example implementation of the Aggregator:

private static void simpleCustomAverageAggregation(IMap<String, FAEmployee> employees) {
    System.out.println("Calculating salary average");

    // The result may be null for an empty map, hence Double rather than double
    Double avgSalary = employees.aggregate(new Aggregator<Map.Entry<String, FAEmployee>, Double>() {

        protected long sum;
        protected long count;

        @Override
        public void accumulate(Map.Entry<String, FAEmployee> entry) {
            count++;
            sum += entry.getValue().getSalaryPerMonth();
        }

        @Override
        public void combine(Aggregator aggregator) {
            // Merge the partial results accumulated by another instance
            this.sum += this.getClass().cast(aggregator).sum;
            this.count += this.getClass().cast(aggregator).count;
        }

        @Override
        public Double aggregate() {
            if (count == 0) {
                return null;
            }
            return ((double) sum / (double) count);
        }
    });

    System.out.println("Overall average salary: " + avgSalary);
    System.out.println("\n");
}

As you can see:

  • the accumulate() method calculates the sum and count of the elements

  • the combine() method combines the results from all the accumulations

  • the aggregate() method calculates the final result.

Built-In Aggregations

The com.hazelcast.aggregation.Aggregators class provides a wide variety of built-in Aggregators. The full list is presented below:

  • count

  • distinct

  • bigDecimal sum/avg/min/max

  • bigInteger sum/avg/min/max

  • double sum/avg/min/max

  • integer sum/avg/min/max

  • long sum/avg/min/max

  • number avg

  • comparable min/max

  • fixedPointSum, floatingPointSum

To use any of these Aggregators, instantiate them using the Aggregators factory class.

Each built-in Aggregator can also navigate to an attribute of the object passed to the accumulate() method (via reflection). For example, Aggregators.distinct("address.city") extracts the address.city attribute from the object passed to the Aggregator and accumulates the extracted value.
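
For example, a brief sketch using built-in aggregators on the employees map from the example above:

// count() returns the number of matching entries;
// doubleAvg() averages the given numeric attribute
Long employeeCount = employees.aggregate(Aggregators.count());
Double averageSalary = employees.aggregate(
        Aggregators.doubleAvg("salaryPerMonth"),
        Predicates.greaterThan("salaryPerMonth", 0));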

Configuration Options

On each partition, after the entries have been passed to the aggregator, the accumulation runs in parallel. This means that each aggregator is cloned and receives a subset of the entries from a partition. The accumulation phase then runs in all the cloned aggregators and, at the end, the results are combined into a single accumulation result. This speeds up the processing by a factor of at least two, even for simple aggregations. If the accumulation logic is heavier, the speed-up may be more significant.

To switch the accumulation to a sequential mode, set the hazelcast.aggregation.accumulation.parallel.evaluation property to false (it is set to true by default).

Projections

There are cases where instead of sending all the data returned by a query from a member, you want to transform (strip down) each result object in order to avoid redundant network traffic.

For example, you select all employees based on some criteria, but you just want to return their name instead of the whole Employee object. It is easily doable with the Projection API.

Projections are available on com.hazelcast.map.IMap only. IMap offers the method project to apply the projection logic on the map entries. This method can be called with or without a predicate. See its Javadoc to see the method details.

Projection API

The Projection API provides the method transform() which is called on each result object. Its result is then gathered as the final query result entity. You can refer to the Projection Javadoc for the API’s details.

Example implementation

Let’s consider the following domain object stored in an IMap:

public class Employee implements Serializable {

    private String name;

    public Employee() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

To return just the names of the Employees, you can run the query in the following way:

Collection<String> names = employees.project(new Projection<Map.Entry<String, Employee>, String>() {

    @Override
    public String transform(Map.Entry<String, Employee> entry) {
        return entry.getValue().getName();
    }
}, somePredicate);

Built-In Projections

The com.hazelcast.projection.Projections class provides two built-in Projections:

  • singleAttribute

  • multiAttribute

The singleAttribute Projection enables extracting a single attribute from an object (via reflection). For example, Projections.singleAttribute("address.city") extracts the address.city attribute from the object passed to the Projection.

The multiAttribute Projection enables extracting multiple attributes from an object (via reflection). For example, Projections.multiAttribute("address.city", "postalAddress.city") extracts both attributes from the object passed to the Projection and returns them in an Object[] array.
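
For example, a compact sketch of both built-in projections; the attribute paths are illustrative:

// Extract a single attribute from each matching entry
Collection<String> names = employees.project(
        Projections.singleAttribute("name"), somePredicate);

// Extract several attributes at once; each result is an Object[]
Collection<Object[]> namesAndCities = employees.project(
        Projections.multiAttribute("name", "address.city"), somePredicate);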

Continuous Query Cache

A continuous query cache (CQC) allows you to create a map that has a continuous query attached to it. Whenever a new entry is added to the map, the query runs and, if the entry matches, it is added to the query cache. With a CQC, you do not need to query a map for the same data repeatedly, and all the query results are kept locally.

Queries must be written using the Predicates API. CQC does not support SQL queries.

You can create a CQC either from a Java client or on a member.

You cannot create a CQC from any client other than Java.

Accessing a Continuous Query Cache from a Member

The following code snippet shows how you can access a continuous query cache from a member.

QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());

MapConfig mapConfig = new MapConfig("map-name");
mapConfig.addQueryCacheConfig(queryCacheConfig);

Config config = new Config();
config.addMapConfig(mapConfig);

HazelcastInstance node = Hazelcast.newHazelcastInstance(config);
IMap<Integer, String> map = node.getMap("map-name");
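
The cache itself can then be obtained from the map by name:

QueryCache<Integer, String> cache = map.getQueryCache("cache-name");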

Accessing a Continuous Query Cache from a Java Client

The following code snippet shows how you can access a continuous query cache from the client side. The difference in this code from the member side code is that you configure and instantiate a client instance instead of a member instance.

QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());

ClientConfig clientConfig = new ClientConfig();
clientConfig.addQueryCacheConfig("map-name", queryCacheConfig);

HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
IMap<Integer, Integer> clientMap = client.getMap("map-name");

QueryCache<Integer, Integer> cache = clientMap.getQueryCache("cache-name");

Features of Continuous Query Cache

The following features of continuous query cache are valid for both the member and client:

  • The initial query that is run on the existing map entries during the continuous query cache construction can be enabled/disabled according to the supplied predicate via QueryCacheConfig.setPopulate().

  • Continuous query cache allows you to run queries with indexes and perform event batching and coalescing.

  • A continuous query cache is evictable. Note that a continuous query cache has a default maximum capacity of 10000. If you need a non-evictable cache, you should configure the eviction via QueryCacheConfig.setEvictionConfig().

  • A listener can be added to a continuous query cache using QueryCache.addEntryListener().

  • IMap events are reflected in continuous query cache in the same order as they were generated on map entries. Since events are created on entries stored in partitions, ordering of events is maintained based on the ordering within the partition. You can add listeners to capture lost events using EventLostListener and you can recover lost events with the method QueryCache.tryRecover(). Recovery of lost events largely depends on the size of the buffer on Hazelcast members. Default buffer size is 16 per partition, i.e., 16 events per partition can be maintained in the buffer. If the event generation is high, setting the buffer size to a higher number provides better chances of recovering lost events. You can set buffer size with QueryCacheConfig.setBufferSize(). You can use the following example code for a recovery case.

QueryCache queryCache = map.getQueryCache("cache-name", Predicates.sql("this > 20"), true);
queryCache.addEntryListener(new EventLostListener() {
    @Override
    public void eventLost(EventLostEvent event) {
        queryCache.tryRecover();
    }
}, false);
  • You can populate a continuous query cache with only the keys of its entries and retrieve the subsequent values directly via QueryCache.get() from the underlying IMap. This helps to decrease the initial population time when the values are very large.

Configuring a Continuous Query Cache

You can configure a continuous query cache declaratively or programmatically; the latter is mostly explained in the previous section. The parent configuration element is <query-caches> which should be placed within your <map> configuration. You can create your query caches using the <query-cache> sub-element under <query-caches>.

If you use mutable keys in maps, consider setting the serialize-keys option to true so that Hazelcast clones the keys rather than using an internal reference. Otherwise, Hazelcast may store references to outdated versions of your keys.

Setting the serialize-keys option to true has a negative impact on performance.

The following is an example declarative configuration.

  • XML

  • YAML

<hazelcast>
    ...
    <map>
        <query-caches>
            <query-cache name="myContQueryCache">
                <serialize-keys>false</serialize-keys>
                <include-value>true</include-value>
                <predicate type="class-name">com.hazelcast.examples.ExamplePredicate</predicate>
                <entry-listeners>
                    <entry-listener>...</entry-listener>
                </entry-listeners>
                <in-memory-format>BINARY</in-memory-format>
                <populate>true</populate>
                <coalesce>false</coalesce>
                <batch-size>2</batch-size>
                <delay-seconds>3</delay-seconds>
                <buffer-size>32</buffer-size>
                <eviction size="1000" max-size-policy="ENTRY_COUNT" eviction-policy="LFU"/>
                <indexes>
                    <index>
                        <attributes>
                            <attribute>age</attribute>
                        </attributes>
                    </index>
                </indexes>
            </query-cache>
        </query-caches>
    </map>
    ...
</hazelcast>
hazelcast:
  map:
    query-caches:
        myContQueryCache:
          serialize-keys: false
          include-value: true
          predicate:
            class-name: com.hazelcast.examples.ExamplePredicate
          entry-listeners:
            - class-name: "..."
          in-memory-format: BINARY
          populate: true
          coalesce: false
          batch-size: 2
          delay-seconds: 3
          buffer-size: 32
          eviction:
            size: 1000
            max-size-policy: ENTRY_COUNT
            eviction-policy: LFU
          indexes:
            - attributes:
              - "age"

Continuous query caches have the following configuration elements:

  • name: Name of your continuous query cache.

  • serialize-keys: Whether Hazelcast serializes the query cache keys. Keys should be serialized if they are mutable and need to be cloned via serialization. Default value is false.

  • include-value: Specifies whether the value will be cached too. Its default value is true.

  • predicate: Predicate to filter events which are applied to the query cache.

  • entry-listeners: Adds listeners (listener classes) for your query cache entries. See the Registering Map Listeners section.

  • in-memory-format: Type of the data to be stored in your query cache. See the in-memory format section. Its default value is BINARY.

  • populate: Specifies whether the initial population of your query cache is enabled. Its default value is true.

  • coalesce: Specifies whether the coalescing of your query cache is enabled. Its default value is false.

  • delay-seconds: Minimum time in seconds that an event waits in the member’s buffer. Its default value is 0.

  • batch-size: Batch size used to determine the number of events sent in a batch to your query cache. Its default value is 1.

  • buffer-size: Maximum number of events which can be stored in a partition buffer. Its default value is 16.

  • eviction: Configuration for the eviction of your query cache. See the Configuring Map Eviction section.

  • indexes: Indexes for your query cache defined by using this element’s <index> sub-elements. See the Configuring IMap Indexes section.

Please take the following configuration considerations and publishing logic into account:

If delay-seconds is equal to or smaller than 0, then batch-size loses its function. Each time there is an event, all the entries in the buffer are pushed to the subscriber.

If delay-seconds is bigger than 0, the following logic applies:

  • If coalesce is set to true, the buffer is checked for an event with the same key; if one exists, it is overwritten by the current event. Then:

    • The current size of the buffer is checked: if it is equal to or larger than batch-size, then batch-size events are pushed to the subscriber. Otherwise, no events are sent.

    • After the batch-size check, delay-seconds is checked. The buffer is scanned from the oldest to the youngest entry; all the entries older than delay-seconds are pushed to the subscriber.