A newer version of Hazelcast Platform is available.

View latest

Aggregations

Aggregations allow you to compute a value of some function (e.g sum or max) over the stored map entries. The computation is performed in a fully distributed manner, so no data other than the computed function value is transferred to a caller, making the computation fast.

If the in-memory format of your data is NATIVE, aggregations always run on the partition threads. If the data is of type BINARY or OBJECT, they also mostly run on the partition threads, however, they may run on the separate query threads to avoid blocking partition threads (if there are no ongoing migrations).

Aggregator API

The aggregation is split into three phases represented by three methods:

  1. accumulate()

  2. combine()

  3. aggregate()

There are also the following callbacks:

  • onAccumulationFinished() called when the accumulation phase finishes

  • onCombinationFinished() called when the combination phase finishes

These callbacks enable releasing the state that might have been initialized and stored in the Aggregator - to reduce the network traffic.

Each phase is described below. See also the Aggregator Javadoc for the API’s details.

Accumulation:

During the accumulation phase each Aggregator accumulates all entries passed to it by the query engine. It accumulates only those pieces of information that are required to calculate the aggregation result in the last phase - that’s implementation specific.

In case of the DoubleAverage aggregation the Aggregator would accumulate:

  • the sum of the elements it accumulated

  • the count of the elements it accumulated

Combination:

Since aggregation is executed in parallel on each partition of the cluster, the results need to be combined after the accumulation phase in order to be able to calculate the final result.

In case of the DoubleAverage aggregation, the aggregator would sum up all the sums of the elements and all the counts.

Aggregation:

Aggregation is the last phase that calculates the final result from the results accumulated and combined in the preceding phases.

In case of the DoubleAverage aggregation, the Aggregator would just divide the sum of the elements by their count (if non-zero).

Aggregations and Map Interfaces

Aggregations are available on com.hazelcast.map.IMap only. IMap offers the method aggregate to apply the aggregation logic on the map entries. This method can be called with or without a predicate. You can refer to its Javadoc to see the method details.

Example Implementation

Here’s an example implementation of the Aggregator:

    private static void simpleCustomAverageAggregation(IMap<String, FAEmployee> employees) {
        System.out.println("Calculating salary average");

        double avgSalary = employees.aggregate(new Aggregator<Map.Entry<String, FAEmployee>, Double>() {

            protected long sum;
            protected long count;

            @Override
            public void accumulate(Map.Entry<String, FAEmployee> entry) {
                count++;
                sum += entry.getValue().getSalaryPerMonth();
            }

            @Override
            public void combine(Aggregator aggregator) {

                this.sum += this.getClass().cast(aggregator).sum;
                this.count += this.getClass().cast(aggregator).count;
            }

            @Override
            public Double aggregate() {
                if (count == 0) {
                    return null;
                }
                return ((double) sum / (double) count);
            }

        });

        System.out.println("Overall average salary: " + avgSalary);
        System.out.println("\n");
    }

As you can see:

  • the accumulate() method calculates the sum and count of the elements

  • the combine() method combines the results from all the accumulations

  • the aggregate() method calculates the final result.

Built-In Aggregations

The com.hazelcast.aggregation.Aggregators class provides a wide variety of built-in Aggregators. The full list is presented below:

  • count

  • distinct

  • bigDecimal sum/avg/min/max

  • bigInteger sum/avg/min/max

  • double sum/avg/min/max

  • integer sum/avg/min/max

  • long sum/avg/min/max

  • number avg

  • comparable min/max

  • fixedPointSum, floatingPointSum

To use the any of these Aggregators, instantiate them using the Aggregators factory class.

Each built-in Aggregator can also navigate to an attribute of the object passed to the accumulate() method (via reflection). For example, Aggregators.distinct("address.city") extracts the address.city attribute from the object passed to the Aggregator and accumulate the extracted value.

Configuration Options

On each partition, after the entries have been passed to the aggregator, the accumulation runs in parallel. It means that each aggregator is cloned and receives a sub-set of the entries received from a partition. Then, it runs the accumulation phase in all the cloned aggregators - at the end, the result is combined into a single accumulation result. It speeds up the processing by at least the factor of two - even in case of simple aggregations. If the accumulation logic is more "heavy", the speed-up may be more significant.

In order to switch the accumulation into a sequential mode just set the hazelcast.aggregation.accumulation.parallel.evaluation property to false (it’s set to true by default).