Fast-Aggregations
Fast-Aggregations functionality is the successor of the Aggregators. They are equivalent to the MapReduce Aggregators in most of the use cases, but instead of running on the MapReduce engine they run on the Query infrastructure. Their performance is tens to hundreds times better since they run in parallel for each partition and are highly optimized for speed and low memory consumption.
If the in-memory format of your data is NATIVE ,
aggregations always run on the partition threads. If the data is of type BINARY
or OBJECT , they also mostly run on the partition threads, however, they may run on
the separate query threads to avoid blocking partition threads (if there are no ongoing migrations).
|
Aggregator API
The Fast-Aggregation consists of three phases represented by three methods:
-
accumulate()
-
combine()
-
aggregate()
There are also the following callbacks:
-
onAccumulationFinished()
called when the accumulation phase finishes -
onCombinationFinished()
called when the combination phase finishes
These callbacks enable releasing the state that might have been initialized and stored in the Aggregator - to reduce the network traffic.
Each phase is described below. See also the Aggregator Javadoc for the API’s details.
Accumulation:
During the accumulation phase each Aggregator accumulates all entries passed to it by the query engine. It accumulates only those pieces of information that are required to calculate the aggregation result in the last phase - that’s implementation specific.
In case of the DoubleAverage
aggregation the Aggregator would accumulate:
-
the sum of the elements it accumulated
-
the count of the elements it accumulated
Combination:
Since aggregation is executed in parallel on each partition of the cluster, the results need to be combined after the accumulation phase in order to be able to calculate the final result.
In case of the DoubleAverage
aggregation, the aggregator would sum up all
the sums of the elements and all the counts.
Aggregation:
Aggregation is the last phase that calculates the final result from the results accumulated and combined in the preceding phases.
In case of the DoubleAverage
aggregation, the Aggregator would just
divide the sum of the elements by their count (if non-zero).
Fast-Aggregations and Map Interfaces
Aggregations are available on com.hazelcast.map.IMap
only.
IMap offers the method aggregate
to apply the aggregation logic on
the map entries. This method can be called with or without a predicate. You can refer
to its Javadoc
to see the method details.
Example Implementation
Here’s an example implementation of the Aggregator:
private static void simpleCustomAverageAggregation(IMap<String, FAEmployee> employees) {
System.out.println("Calculating salary average");
double avgSalary = employees.aggregate(new Aggregator<Map.Entry<String, FAEmployee>, Double>() {
protected long sum;
protected long count;
@Override
public void accumulate(Map.Entry<String, FAEmployee> entry) {
count++;
sum += entry.getValue().getSalaryPerMonth();
}
@Override
public void combine(Aggregator aggregator) {
this.sum += this.getClass().cast(aggregator).sum;
this.count += this.getClass().cast(aggregator).count;
}
@Override
public Double aggregate() {
if (count == 0) {
return null;
}
return ((double) sum / (double) count);
}
});
System.out.println("Overall average salary: " + avgSalary);
System.out.println("\n");
}
As you can see:
-
the
accumulate()
method calculates the sum and count of the elements -
the
combine()
method combines the results from all the accumulations -
the
aggregate()
method calculates the final result.
Built-In Aggregations
The com.hazelcast.aggregation.Aggregators
class provides a wide variety of built-in Aggregators.
The full list is presented below:
-
count
-
distinct
-
bigDecimal
sum
/avg
/min
/max
-
bigInteger
sum
/avg
/min
/max
-
double
sum
/avg
/min
/max
-
integer
sum
/avg
/min
/max
-
long
sum
/avg
/min
/max
-
number
avg
-
comparable
min
/max
-
fixedPointSum
,floatingPointSum
To use the any of these Aggregators, instantiate them using the Aggregators
factory class.
Each built-in Aggregator can also navigate to an attribute of the object passed
to the accumulate()
method (via reflection). For example, Aggregators.distinct("address.city")
extracts the address.city
attribute from the object passed to the Aggregator and
accumulate the extracted value.
Configuration Options
On each partition, after the entries have been passed to the aggregator, the accumulation runs in parallel. It means that each aggregator is cloned and receives a sub-set of the entries received from a partition. Then, it runs the accumulation phase in all of the cloned aggregators - at the end, the result is combined into a single accumulation result. It speeds up the processing by at least the factor of two - even in case of simple aggregations. If the accumulation logic is more "heavy", the speed-up may be more significant.
In order to switch the accumulation into a sequential mode just set the
hazelcast.aggregation.accumulation.parallel.evaluation
property to false
(it’s set to true
by default).