Predicates API
The Predicates API is a programming interface for querying data in Hazelcast, similar in spirit to the Java Persistence Query Language (JPQL).
In addition to the Java-based Predicates API, Hazelcast also supports SQL statements and functions, as well as streaming SQL queries.
How the Predicates API Works
The predicate is sent to each member in the cluster. Each member looks at its own local entries and filters them according to the predicate. At this stage, key/value pairs of the entries are deserialized and then passed to the predicate. The predicate requester merges all the results coming from each member into a single set.
Distributed query is highly scalable. If you add new members to the cluster, the partition count for each member is reduced and thus the time spent by each member on iterating its entries is reduced. In addition, the pool of partition threads evaluates the entries concurrently in each member and the network traffic is also reduced since only filtered data is sent to the requester.
Querying Maps with the Predicate API
The Predicates API offers built-in methods for creating queries as well as an sql() method for SQL-like queries.
Assume that you have a map called employee that contains values of Employee objects:
public class Employee implements Serializable {

    private String name;
    private int age;
    private boolean active;
    private double salary;

    public Employee(String name, int age, boolean active, double salary) {
        this.name = name;
        this.age = age;
        this.active = active;
        this.salary = salary;
    }

    public Employee() {
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public double getSalary() {
        return salary;
    }

    public boolean isActive() {
        return active;
    }
}
Now, look for employees who are active and have an age less than 30.
When using Portable objects, if a field of an object exists on one member but not on another, Hazelcast does not throw an unknown-field exception. Instead, it treats a predicate that queries the unknown field as an always-false predicate.
IMap<String, Employee> map = hazelcastInstance.getMap( "employee" );
EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
Predicate predicate = e.is( "active" ).and( e.get( "age" ).lessThan( 30 ) );
Collection<Employee> employees = map.values( predicate );
In the above example code, predicate verifies whether the entry is active and its age value is less than 30. This predicate is applied to the employee map using the map.values(predicate) method. This method sends the predicate to all cluster members and merges the results coming from them. Since the predicate is communicated between the members, it needs to be serializable.
Predicates can also be applied to the keySet(), entrySet() and localKeySet() methods of distributed maps.
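For instance, the predicate built above can be reused with these methods; a minimal sketch:
// only the matching keys/entries are returned
Set<String> keys = map.keySet( predicate );
Set<Map.Entry<String, Employee>> entries = map.entrySet( predicate );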
Predicates Class Operators
The Predicates class includes many operators for your query requirements. Some of them are described below:
- equal: Checks if the result of an expression is equal to a given value.
- notEqual: Checks if the result of an expression is not equal to a given value.
- instanceOf: Checks if the result of an expression has a certain type.
- like: Checks if the result of an expression matches some string pattern. % (percent sign) is the placeholder for multiple characters, _ (underscore) is the placeholder for a single character.
- ilike: A case-insensitive variant of like.
- greaterThan: Checks if the result of an expression is greater than a certain value.
- greaterEqual: Checks if the result of an expression is greater than or equal to a certain value.
- lessThan: Checks if the result of an expression is less than a certain value.
- lessEqual: Checks if the result of an expression is less than or equal to a certain value.
- between: Checks if the result of an expression is between two values (inclusive).
- in: Checks if the result of an expression is an element of a certain collection.
- isNot: Checks if the result of an expression is false.
- regex: Checks if the result of an expression matches some regular expression.
- alwaysTrue: The result of an expression always matches.
- alwaysFalse: The result of an expression never matches.
See the Predicates Javadoc for all predicates provided.
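As an illustration, here are a few of these operators applied to the employee map from above (a minimal sketch; the attribute names follow the Employee class):
// between is inclusive on both ends
Predicate salaryBand = Predicates.between( "salary", 40000d, 80000d );
// % matches any number of characters
Predicate namePattern = Predicates.like( "name", "Jo%" );
// matches any of the listed ages
Predicate ages = Predicates.in( "age", 20, 30, 40 );
Collection<Employee> result = map.values( Predicates.and( salaryBand, namePattern, ages ) );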
Combining Predicates with AND, OR, NOT
You can combine predicates using the and, or and not operators, as shown in the below examples.
public Collection<Employee> getWithNameAndAge( String name, int age ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate agePredicate = Predicates.equal( "age", age );
    Predicate predicate = Predicates.and( namePredicate, agePredicate );
    return employeeMap.values( predicate );
}

public Collection<Employee> getWithNameOrAge( String name, int age ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate agePredicate = Predicates.equal( "age", age );
    Predicate predicate = Predicates.or( namePredicate, agePredicate );
    return employeeMap.values( predicate );
}

public Collection<Employee> getNotWithName( String name ) {
    Predicate namePredicate = Predicates.equal( "name", name );
    Predicate predicate = Predicates.not( namePredicate );
    return employeeMap.values( predicate );
}
Simplifying with PredicateBuilder
You can simplify predicate usage with the PredicateBuilder interface. See the example code below, which selects all people with a certain name and age.
public Collection<Employee> getWithNameAndAgeSimplified( String name, int age ) {
    EntryObject e = Predicates.newPredicateBuilder().getEntryObject();
    Predicate agePredicate = e.get( "age" ).equal( age );
    Predicate predicate = e.get( "name" ).equal( name ).and( agePredicate );
    return employeeMap.values( predicate );
}
Querying with SQL-like Predicates
Predicates.sql() takes a regular SQL where clause. Here is an example:
IMap<String, Employee> map = hazelcastInstance.getMap( "employee" );
Collection<Employee> employees = map.values( Predicates.sql( "active AND age < 30" ) );
Hazelcast also offers a SQL service that allows you to execute full SQL queries, as opposed to the SQL-like predicates of Predicates.sql(). See SQL for more information.
SQL-like predicates support the following syntax:

AND/OR: <expression> AND <expression> AND <expression>…
- active AND age > 30
- active = false OR age = 45 OR name = 'Joe'
- active AND ( age > 20 OR salary < 60000 )

Equality and comparison: =, !=, <, <=, >, >=
- <expression> = value
- age <= 30
- name = 'Joe'
- salary != 50000

BETWEEN: <attribute> [NOT] BETWEEN <value1> AND <value2>
- age BETWEEN 20 AND 33 (same as age >= 20 AND age <= 33)
- age NOT BETWEEN 30 AND 40 (same as age < 30 OR age > 40)

IN: <attribute> [NOT] IN (val1, val2,…)
- age IN ( 20, 30, 40 )
- age NOT IN ( 60, 70 )
- active AND ( salary >= 50000 OR ( age NOT BETWEEN 20 AND 30 ) )
- age IN ( 20, 30, 40 ) AND salary BETWEEN 50000 AND 80000

LIKE: <attribute> [NOT] LIKE 'expression'
The % (percent sign) is the placeholder for multiple characters, an _ (underscore) is the placeholder for a single character.
- name LIKE 'Jo%' (true for 'Joe', 'Josh', 'Joseph' etc.)
- name LIKE 'Jo_' (true for 'Joe'; false for 'Josh')
- name NOT LIKE 'Jo_' (true for 'Josh'; false for 'Joe')
- name LIKE 'J_s%' (true for 'Josh', 'Joseph'; false for 'John', 'Joe')

ILIKE: <attribute> [NOT] ILIKE 'expression'
Similar to the LIKE predicate but in a case-insensitive manner.
- name ILIKE 'Jo%' (true for 'Joe', 'joe', 'jOe', 'Josh', 'joSH', etc.)
- name ILIKE 'Jo_' (true for 'Joe' or 'jOE'; false for 'Josh')

REGEX: <attribute> [NOT] REGEX 'expression'
- name REGEX 'abc-.*' (true for 'abc-123'; false for 'abx-123')
You can escape the % and _ placeholder characters in your SQL queries with predicates using the backslash (\) character. The apostrophe (') can be escaped with another apostrophe, i.e., ''. If you use REGEX, you need to escape characters according to the normal Java escape syntax; see the Oracle API docs for details.
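For example, in Java source the backslash itself has to be doubled (a minimal sketch):
// matches names containing a literal underscore; the \ escapes the _ placeholder
Predicate underscore = Predicates.sql( "name LIKE '%\\_%'" );
// matches a literal apostrophe by doubling it
Predicate apostrophe = Predicates.sql( "name = 'O''Brien'" );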
Querying Entry Keys with Predicates
You can use the __key attribute to perform a predicated search for entry keys. See the following example:
IMap<String, Person> personMap = hazelcastInstance.getMap( "persons" );
personMap.put( "Alice", new Person( "Alice", 35, Gender.FEMALE ) );
personMap.put( "Andy", new Person( "Andy", 37, Gender.MALE ) );
personMap.put( "Bob", new Person( "Bob", 22, Gender.MALE ) );
[...]
Predicate predicate = Predicates.sql( "__key like 'A%'" );
Collection<Person> startingWithA = personMap.values( predicate );
In this example, the code creates a collection with the entries whose keys start with the letter "A".
Querying JSON Strings
You can query JSON strings stored inside your Hazelcast clusters. To query a JSON string, you first need to create a HazelcastJsonValue from the JSON string. You can use HazelcastJsonValue objects both as keys and values in distributed data structures. It is then possible to query these objects using the Hazelcast query methods explained in this section.
String person1 = "{ \"name\": \"John\", \"age\": 35 }";
String person2 = "{ \"name\": \"Jane\", \"age\": 24 }";
String person3 = "{ \"name\": \"Trey\", \"age\": 17 }";
IMap<Integer, HazelcastJsonValue> idPersonMap = instance.getMap("jsonValues");
idPersonMap.put(1, new HazelcastJsonValue(person1));
idPersonMap.put(2, new HazelcastJsonValue(person2));
idPersonMap.put(3, new HazelcastJsonValue(person3));
Collection<HazelcastJsonValue> peopleUnder21 = idPersonMap.values(Predicates.lessThan("age", 21));
When running the queries, Hazelcast treats values extracted from the JSON documents as Java types so they can be compared with the query attribute. The JSON specification defines five primitive types to be used in JSON documents: number, string, true, false and null. The string, true/false and null types are treated as String, boolean and null, respectively. Extracted number values are treated as long where possible; otherwise, number types are treated as double.
It is possible to query nested attributes and arrays in JSON documents, using the Predicates API. The query syntax is the same as querying collections and arrays in other Hazelcast objects.
/**
* Sample JSON object
*
* {
* "departmentId": 1,
* "room": "alpha",
* "people": [
* {
* "name": "Peter",
* "age": 26,
* "salary": 50000
* },
* {
* "name": "Jonah",
* "age": 50,
* "salary": 140000
* }
* ]
* }
*
*
* The following query finds all the departments that have a person named "Peter" working in them.
*/
Collection<HazelcastJsonValue> departmentWithPeter = departments.values(Predicates.equal("people[any].name", "Peter"));
Filtering with Paging Predicates
Hazelcast provides paging for defined predicates. With its PagingPredicate interface, you can get a collection of keys, values, or entries page by page by filtering them with predicates and giving the size of the pages. Also, you can sort the entries by specifying comparators. In this case, the comparator should be Serializable and the serialization factory implementations you use, e.g., PortableFactory and DataSerializableFactory, should be registered. See the Serialization chapter on how to register these factories.
Paging predicates require the objects to be deserialized both on the calling side (either a member or client) and the member side from which the collection is retrieved. Therefore, you need to register the serialization factories you use on all the members and clients on which the paging predicates are used. See the Serialization chapter on how to register these factories.
In the example code below:

- The greaterEqual predicate gets values from the "students" map. This predicate has a filter to retrieve the objects with an "age" greater than or equal to 18.
- Then a PagingPredicate is constructed in which the page size is 5, so that there are five objects in each page. The first time the values() method is called, the first page is fetched.
- Finally, the subsequent page is fetched by calling the nextPage() method of PagingPredicate and querying the map again with the updated PagingPredicate.
IMap<Integer, Student> map = hazelcastInstance.getMap( "students" );
Predicate greaterEqual = Predicates.greaterEqual( "age", 18 );
PagingPredicate pagingPredicate = Predicates.pagingPredicate( greaterEqual, 5 );
// Retrieve the first page
Collection<Student> values = map.values( pagingPredicate );
...
// Set up next page
pagingPredicate.nextPage();
// Retrieve next page
values = map.values( pagingPredicate );
...
If a comparator is not specified for PagingPredicate, but you want to get a collection of keys or values page by page, the keys or values must be instances of Comparable (i.e., they must implement java.lang.Comparable). Otherwise, a java.lang.IllegalArgumentException is thrown.
You can also access a specific page more easily with the help of the setPage() method. This way, if you make a query for the hundredth page, for example, it gets all 100 pages at once instead of reaching the hundredth page one by one using the nextPage() method.
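A short sketch of jumping straight to a page, assuming the pagingPredicate from the example above (page indexes start at 0):
// go directly to the fifth page instead of calling nextPage() four times
pagingPredicate.setPage( 4 );
values = map.values( pagingPredicate );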
Note that this feature can put significant pressure on memory; see the PagingPredicate Javadoc for details.
Paging Predicate, also known as Order & Limit, is not supported in a transactional context.
Filtering with Partition Predicate
You can run queries on a single partition in your cluster using
the partition predicate (PartitionPredicate
).
The Predicates.partitionPredicate()
method takes a predicate and partition key
as parameters, gets the partition ID using the key and runs that predicate only
on the partition where that key belongs.
See the following code snippet:
...
Predicate predicate = Predicates.partitionPredicate(partitionKey, Predicates.alwaysTrue());
Collection<Integer> values = map.values(predicate);
Collection<String> keys = map.keySet(predicate);
...
By default there are 271 partitions, and a regular predicate accesses all of them. A partition predicate, however, accesses only a single partition, which can lead to a big performance gain.
For the partition predicate to work correctly, you need to know which
partition your data belongs to so that you can send the
request to the correct partition. One of the ways of doing it is to
make use of the PartitionAware
interface when data is
inserted, thereby controlling the owning partition. See the
PartitionAware section for more information and examples.
A concrete example may be a web shop that sells phones and accessories. To find all the accessories of a phone, a query could be executed that selects all accessories for that phone. This query is executed on all members in the cluster and therefore could generate quite a lot of load. However, if we store the accessories in the same partition as the phone, the partition predicate can use the partitionKey of the phone to select the right partition and then query for the accessories of that phone; this reduces the load on the system and returns query results faster.
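A minimal sketch of such a routing key, where AccessoryKey is a hypothetical class; only the PartitionAware interface comes from the Hazelcast API:
import com.hazelcast.partition.PartitionAware;
import java.io.Serializable;

// Keys of accessory entries route to the owning phone's partition.
public class AccessoryKey implements PartitionAware<String>, Serializable {

    private final String accessoryId;
    private final String phoneId;

    public AccessoryKey(String accessoryId, String phoneId) {
        this.accessoryId = accessoryId;
        this.phoneId = phoneId;
    }

    @Override
    public String getPartitionKey() {
        // all accessories of a phone land in the same partition as the phone
        return phoneId;
    }

    // equals() and hashCode() over both fields are also needed for map keys
}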
Configuring the Query Thread Pool
You can change the size of the thread pool dedicated to query operations using the pool-size property. Each query consumes a single thread from a Generic Operations ThreadPool on each Hazelcast member - let’s call it the query-orchestrating thread. That thread is blocked throughout the whole execution span of a query on the member.
The query-orchestrating thread uses the threads from the query thread pool in the following cases:

- if you run a PagingPredicate (since each page runs as a separate task)
- if you set the system property hazelcast.query.predicate.parallel.evaluation to true (since the predicates are evaluated in parallel)

See the Filtering with Paging Predicates section and the System Properties appendix for information about paging predicates and for a description of the above system property.
Below is an example of that declarative configuration.
<hazelcast>
    ...
    <executor-service name="hz:query">
        <pool-size>100</pool-size>
    </executor-service>
    ...
</hazelcast>
hazelcast:
  ...
  executor-service:
    "hz:query":
      pool-size: 100
Below is the equivalent programmatic configuration.
Config cfg = new Config();
cfg.getExecutorConfig("hz:query").setPoolSize(100);
When dealing with the query requests coming from the clients to your members, Hazelcast offers the following system properties to tune your thread pools:

- hazelcast.clientengine.thread.count: the number of threads to process non-partition-aware client requests, like map.size() and executor tasks. Its default value is the number of cores multiplied by 20.
- hazelcast.clientengine.query.thread.count: the number of threads to process query requests coming from the clients. Its default value is the number of cores.
If there are a lot of query requests from the clients, you may want to increase the value of hazelcast.clientengine.query.thread.count. In addition to this tuning, you may also consider increasing the value of hazelcast.clientengine.thread.count if the CPU load in your system is not high and there is plenty of free memory.
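For example, both properties can be set as JVM arguments when starting your members; the values below are purely illustrative:
-Dhazelcast.clientengine.thread.count=200
-Dhazelcast.clientengine.query.thread.count=16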
Creating Custom Query Attributes
Custom attributes allow you to use the Predicates API to query fields that are derived from complex calculations. For example, imagine a map whose values are Person objects that contain a list of children in a children field.
class Person {
    String name;
    String[] children;
}
You may want to query for Person objects that have more than one child. But you neither have a field with the total number of children, nor does the Predicates API have a count() method that you can call. In this case, you can create a custom attribute called childrenCount that returns the number of children for each Person object.
Implementing a ValueExtractor
A custom attribute is a "synthetic" attribute that does not exist as a field or a getter in the object that it is extracted from. Thus, it is necessary to define the policy on how the attribute is supposed to be extracted.
In order to implement a ValueExtractor, implement the com.hazelcast.query.extractor.ValueExtractor interface and its extract() method. This method does not return any values since the extracted value is collected by the ValueCollector. In order to return multiple results from a single extraction, invoke the ValueCollector.collect() method multiple times, so that the collector collects all results.
See the ValueExtractor and ValueCollector Javadocs.
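A minimal sketch of the childrenCount extractor from the example above. The exact ValueExtractor signature has varied between Hazelcast versions; this sketch assumes the non-generic extract(target, argument, collector) form and the ValueCollector.addObject() collection method, and ChildrenCountExtractor is an illustrative name:
import com.hazelcast.query.extractor.ValueCollector;
import com.hazelcast.query.extractor.ValueExtractor;

public class ChildrenCountExtractor implements ValueExtractor {
    @Override
    public void extract(Object target, Object argument, ValueCollector collector) {
        // target is the Person object stored in the map (field access assumes same package)
        Person person = (Person) target;
        String[] children = person.children;
        // collect the synthetic childrenCount value for this entry
        collector.addObject(children == null ? 0 : children.length);
    }
}
Once registered under the name childrenCount (see the configuration examples below), the attribute can be queried like any other, e.g., Predicates.greaterThan("childrenCount", 1).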
Custom attributes are compatible with all Hazelcast serialization methods.
ValueExtractor with Portable Serialization
Portable serialization is a special kind of serialization where there is no need to have the class of the serialized object on the classpath of a member in order to read its attributes. That is the reason why the target object passed to the ValueExtractor.extract() method is not of the exact type that has been stored. Instead, an instance of com.hazelcast.query.extractor.ValueReader is passed. ValueReader enables reading the attributes of a Portable object in a generic and type-agnostic way. It contains two methods:

- read(String path, ValueCollector<T> collector) - enables passing all results directly to the ValueCollector.
- read(String path, ValueCallback<T> callback) - enables filtering, transforming and grouping the result of the read operation and manually passing it to the ValueCollector.
See the ValueReader Javadoc.
Returning Multiple Values from a Single Extraction
It sounds counter-intuitive, but a single extraction may return multiple values when arrays or collections are involved. Let’s have a look at the following data structure in pseudocode:
class Motorbike {
    Wheel[] wheel;
}
class Wheel {
    String name;
}
Let’s assume that we want to extract the names of all wheels from a single motorbike object. Each motorbike has two wheels, so there are two names for each bike. In order to return both values from the extraction operation, collect them separately using the ValueCollector. Collecting multiple values in this way allows you to operate on these multiple values as if they were single values during the evaluation of the predicates.
Let’s assume that we registered a custom extractor with the name wheelName and executed the following query: wheelName = front-wheel.
The extraction may return up to two wheel names for each Motorbike since each Motorbike has up to two wheels. In such a case, it is enough if a single value evaluates the predicate’s condition to true to return a match, so a Motorbike is returned if "any" of its wheels matches the expression.
Extraction Arguments
A ValueExtractor may use a custom argument if it is specified in the query. The custom argument may be passed within the square brackets located after the name of the custom attribute, e.g., customAttribute[argument].
Let’s have a look at the following query: currency[incoming] == EUR
The currency is a custom attribute that uses a com.test.CurrencyExtractor for extraction. The string incoming is an argument that is passed to the ArgumentParser during the extraction. The parser parses the string according to its custom logic and returns a parsed object. The parsed object may be a single object, array, collection, or any arbitrary object. It is up to the ValueExtractor implementation to understand the semantics of the parsed argument object.
For now, it is not possible to register a custom ArgumentParser, thus a default parser is used. It follows a pass-through semantic, which means that the string located in the square brackets is passed "as is" to the ValueExtractor.extract() method.
Please note that using square brackets within the argument string is not allowed.
Configuring a Custom Attribute Programmatically
The following snippet demonstrates how to define a custom attribute using a ValueExtractor
.
AttributeConfig attributeConfig = new AttributeConfig();
attributeConfig.setName("currency");
attributeConfig.setExtractorClassName("com.bank.CurrencyExtractor");
MapConfig mapConfig = new MapConfig();
mapConfig.addAttributeConfig(attributeConfig);
currency is the name of the custom attribute that will be extracted using the CurrencyExtractor class.
Keep in mind that an extractor may not be added after the map has been instantiated. All extractors have to be defined upfront in the map’s initial configuration.
Configuring a Custom Attribute Declaratively
The following snippet demonstrates how to define a custom attribute in the Hazelcast XML Configuration.
<hazelcast>
    ...
    <map name="trades">
        <attributes>
            <attribute extractor-class-name="com.bank.CurrencyExtractor">currency</attribute>
        </attributes>
    </map>
    ...
</hazelcast>
hazelcast:
  map:
    trades:
      attributes:
        currency:
          extractor-class-name: com.bank.CurrencyExtractor
Analogous to the example above, currency is the name of the custom attribute that will be extracted using the CurrencyExtractor class.
Please note that an attribute name must begin with an ASCII letter [A-Za-z] or digit [0-9], and may contain ASCII letters [A-Za-z], digits [0-9] or underscores later on.
Indexing Custom Attributes
You can create an index using a custom attribute.
The name of the attribute used in the index definition has to match the one used in the attributes configuration.
Defining indexes with extraction arguments is allowed, as shown in the example below:
<hazelcast>
    ...
    <indexes>
        <!-- custom attribute without an extraction argument -->
        <index>
            <attributes>
                <attribute>currency</attribute>
            </attributes>
        </index>
        <!-- custom attribute using an extraction argument -->
        <index>
            <attributes>
                <attribute>currency[incoming]</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>
hazelcast:
  ...
  indexes:
    - attributes:
        - "currency"
        - "currency[incoming]"
Querying in Collections and Arrays
Hazelcast allows querying in collections and arrays. Querying in collections and arrays is compatible with all Hazelcast serialization methods, including the Portable serialization.
Let’s have a look at the following data structure expressed in pseudocode:
class Motorbike {
    Wheel[] wheels;
}
class Wheel {
    String name;
}
In order to query a single element of a collection/array, you can execute the following query:
// it matches all motorbikes where the zero wheel's name is 'front-wheel'
Predicate p = Predicates.equal("wheels[0].name", "front-wheel");
Collection<Motorbike> result = map.values(p);
It is also possible to query a collection/array using the any
semantic as shown below:
// it matches all motorbikes where any wheel's name is 'front-wheel'
Predicate p = Predicates.equal("wheels[any].name", "front-wheel");
Collection<Motorbike> result = map.values(p);
The exact same query may be executed using the SQL
predicate as shown below:
Predicate p = Predicates.sql("wheels[any].name = 'front-wheel'");
Collection<Motorbike> result = map.values(p);
The [] notation applies to both collections and arrays.
Hazelcast requires all elements of a collection to have the same type. You may have collections of different types in your map; however, each collection’s elements must be of the same concrete type within that collection attribute. Consider custom attribute extractors if it is impossible or undesirable to reduce the variety of types to a single type. See the Custom Attributes section for information about them.
Indexing in Collections and Arrays
You can also create an index using a query in collections and arrays.
Please note that in order to leverage the index, the attribute name used in the query has to be the same as the one used in the index definition.
Let’s assume you have the following index definition:
<hazelcast>
    ...
    <indexes>
        <index type="HASH">
            <attributes>
                <attribute>wheels[any].name</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>
hazelcast:
  ...
  indexes:
    - type: HASH
      attributes:
        - wheels[any].name
The following query uses the index:
Predicate p = Predicates.equal("wheels[any].name", "front-wheel");
The following query, however, does NOT leverage the index, since it does not use exactly the same attribute name that was used in the index:
Predicates.equal("wheels[0].name", "front-wheel")
In order to use the index in the case mentioned above, you have to create another index, as shown below:
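A minimal sketch, following the index definition format shown earlier:
<hazelcast>
    ...
    <indexes>
        <!-- index matching the wheels[0].name query above -->
        <index>
            <attributes>
                <attribute>wheels[0].name</attribute>
            </attributes>
        </index>
    </indexes>
    ...
</hazelcast>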
Corner cases
Handling of corner cases may be a bit different than in a programming language like Java. Let’s have a look at the following examples in order to understand the differences. To make the analysis simpler, let’s assume that there is only one Motorbike object stored in a Hazelcast map.
| Id | Query | Data State | Extraction Result | Match |
|----|-------|------------|-------------------|-------|
| 1 | wheels[1].name = 'front-wheel' | wheels[1].name == null | null | No |
| 2 | wheels[1].name = null | wheels[1].name == null | null | Yes |
| 3 | wheels[1].name = 'front-wheel' | wheels[1] == null | null | No |
| 4 | wheels[1].name = null | wheels[1] == null | null | Yes |
| 5 | wheels[1].name = 'front-wheel' | wheels.size() == 1 (index out of bounds) | null | No |
| 6 | wheels[1].name = null | wheels.size() == 1 (index out of bounds) | null | Yes |
| 7 | wheels[1].name = 'front-wheel' | wheels == null | null | No |
| 8 | wheels[1].name = null | wheels == null | null | Yes |
As you can see, no NullPointerException or IndexOutOfBoundsException is thrown in the extraction process, even though parts of the expression are null. Looking at examples 4, 6 and 8, we can also easily notice that it is impossible to distinguish which part of the expression was null. If we execute the query wheels[1].name = null, it may be evaluated to true because:

- the wheels collection/array is null
- the index == 1 is out of bounds
- the name attribute of the wheels[1] object is null

In order to make the query unambiguous, extra conditions would have to be added, e.g., wheels != null AND wheels[1].name = null.
Aggregations
Aggregations allow you to compute a value of some function, e.g., sum or max, over the stored map entries. The computation is performed in a fully distributed manner, so no data other than the computed function value is transferred to the caller, making the computation fast.
Aggregations are available on com.hazelcast.map.IMap only.
IMap offers the aggregate method to apply the aggregation logic on the map entries. This method can be called with or without a predicate. You can refer to its Javadoc to see the method details.
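For instance, using the built-in aggregators described later in this section (a minimal sketch, reusing the employee map from the beginning of this section):
IMap<String, Employee> employees = hazelcastInstance.getMap( "employee" );
// average salary over all entries
Double avgSalary = employees.aggregate( Aggregators.doubleAvg( "salary" ) );
// average salary of active employees only
Double avgActiveSalary = employees.aggregate( Aggregators.doubleAvg( "salary" ),
        Predicates.equal( "active", true ) );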
If the in-memory format of your data is NATIVE, aggregations always run on the partition threads. If the data is of type BINARY or OBJECT, they also mostly run on the partition threads; however, they may run on separate query threads to avoid blocking partition threads (if there are no ongoing migrations).
Aggregator API
The aggregation is split into three phases represented by three methods:

- accumulate()
- combine()
- aggregate()

There are also the following callbacks:

- onAccumulationFinished() called when the accumulation phase finishes
- onCombinationFinished() called when the combination phase finishes

These callbacks enable releasing the state that might have been initialized and stored in the Aggregator - to reduce the network traffic.
Each phase is described below. See also the Aggregator Javadoc for the API’s details.
Accumulation:
During the accumulation phase, each Aggregator accumulates all entries passed to it by the query engine. It accumulates only those pieces of information that are required to calculate the aggregation result in the last phase - that’s implementation specific.
In case of the DoubleAverage aggregation, the Aggregator would accumulate:

- the sum of the elements it accumulated
- the count of the elements it accumulated
Combination:
Since aggregation is executed in parallel on each partition of the cluster, the results need to be combined after the accumulation phase in order to be able to calculate the final result.
In case of the DoubleAverage
aggregation, the aggregator would sum up all
the sums of the elements and all the counts.
Aggregation:
Aggregation is the last phase that calculates the final result from the results accumulated and combined in the preceding phases.
In case of the DoubleAverage
aggregation, the Aggregator would just
divide the sum of the elements by their count (if non-zero).
Example Implementation
Here’s an example implementation of the Aggregator:
private static void simpleCustomAverageAggregation(IMap<String, FAEmployee> employees) {
    System.out.println("Calculating salary average");

    // Double, not double: aggregate() may return null for an empty map
    Double avgSalary = employees.aggregate(new Aggregator<Map.Entry<String, FAEmployee>, Double>() {

        protected long sum;
        protected long count;

        @Override
        public void accumulate(Map.Entry<String, FAEmployee> entry) {
            count++;
            sum += entry.getValue().getSalaryPerMonth();
        }

        @Override
        public void combine(Aggregator aggregator) {
            this.sum += this.getClass().cast(aggregator).sum;
            this.count += this.getClass().cast(aggregator).count;
        }

        @Override
        public Double aggregate() {
            if (count == 0) {
                return null;
            }
            return ((double) sum / (double) count);
        }
    });

    System.out.println("Overall average salary: " + avgSalary);
    System.out.println("\n");
}
As you can see:

- the accumulate() method calculates the sum and count of the elements
- the combine() method combines the results from all the accumulations
- the aggregate() method calculates the final result.
Built-In Aggregations
The com.hazelcast.aggregation.Aggregators class provides a wide variety of built-in Aggregators. The full list is presented below:

- count
- distinct
- bigDecimal sum/avg/min/max
- bigInteger sum/avg/min/max
- double sum/avg/min/max
- integer sum/avg/min/max
- long sum/avg/min/max
- number avg
- comparable min/max
- fixedPointSum, floatingPointSum

To use any of these Aggregators, instantiate them using the Aggregators factory class.
Each built-in Aggregator can also navigate to an attribute of the object passed to the accumulate() method (via reflection). For example, Aggregators.distinct("address.city") extracts the address.city attribute from the object passed to the Aggregator and accumulates the extracted value.
Configuration Options
On each partition, after the entries have been passed to the aggregator, the accumulation runs in parallel. It means that each aggregator is cloned and receives a sub-set of the entries received from a partition. Then, it runs the accumulation phase in all the cloned aggregators - at the end, the result is combined into a single accumulation result. It speeds up the processing by at least the factor of two - even in case of simple aggregations. If the accumulation logic is more "heavy", the speed-up may be more significant.
In order to switch the accumulation into a sequential mode, set the hazelcast.aggregation.accumulation.parallel.evaluation property to false (it’s set to true by default).
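For example, on the member configuration (a minimal sketch):
Config config = new Config();
// disable parallel accumulation; each partition's aggregator then runs sequentially
config.setProperty("hazelcast.aggregation.accumulation.parallel.evaluation", "false");
HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);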
Projections
There are cases where instead of sending all the data returned by a query from a member, you want to transform (strip down) each result object in order to avoid redundant network traffic.
For example, you select all employees based on some criteria, but you just want to return their name instead of the whole Employee object. It is easily doable with the Projection API.
Projections are available on com.hazelcast.map.IMap only. IMap offers the project method to apply the projection logic on the map entries. This method can be called with or without a predicate. See its Javadoc for the method details.
Projection API
The Projection API provides the transform() method, which is called on each result object. Its result is then gathered as the final query result entity. You can refer to the Projection Javadoc for the API’s details.
Example implementation
Let’s consider the following domain object stored in an IMap:
public class Employee implements Serializable {

    private String name;

    public Employee() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
To return just the names of the Employees, you can run the query in the following way:
Collection<String> names = employees.project(new Projection<Map.Entry<String, Employee>, String>() {
    @Override
    public String transform(Map.Entry<String, Employee> entry) {
        return entry.getValue().getName();
    }
}, somePredicate);
Built-In Projections
The com.hazelcast.projection.Projections class provides two built-in Projections:

- singleAttribute
- multiAttribute

The singleAttribute Projection enables extracting a single attribute from an object (via reflection). For example, Projections.singleAttribute("address.city") extracts the address.city attribute from the object passed to the Projection.
The multiAttribute Projection enables extracting multiple attributes from an object (via reflection). For example, Projections.multiAttribute("address.city", "postalAddress.city") extracts both attributes from the object passed to the Projection and returns them in an Object[] array.
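For instance, the hand-written projection above can be shortened with the built-in variant (a sketch, assuming the same employees map and somePredicate):
// same result as the transform() implementation above, via reflection
Collection<String> names = employees.project(
        Projections.singleAttribute("name"), somePredicate);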
Continuous Query Cache
A CQC allows you to create a map that has a continuous query attached to it. Whenever a new entry is added to the map, the query runs and if the entry matches the query, it is added to the query cache. By using a CQC, you do not need to query a map for the same data all the time and all the query results are kept locally.
Queries must be written using the Predicates API; CQC does not support SQL queries.
You can create a CQC either from a Java client or on a member.
You cannot create a CQC from any client other than Java.
Accessing a Continuous Query Cache from a Member
The following code snippet shows how you can access a continuous query cache from a member.
QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());
MapConfig mapConfig = new MapConfig("map-name");
mapConfig.addQueryCacheConfig(queryCacheConfig);
Config config = new Config();
config.addMapConfig(mapConfig);
HazelcastInstance node = Hazelcast.newHazelcastInstance(config);
IMap<Integer, String> map = (IMap) node.getMap("map-name");
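A query cache configured this way can then be obtained from the map by name; a minimal sketch:
QueryCache<Integer, String> cache = map.getQueryCache("cache-name");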
Accessing a Continuous Query Cache from a Java Client
The following code snippet shows how you can access a continuous query cache from the client side. The difference in this code from the member side code is that you configure and instantiate a client instance instead of a member instance.
QueryCacheConfig queryCacheConfig = new QueryCacheConfig("cache-name");
queryCacheConfig.getPredicateConfig().setImplementation(new OddKeysPredicate());
ClientConfig clientConfig = new ClientConfig();
clientConfig.addQueryCacheConfig("map-name", queryCacheConfig);
HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
IMap<Integer, Integer> clientMap = (IMap) client.getMap("map-name");
QueryCache<Integer, Integer> cache = clientMap.getQueryCache("cache-name");
Features of Continuous Query Cache
The following features of continuous query cache are valid for both the member and client:
- The initial query that is run on the existing map entries during the continuous query cache construction can be enabled/disabled according to the supplied predicate via QueryCacheConfig.setPopulate().
- Continuous query cache allows you to run queries with indexes and perform event batching and coalescing.
- A continuous query cache is evictable. Note that a continuous query cache has a default maximum capacity of 10000. If you need a non-evictable cache, you should configure the eviction via QueryCacheConfig.setEvictionConfig().
- A listener can be added to a continuous query cache using QueryCache.addEntryListener().
- IMap events are reflected in the continuous query cache in the same order as they were generated on the map entries. Since events are created on entries stored in partitions, the ordering of events is maintained based on the ordering within the partition. You can add listeners to capture lost events using EventLostListener and you can recover lost events with the method QueryCache.tryRecover(). Recovery of lost events largely depends on the size of the buffer on Hazelcast members. The default buffer size is 16 per partition, i.e., 16 events per partition can be maintained in the buffer. If the event generation is high, setting the buffer size to a higher number provides better chances of recovering lost events. You can set the buffer size with QueryCacheConfig.setBufferSize(). You can use the following example code for a recovery case:

QueryCache queryCache = map.getQueryCache("cache-name", Predicates.sql("this > 20"), true);
queryCache.addEntryListener(new EventLostListener() {
    @Override
    public void eventLost(EventLostEvent event) {
        queryCache.tryRecover();
    }
}, false);

- You can populate a continuous query cache with only the keys of its entries and retrieve the subsequent values directly via QueryCache.get() from the underlying IMap. This helps to decrease the initial population time when the values are very large.
Configuring a Continuous Query Cache
You can configure a continuous query cache declaratively or programmatically; the latter is mostly explained in the previous section. The parent configuration element is <query-caches>, which should be placed within your <map> configuration. You can create your query caches using the <query-cache> sub-element under <query-caches>.
If you use mutable keys in maps, consider setting the serialize-keys option to true so that Hazelcast clones the keys rather than using an internal reference. Otherwise, Hazelcast may store references to outdated versions of your keys. Note that setting the serialize-keys option to true has a negative impact on performance.
The following is an example declarative configuration.
<hazelcast>
    ...
    <map>
        <query-caches>
            <query-cache name="myContQueryCache">
                <serialize-keys>false</serialize-keys>
                <include-value>true</include-value>
                <predicate type="class-name">com.hazelcast.examples.ExamplePredicate</predicate>
                <entry-listeners>
                    <entry-listener>...</entry-listener>
                </entry-listeners>
                <in-memory-format>BINARY</in-memory-format>
                <populate>true</populate>
                <coalesce>false</coalesce>
                <batch-size>2</batch-size>
                <delay-seconds>3</delay-seconds>
                <buffer-size>32</buffer-size>
                <eviction size="1000" max-size-policy="ENTRY_COUNT" eviction-policy="LFU"/>
                <indexes>
                    <index>
                        <attributes>
                            <attribute>age</attribute>
                        </attributes>
                    </index>
                </indexes>
            </query-cache>
        </query-caches>
    </map>
    ...
</hazelcast>
hazelcast:
  map:
    query-caches:
      myContQueryCache:
        serialize-keys: false
        include-value: true
        predicate:
          class-name: com.hazelcast.examples.ExamplePredicate
        entry-listeners:
          - class-name: "..."
        in-memory-format: BINARY
        populate: true
        coalesce: false
        batch-size: 2
        delay-seconds: 3
        buffer-size: 32
        eviction:
          size: 1000
          max-size-policy: ENTRY_COUNT
          eviction-policy: LFU
        indexes:
          - attributes:
              - "age"
Continuous query caches have the following configuration elements:

- name: Name of your continuous query cache.
- serialize-keys: Whether Hazelcast serializes the query cache keys. Keys should be serialized if they are mutable and need to be cloned via serialization. Default value is false.
- include-value: Specifies whether the value will be cached too. Its default value is true.
- predicate: Predicate to filter events which are applied to the query cache.
- entry-listeners: Adds listeners (listener classes) for your query cache entries. See the Registering Map Listeners section.
- in-memory-format: Type of the data to be stored in your query cache. See the in-memory format section. Its default value is BINARY.
- populate: Specifies whether the initial population of your query cache is enabled. Its default value is true.
- coalesce: Specifies whether the coalescing of your query cache is enabled. Its default value is false.
- delay-seconds: Minimum time in seconds that an event waits in the member’s buffer. Its default value is 0.
- batch-size: Batch size used to determine the number of events sent in a batch to your query cache. Its default value is 1.
- buffer-size: Maximum number of events which can be stored in a partition buffer. Its default value is 16.
- eviction: Configuration for the eviction of your query cache. See the Configuring Map Eviction section.
- indexes: Indexes for your query cache defined by using this element’s <index> sub-elements. See the Configuring IMap Indexes section.
Please take the following configuration considerations and publishing logic into account:
If delay-seconds is equal to or smaller than 0, then batch-size loses its function. Each time there is an event, all the entries in the buffer are pushed to the subscriber.
If delay-seconds is bigger than 0, the following logic applies:

- If coalesce is set to true, the buffer is checked for an event with the same key; if found, it is overridden by the current event. Then:
  - The current size of the buffer is checked: if it is equal to or larger than batch-size, then as many events as the batch-size are pushed to the subscriber. Otherwise, no events are sent.
  - After checking batch-size, the delay-seconds is checked. The buffer is scanned from the oldest to the youngest entries; all the entries that are older than delay-seconds are pushed to the subscriber.