Aggregators

This feature has been deprecated. Please use the Fast-Aggregations instead.

Based on the Hazelcast MapReduce framework, Aggregators are ready-to-use data aggregations. These are typical operations like sum up values, finding minimum or maximum values, calculating averages and other operations that you would expect in the relational database world.

Aggregation operations are implemented, as mentioned above, on top of the MapReduce framework. All operations can be achieved using pure MapReduce calls. However, using the Aggregation feature is more convenient for a big set of standard operations.

Aggregations Basics

This section quickly guides you through the basics of the Aggregations framework and some of its available classes. We also implement a first base example in this section.

Aggregations and Map Interfaces

Aggregations are available on both types of map interfaces, com.hazelcast.core.IMap and com.hazelcast .core.MultiMap, using the aggregate methods. Two overloaded methods are available that customize resource management of the underlying MapReduce framework by supplying a custom configured com.hazelcast.mapreduce.JobTracker instance. To find out how to configure the MapReduce framework, see the Configuring JobTracker section. We will later see another way to configure the automatically used MapReduce framework if no special JobTracker is supplied.

Aggregations and the MapReduce Framework

As mentioned before, the Aggregations implementation is based on the Hazelcast MapReduce framework and therefore you might find overlaps in their APIs. One overload of the aggregate method can be supplied with a JobTracker, which is part of the MapReduce framework.

If you implement your own aggregations, you will use a mixture of the Aggregations and the MapReduce API. If you do so, e.g., to make the life of colleagues easier, please read the Implementing Aggregations section.

Using the Aggregations API

We now look into what can be achieved using the Aggregations API. To work on some deeper examples, let’s quickly have a look at the available classes and interfaces and discuss their usage.

Supplier

The com.hazelcast.mapreduce.aggregation.Supplier provides filtering and data extraction to the aggregation operation. This class already provides a few different static methods to achieve the most common cases. Supplier.all() accepts all incoming values and does not apply any data extraction or transformation upon them before supplying them to the aggregation function itself.

For filtering data sets, you have two options by default:

  • You can supply a com.hazelcast.query.Predicate if you want to filter on values and/or keys.

  • Alternatively, you can supply a com.hazelcast.mapreduce.KeyPredicate if you can decide directly on the data key without the need to deserialize the value.

As mentioned above, all APIs are fully Java 8 and Lambda compatible. Let’s have a look on how we can do basic filtering using those two options.

Basic Filtering with KeyPredicate

First, we have a look at a KeyPredicate and only accept people whose last name is "Jones".

Supplier<...> supplier = Supplier.fromKeyPredicate(
    lastName -> "Jones".equalsIgnoreCase( lastName )
);
class JonesKeyPredicate implements KeyPredicate<String> {
  public boolean evaluate( String key ) {
    return "Jones".equalsIgnoreCase( key );
  }
}

Filtering on Values with Predicate

Using the standard Hazelcast Predicate interface, we can also filter based on the value of a data entry. In the following example, you can only select values that are divisible by 4 without a remainder.

Supplier<...> supplier = Supplier.fromPredicate(
    entry -> entry.getValue() % 4 == 0
);
class DivisiblePredicate implements Predicate<String, Integer> {
  public boolean apply( Map.Entry<String, Integer> entry ) {
    return entry.getValue() % 4 == 0;
  }
}

Extracting and Transforming Data

As well as filtering, Supplier can also extract or transform data before providing it to the aggregation operation itself. The following example shows how to transform an input value to a string.

Supplier<String, Integer, String> supplier = Supplier.all(
    value -> Integer.toString(value)
);

You can see a Java 6/7 example in the Aggregations Examples section.

Apart from the fact we transformed the input value of type int (or Integer) to a string, we can see that the generic information of the resulting Supplier has changed as well. This indicates that we now have an aggregation working on string values.

Chaining Multiple Filtering Rules

Another feature of Supplier is its ability to chain multiple filtering rules. Let’s combine all of the above examples into one rule set:

Supplier<String, Integer, String> supplier =
    Supplier.fromKeyPredicate(
        lastName -> "Jones".equalsIgnoreCase( lastName ),
        Supplier.fromPredicate(
            entry -> entry.getValue() % 4 == 0,
            Supplier.all( value -> Integer.toString(value) )
        )
    );

Implementing Supplier with Special Requirements

You might prefer or need to implement your Supplier based on special requirements. This is a very basic task. The Supplier abstract class has just one method: the apply method.

Due to a limitation of the Java Lambda API, you cannot implement abstract classes using Lambdas. Instead it is recommended that you create a standard named class.
class MyCustomSupplier extends Supplier<String, Integer, String> {
  public String apply( Map.Entry<String, Integer> entry ) {
    Integer value = entry.getValue();
    if (value == null) {
      return null;
    }
    return value % 4 == 0 ? String.valueOf( value ) : null;
  }
}

The Supplier apply methods are expected to return null whenever the input value should not be mapped to the aggregation process. This can be used, as in the example above, to implement filter rules directly. Implementing filters using the KeyPredicate and Predicate interfaces might be more convenient.

To use your own Supplier, just pass it to the aggregate method or use it in combination with other Suppliers.

int sum = personAgeMapping.aggregate( new MyCustomSupplier(), Aggregations.count() );
Supplier<String, Integer, String> supplier =
    Supplier.fromKeyPredicate(
        lastName -> "Jones".equalsIgnoreCase( lastName ),
        new MyCustomSupplier()
     );
int sum = personAgeMapping.aggregate( supplier, Aggregations.count() );

Defining the Aggregation Operation

The com.hazelcast.mapreduce.aggregation.Aggregation interface defines the aggregation operation itself. It contains a set of MapReduce API implementations like Mapper, Combiner, Reducer and Collator. These implementations are normally unique to the chosen Aggregation. This interface can also be implemented with your aggregation operations based on MapReduce calls. See the Implementing Aggregations section for more information.

The com.hazelcast.mapreduce.aggregation.Aggregations class provides a common predefined set of aggregations. This class contains type-safe aggregations of the following types:

  • Average (Integer, Long, Double, BigInteger, BigDecimal)

  • Sum (Integer, Long, Double, BigInteger, BigDecimal)

  • Min (Integer, Long, Double, BigInteger, BigDecimal, Comparable)

  • Max (Integer, Long, Double, BigInteger, BigDecimal, Comparable)

  • DistinctValues

  • Count

Those aggregations are similar to their counterparts on relational databases and can be equated to SQL statements as set out below.

Average:

Calculates an average value based on all selected values.

map.aggregate( Supplier.all( person -> person.getAge() ),
               Aggregations.integerAvg() );
SELECT AVG(person.age) FROM person;

Sum:

Calculates a sum based on all selected values.

map.aggregate( Supplier.all( person -> person.getAge() ),
               Aggregations.integerSum() );
SELECT SUM(person.age) FROM person;

Minimum (Min):

Finds the minimal value over all selected values.

map.aggregate( Supplier.all( person -> person.getAge() ),
               Aggregations.integerMin() );
SELECT MIN(person.age) FROM person;

Maximum (Max):

Finds the maximal value over all selected values.

map.aggregate( Supplier.all( person -> person.getAge() ),
               Aggregations.integerMax() );
SELECT MAX(person.age) FROM person;

Distinct Values:

Returns a collection of distinct values over the selected values

map.aggregate( Supplier.all( person -> person.getAge() ),
               Aggregations.distinctValues() );
SELECT DISTINCT person.age FROM person;

Count:

Returns the element count over all selected values

map.aggregate( Supplier.all(), Aggregations.count() );
SELECT COUNT(*) FROM person;

Extracting Attribute Values with PropertyExtractor

We used the com.hazelcast.mapreduce.aggregation.PropertyExtractor interface before when we had a look at the example on how to use a Supplier to transform a value to another type. It can also be used to extract attributes from values.

class Person {
  private String firstName;
  private String lastName;
  private int age;

  // getters and setters
}

PropertyExtractor<Person, Integer> propertyExtractor = (person) -> person.getAge();
class AgeExtractor implements PropertyExtractor<Person, Integer> {
  public Integer extract( Person value ) {
    return value.getAge();
  }
}

In this example, we extract the value from the person’s age attribute. The value type changes from Person to Integer which is reflected in the generics information to stay type-safe.

You can use PropertyExtractors for any kind of transformation of data. You might even want to have multiple transformation steps chained one after another.

Configuring Aggregations

As stated before, the easiest way to configure the resources used by the underlying MapReduce framework is to supply a JobTracker to the aggregation call itself by passing it to either IMap.aggregate() or MultiMap.aggregate().

There is another way to implicitly configure the underlying used JobTracker. If no specific JobTracker was passed for the aggregation call, internally one is created using the following naming specifications:

For IMap aggregation calls, the naming specification is created as hz::aggregation-map- and the concatenated name of the map. For MultiMap it is very similar, i.e., hz::aggregation-multimap- and the concatenated name of the MultiMap.

Knowing the specification of the name, we can configure the JobTracker as expected (as described in Retrieving a JobTracker Instance) using the naming spec we just learned. For more information on the configuration of JobTracker, see the Configuring Jobtracker section.

To finish this section, let’s have a quick example for the above naming specs:

IMap<String, Integer> map = hazelcastInstance.getMap( "mymap" );

// The internal JobTracker name resolves to 'hz::aggregation-map-mymap'
map.aggregate( ... );
MultiMap<String, Integer> multimap = hazelcastInstance.getMultiMap( "mymultimap" );

// The internal JobTracker name resolves to 'hz::aggregation-multimap-mymultimap'
multimap.aggregate( ... );

Aggregations Examples

For the final example, imagine you are working for an international company and you have an employee database stored in Hazelcast IMap with all employees worldwide and a MultiMap for assigning employees to their certain locations or offices. In addition, there is another IMap that holds the salary per employee.

Setting up the Data Model

Let’s have a look at our data model.

class Employee implements Serializable {
    private String firstName;
    private String lastName;
    private String companyName;
    private String address;
    private String city;
    private String county;
    private String state;
    private int zip;
    private String phone1;
    private String phone2;
    private String email;
    private String web;

    // getters and setters
}

class SalaryMonth implements Serializable {
    private Month month;
    private int salary;

    // getters and setters
}

class SalaryYear implements Serializable {
    private String email;
    private int year;
    private List<SalaryMonth> months;

    // getters and setters

    public int getAnnualSalary() {
        int sum = 0;
        for ( SalaryMonth salaryMonth : getMonths() ) {
            sum += salaryMonth.getSalary();
        }
        return sum;
    }
}

The two IMaps and the MultiMap are keyed by the string of email. They are defined as follows:

IMap<String, Employee> employees = hz.getMap( "employees" );
IMap<String, SalaryYear> salaries = hz.getMap( "salaries" );
MultiMap<String, String> officeAssignment = hz.getMultiMap( "office-employee" );

So far, we know all the important information to work out some example aggregations. We will look into some deeper implementation details and how we can work around some current limitations that will be eliminated in future versions of the API.

Average Aggregation Example

Let’s start with a very basic example. We want to know the average salary of all of our employees. To do this, we need a PropertyExtractor and the average aggregation for type Integer.

IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
PropertyExtractor<SalaryYear, Integer> extractor =
    (salaryYear) -> salaryYear.getAnnualSalary();
int avgSalary = salaries.aggregate( Supplier.all( extractor ),
                                    Aggregations.integerAvg() );

That’s it. Internally, we created a MapReduce task based on the predefined aggregation and fired it up immediately. Currently all aggregation calls are blocking operations, so it is not yet possible to execute the aggregation in a reactive way (using com.hazelcast.core.ICompletableFuture), but this will be part of an upcoming version.

Map Join Example

The following example is a little more complex. We only want to have our US-based employees selected into the average salary calculation, so we need to execute a join operation between the employees and salaries maps.

class USEmployeeFilter implements KeyPredicate<String>, HazelcastInstanceAware {
  private transient HazelcastInstance hazelcastInstance;

  public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
    this.hazelcastInstance = hazelcastInstance;
  }

  public boolean evaluate( String email ) {
    IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
    Employee employee = employees.get( email );
    return "US".equals( employee.getCountry() );
  }
}

Using the HazelcastInstanceAware interface, we get the current instance of Hazelcast injected into our filter and we can perform data joins on other data structures of the cluster. We now only select employees that work as part of our US offices into the aggregation.

IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
PropertyExtractor<SalaryYear, Integer> extractor =
    (salaryYear) -> salaryYear.getAnnualSalary();
int avgSalary = salaries.aggregate( Supplier.fromKeyPredicate(
                                        new USEmployeeFilter(), extractor
                                    ), Aggregations.integerAvg() );

Grouping Example

For our next example, we do some grouping based on the different worldwide offices. Currently, a group aggregator is not yet available, so we need a small workaround to achieve this goal. (In later versions of the Aggregations API this will not be required because it will be available out-of-the-box in a much more convenient way.)

Again, let’s start with our filter. This time, we want to filter based on an office name and we need to do some data joins to achieve this kind of filtering.

A short tip: to minimize the data transmission on the aggregation we can use Data Affinity rules to influence the partitioning of data. Be aware that this is an expert feature of Hazelcast.

class OfficeEmployeeFilter implements KeyPredicate<String>, HazelcastInstanceAware {
    private transient HazelcastInstance hazelcastInstance;
    private String office;

    // Deserialization Constructor
    public OfficeEmployeeFilter() {
    }

    public OfficeEmployeeFilter( String office ) {
        this.office = office;
    }

    public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
        this.hazelcastInstance = hazelcastInstance;
    }

    public boolean evaluate( String email ) {
        MultiMap<String, String> officeAssignment = hazelcastInstance
                                    .getMultiMap( "office-employee" );

        return officeAssignment.containsEntry( office, email );
    }
}

Now we can execute our aggregations. As mentioned before, we currently need to do the grouping on our own by executing multiple aggregations in a row.

Map<String, Integer> avgSalariesPerOffice = new HashMap<String, Integer>();

IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
MultiMap<String, String> officeAssignment =
    hazelcastInstance.getMultiMap( "office-employee" );

PropertyExtractor<SalaryYear, Integer> extractor =
    (salaryYear) -> salaryYear.getAnnualSalary();

for ( String office : officeAssignment.keySet() ) {
  OfficeEmployeeFilter filter = new OfficeEmployeeFilter( office );
  int avgSalary = salaries.aggregate( Supplier.fromKeyPredicate( filter, extractor ),
                                      Aggregations.integerAvg() );

  avgSalariesPerOffice.put( office, avgSalary );
}

Simple Count Example

We want to end this section by executing one final and easy aggregation. We want to know how many employees we currently have on a worldwide basis. Before reading the next lines of example code, you can try to do it on your own to see if you understood how to execute aggregations.

IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
int count = employees.size();

After the quick joke of the previous two code lines, we look at the real two code lines:

IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
int count = employees.aggregate( Supplier.all(), Aggregations.count() );

We now have an overview of how to use aggregations in real life situations. If you want to do your colleagues a favor, you might want to write your own additional set of aggregations. If so, then read the next section, Implementing Aggregations.

Implementing Aggregations

This section explains how to implement your own aggregations in your own application. It is an advanced section, so if you do not intend to implement your own aggregation, you might want to stop reading here and come back later when you need to know how to implement your own aggregation.

An Aggregation implementation is defining a MapReduce task, but with a small difference: the Mapper is always expected to work on a Supplier that filters and/or transforms the mapped input value to some output value.

Aggregation Methods

The main interface for making your own aggregation is com.hazelcast.mapreduce.aggregation.Aggregation. It consists of four methods.

interface Aggregation<Key, Supplied, Result> {
    Mapper getMapper(Supplier<Key, ?, Supplied> supplier);
    CombinerFactory getCombinerFactory();
    ReducerFactory getReducerFactory();
    Collator<Map.Entry, Result> getCollator();
}

The getMapper and getReducerFactory methods should return non-null values. getCombinerFactory and getCollator are optional operations and you do not need to implement them. You can decide to implement them depending on the use case you want to achieve.