Aggregators
This feature has been deprecated. Please use Fast-Aggregations instead.
Based on the Hazelcast MapReduce framework, Aggregators are ready-to-use data aggregations. These are typical operations like summing up values, finding minimum or maximum values, calculating averages, and other operations that you would expect from the relational database world.
Aggregation operations are implemented, as mentioned above, on top of the MapReduce framework. All operations can be achieved using pure MapReduce calls. However, using the Aggregation feature is more convenient for a big set of standard operations.
Aggregations Basics
This section quickly guides you through the basics of the Aggregations framework and some of its available classes. We also implement a first base example in this section.
Aggregations and Map Interfaces
Aggregations are available on both types of map interfaces, com.hazelcast.core.IMap and com.hazelcast.core.MultiMap, using the aggregate methods. Two overloaded methods are available; one of them customizes resource management of the underlying MapReduce framework by supplying a custom configured com.hazelcast.mapreduce.JobTracker instance. To find out how to configure the MapReduce framework, see the Configuring JobTracker section. We will later see another way to configure the automatically used MapReduce framework if no special JobTracker is supplied.
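As a quick orientation, here is a minimal sketch of both overloads; the map name person-age and the JobTracker name custom-tracker are assumptions made for this example:
IMap<String, Integer> personAgeMapping = hazelcastInstance.getMap( "person-age" );
// Overload 1: the JobTracker is created and resolved internally by naming convention
int count = personAgeMapping.aggregate( Supplier.all(), Aggregations.count() );
// Overload 2: a custom configured JobTracker is supplied explicitly
JobTracker jobTracker = hazelcastInstance.getJobTracker( "custom-tracker" );
int countWithTracker = personAgeMapping.aggregate( Supplier.all(), Aggregations.count(), jobTracker );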
Aggregations and the MapReduce Framework
As mentioned before, the Aggregations implementation is based on the Hazelcast MapReduce framework, and therefore you might find overlaps in their APIs. One overload of the aggregate method can be supplied with a JobTracker, which is part of the MapReduce framework.
If you implement your own aggregations, for example to make the lives of your colleagues easier, you will use a mixture of the Aggregations and MapReduce APIs; in that case, please read the Implementing Aggregations section.
Using the Aggregations API
We now look into what can be achieved using the Aggregations API. To work on some deeper examples, let’s quickly have a look at the available classes and interfaces and discuss their usage.
Supplier
The com.hazelcast.mapreduce.aggregation.Supplier provides filtering and data extraction to the aggregation operation. This class already provides a few different static methods to achieve the most common cases. Supplier.all() accepts all incoming values and does not apply any data extraction or transformation upon them before supplying them to the aggregation function itself.
For filtering data sets, you have two options by default:
- You can supply a com.hazelcast.query.Predicate if you want to filter on values and/or keys.
- Alternatively, you can supply a com.hazelcast.mapreduce.KeyPredicate if you can decide directly on the data key without the need to deserialize the value.
As mentioned above, all APIs are fully Java 8 and Lambda compatible. Let’s have a look at how we can do basic filtering using those two options.
Basic Filtering with KeyPredicate
First, we have a look at a KeyPredicate and only accept people whose last name is "Jones".
Supplier<...> supplier = Supplier.fromKeyPredicate(
lastName -> "Jones".equalsIgnoreCase( lastName )
);
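The same filter can also be expressed as a standard named class (e.g., on Java 6/7):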
class JonesKeyPredicate implements KeyPredicate<String> {
public boolean evaluate( String key ) {
return "Jones".equalsIgnoreCase( key );
}
}
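Either form of the filter can then be handed to an aggregation call. A minimal sketch, assuming a map named person-age that maps last names to ages:
IMap<String, Integer> personAgeMapping = hazelcastInstance.getMap( "person-age" );
// Counts only the entries whose key passed the KeyPredicate
int jonesCount = personAgeMapping.aggregate(
    Supplier.fromKeyPredicate( new JonesKeyPredicate() ), Aggregations.count() );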
Filtering on Values with Predicate
Using the standard Hazelcast Predicate interface, we can also filter based on the value of a data entry. In the following example, we select only values that are divisible by 4 without remainder.
Supplier<...> supplier = Supplier.fromPredicate(
entry -> entry.getValue() % 4 == 0
);
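And the equivalent as a named class: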
class DivisiblePredicate implements Predicate<String, Integer> {
public boolean apply( Map.Entry<String, Integer> entry ) {
return entry.getValue() % 4 == 0;
}
}
Extracting and Transforming Data
As well as filtering, Supplier can also extract or transform data before providing it to the aggregation operation itself. The following example shows how to transform an input value to a string.
Supplier<String, Integer, String> supplier = Supplier.all(
value -> Integer.toString(value)
);
You can see a Java 6/7 example in the Aggregations Examples section.
Apart from the fact that we transformed the input value of type int (or Integer) to a string, we can see that the generic information of the resulting Supplier has changed as well. This indicates that we now have an aggregation working on string values.
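For illustration, a supplier typed this way can be fed into any aggregation that works on strings. A minimal sketch, assuming the person-age map from before and the supplier defined above:
IMap<String, Integer> personAgeMapping = hazelcastInstance.getMap( "person-age" );
// Collects the distinct string representations of all ages
Set<String> distinctAges = personAgeMapping.aggregate( supplier, Aggregations.distinctValues() );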
Chaining Multiple Filtering Rules
Another feature of Supplier is its ability to chain multiple filtering rules. Let’s combine all of the above examples into one rule set:
Supplier<String, Integer, String> supplier =
Supplier.fromKeyPredicate(
lastName -> "Jones".equalsIgnoreCase( lastName ),
Supplier.fromPredicate(
entry -> entry.getValue() % 4 == 0,
Supplier.all( value -> Integer.toString(value) )
)
);
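Note that the chain is evaluated from the outside in: an entry must first pass the KeyPredicate, then the value Predicate, and only then is the surviving value transformed to a string by the innermost Supplier.all( ... ).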
Implementing Supplier with Special Requirements
You might prefer or need to implement your Supplier based on special requirements. This is a very basic task. The Supplier abstract class has just one method: apply.
Due to a limitation of the Java Lambda API, you cannot implement abstract classes using Lambdas. Instead, it is recommended that you create a standard named class.
class MyCustomSupplier extends Supplier<String, Integer, String> {
public String apply( Map.Entry<String, Integer> entry ) {
Integer value = entry.getValue();
if (value == null) {
return null;
}
return value % 4 == 0 ? String.valueOf( value ) : null;
}
}
A Supplier's apply method is expected to return null whenever the input value should not be mapped to the aggregation process. This can be used, as in the example above, to implement filter rules directly. However, implementing filters using the KeyPredicate and Predicate interfaces might be more convenient.
To use your own Supplier, just pass it to the aggregate method:
int count = personAgeMapping.aggregate( new MyCustomSupplier(), Aggregations.count() );
Or use it in combination with other Suppliers:
Supplier<String, Integer, String> supplier =
Supplier.fromKeyPredicate(
lastName -> "Jones".equalsIgnoreCase( lastName ),
new MyCustomSupplier()
);
int count = personAgeMapping.aggregate( supplier, Aggregations.count() );
Defining the Aggregation Operation
The com.hazelcast.mapreduce.aggregation.Aggregation interface defines the aggregation operation itself. It contains a set of MapReduce API implementations like Mapper, Combiner, Reducer, and Collator. These implementations are normally unique to the chosen Aggregation. This interface can also be implemented with your own aggregation operations based on MapReduce calls. See the Implementing Aggregations section for more information.
The com.hazelcast.mapreduce.aggregation.Aggregations class provides a common predefined set of aggregations. This class contains type-safe aggregations of the following types:
- Average (Integer, Long, Double, BigInteger, BigDecimal)
- Sum (Integer, Long, Double, BigInteger, BigDecimal)
- Min (Integer, Long, Double, BigInteger, BigDecimal, Comparable)
- Max (Integer, Long, Double, BigInteger, BigDecimal, Comparable)
- DistinctValues
- Count
Those aggregations are similar to their counterparts in relational databases and can be equated to SQL statements as set out below.
Average:
Calculates an average value based on all selected values.
map.aggregate( Supplier.all( person -> person.getAge() ),
Aggregations.integerAvg() );
SELECT AVG(person.age) FROM person;
Sum:
Calculates a sum based on all selected values.
map.aggregate( Supplier.all( person -> person.getAge() ),
Aggregations.integerSum() );
SELECT SUM(person.age) FROM person;
Minimum (Min):
Finds the minimal value over all selected values.
map.aggregate( Supplier.all( person -> person.getAge() ),
Aggregations.integerMin() );
SELECT MIN(person.age) FROM person;
Maximum (Max):
Finds the maximal value over all selected values.
map.aggregate( Supplier.all( person -> person.getAge() ),
Aggregations.integerMax() );
SELECT MAX(person.age) FROM person;
Distinct Values:
Returns a collection of distinct values over the selected values.
map.aggregate( Supplier.all( person -> person.getAge() ),
Aggregations.distinctValues() );
SELECT DISTINCT person.age FROM person;
Count:
Returns the element count over all selected values.
map.aggregate( Supplier.all(), Aggregations.count() );
SELECT COUNT(*) FROM person;
Extracting Attribute Values with PropertyExtractor
We used the com.hazelcast.mapreduce.aggregation.PropertyExtractor interface before, when we had a look at the example of how to use a Supplier to transform a value to another type. It can also be used to extract attributes from values.
class Person {
private String firstName;
private String lastName;
private int age;
// getters and setters
}
PropertyExtractor<Person, Integer> propertyExtractor = (person) -> person.getAge();
class AgeExtractor implements PropertyExtractor<Person, Integer> {
public Integer extract( Person value ) {
return value.getAge();
}
}
In this example, we extract the value from the person’s age attribute. The value type changes from Person to Integer, which is reflected in the generics information to stay type-safe.
You can use PropertyExtractors for any kind of data transformation. You might even want to have multiple transformation steps chained one after another.
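As a quick usage sketch (the map name persons is an assumption for this example), such an extractor plugs into Supplier.all() just like the inline transformations shown earlier:
IMap<String, Person> persons = hazelcastInstance.getMap( "persons" );
// Aggregates over the extracted ages instead of the Person values themselves
int maxAge = persons.aggregate( Supplier.all( new AgeExtractor() ),
    Aggregations.integerMax() );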
Configuring Aggregations
As stated before, the easiest way to configure the resources used by the underlying MapReduce framework is to supply a JobTracker to the aggregation call itself by passing it to either IMap.aggregate() or MultiMap.aggregate().
There is another way to implicitly configure the underlying JobTracker. If no specific JobTracker was passed for the aggregation call, one is created internally using the following naming specification:
For IMap aggregation calls, the name is hz::aggregation-map- concatenated with the name of the map. For MultiMap it is very similar: hz::aggregation-multimap- concatenated with the name of the MultiMap.
Knowing the specification of the name, we can configure the JobTracker as expected (as described in Retrieving a JobTracker Instance) using the naming spec we just learned. For more information on the configuration of the JobTracker, see the Configuring JobTracker section.
To finish this section, let’s have a quick example of the above naming specs:
IMap<String, Integer> map = hazelcastInstance.getMap( "mymap" );
// The internal JobTracker name resolves to 'hz::aggregation-map-mymap'
map.aggregate( ... );
MultiMap<String, Integer> multimap = hazelcastInstance.getMultiMap( "mymultimap" );
// The internal JobTracker name resolves to 'hz::aggregation-multimap-mymultimap'
multimap.aggregate( ... );
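Putting this naming spec to use, the following is a hedged sketch of configuring the implicitly created JobTracker programmatically; the property values here are arbitrary examples, and the full set of options is described in the Configuring JobTracker section:
Config config = new Config();
// The name must match the internally resolved JobTracker name
JobTrackerConfig jobTrackerConfig = new JobTrackerConfig();
jobTrackerConfig.setName( "hz::aggregation-map-mymap" );
jobTrackerConfig.setMaxThreadSize( 16 );
jobTrackerConfig.setChunkSize( 1000 );
config.addJobTrackerConfig( jobTrackerConfig );
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance( config );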
Aggregations Examples
For the final example, imagine you are working for an international company and you have an employee database stored in a Hazelcast IMap with all employees worldwide and a MultiMap for assigning employees to their respective locations or offices. In addition, there is another IMap that holds the salary per employee.
Setting up the Data Model
Let’s have a look at our data model.
class Employee implements Serializable {
private String firstName;
private String lastName;
private String companyName;
private String address;
private String city;
private String country;
private String state;
private int zip;
private String phone1;
private String phone2;
private String email;
private String web;
// getters and setters
}
class SalaryMonth implements Serializable {
private Month month;
private int salary;
// getters and setters
}
class SalaryYear implements Serializable {
private String email;
private int year;
private List<SalaryMonth> months;
// getters and setters
public int getAnnualSalary() {
int sum = 0;
for ( SalaryMonth salaryMonth : getMonths() ) {
sum += salaryMonth.getSalary();
}
return sum;
}
}
The two IMaps and the MultiMap are keyed by the employee’s email. They are defined as follows:
IMap<String, Employee> employees = hz.getMap( "employees" );
IMap<String, SalaryYear> salaries = hz.getMap( "salaries" );
MultiMap<String, String> officeAssignment = hz.getMultiMap( "office-employee" );
So far, we have all the important information we need to work out some example aggregations. We will look into some deeper implementation details and how we can work around some current limitations that will be eliminated in future versions of the API.
Average Aggregation Example
Let’s start with a very basic example. We want to know the average salary of all of our employees. To do this, we need a PropertyExtractor and the average aggregation for type Integer.
IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
PropertyExtractor<SalaryYear, Integer> extractor =
(salaryYear) -> salaryYear.getAnnualSalary();
int avgSalary = salaries.aggregate( Supplier.all( extractor ),
Aggregations.integerAvg() );
That’s it. Internally, we created a MapReduce task based on the predefined aggregation and fired it up immediately. Currently, all aggregation calls are blocking operations, so it is not yet possible to execute the aggregation in a reactive way (using com.hazelcast.core.ICompletableFuture), but this will be part of an upcoming version.
Map Join Example
The following example is a little more complex. We want only our US-based employees selected into the average salary calculation, so we need to execute a join operation between the employees and salaries maps.
class USEmployeeFilter implements KeyPredicate<String>, HazelcastInstanceAware {
private transient HazelcastInstance hazelcastInstance;
public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
this.hazelcastInstance = hazelcastInstance;
}
public boolean evaluate( String email ) {
IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
Employee employee = employees.get( email );
return "US".equals( employee.getCountry() );
}
}
Using the HazelcastInstanceAware interface, we get the current instance of Hazelcast injected into our filter, so we can perform data joins on other data structures of the cluster. We now select only employees that work as part of our US offices into the aggregation.
IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
PropertyExtractor<SalaryYear, Integer> extractor =
(salaryYear) -> salaryYear.getAnnualSalary();
int avgSalary = salaries.aggregate( Supplier.fromKeyPredicate(
new USEmployeeFilter(), extractor
), Aggregations.integerAvg() );
Grouping Example
For our next example, we do some grouping based on the different worldwide offices. Currently, a group aggregator is not yet available, so we need a small workaround to achieve this goal. (In later versions of the Aggregations API this will not be required because it will be available out-of-the-box in a much more convenient way.)
Again, let’s start with our filter. This time, we want to filter based on an office name and we need to do some data joins to achieve this kind of filtering.
A short tip: to minimize data transmission during the aggregation, we can use Data Affinity rules to influence the partitioning of data. Be aware that this is an expert feature of Hazelcast.
class OfficeEmployeeFilter implements KeyPredicate<String>, HazelcastInstanceAware {
private transient HazelcastInstance hazelcastInstance;
private String office;
// Deserialization Constructor
public OfficeEmployeeFilter() {
}
public OfficeEmployeeFilter( String office ) {
this.office = office;
}
public void setHazelcastInstance( HazelcastInstance hazelcastInstance ) {
this.hazelcastInstance = hazelcastInstance;
}
public boolean evaluate( String email ) {
MultiMap<String, String> officeAssignment = hazelcastInstance
.getMultiMap( "office-employee" );
return officeAssignment.containsEntry( office, email );
}
}
Now we can execute our aggregations. As mentioned before, we currently need to do the grouping on our own by executing multiple aggregations in a row.
Map<String, Integer> avgSalariesPerOffice = new HashMap<String, Integer>();
IMap<String, SalaryYear> salaries = hazelcastInstance.getMap( "salaries" );
MultiMap<String, String> officeAssignment =
hazelcastInstance.getMultiMap( "office-employee" );
PropertyExtractor<SalaryYear, Integer> extractor =
(salaryYear) -> salaryYear.getAnnualSalary();
for ( String office : officeAssignment.keySet() ) {
OfficeEmployeeFilter filter = new OfficeEmployeeFilter( office );
int avgSalary = salaries.aggregate( Supplier.fromKeyPredicate( filter, extractor ),
Aggregations.integerAvg() );
avgSalariesPerOffice.put( office, avgSalary );
}
Simple Count Example
We want to end this section by executing one final and easy aggregation. We want to know how many employees we currently have on a worldwide basis. Before reading the next lines of example code, you can try to do it on your own to see if you understood how to execute aggregations.
IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
int count = employees.size();
After the quick joke of the previous two lines of code, here are the real two lines:
IMap<String, Employee> employees = hazelcastInstance.getMap( "employees" );
int count = employees.aggregate( Supplier.all(), Aggregations.count() );
We now have an overview of how to use aggregations in real-life situations. If you want to do your colleagues a favor, you might want to write your own additional set of aggregations. If so, then read the next section, Implementing Aggregations.
Implementing Aggregations
This section explains how to implement your own aggregations in your application. It is an advanced topic, so if you do not intend to implement custom aggregations, you might want to stop reading here and come back later when you need to.
An Aggregation implementation defines a MapReduce task, but with a small difference: the Mapper is always expected to work on a Supplier that filters and/or transforms the mapped input value to some output value.
Aggregation Methods
The main interface for making your own aggregation is com.hazelcast.mapreduce.aggregation.Aggregation. It consists of four methods.
interface Aggregation<Key, Supplied, Result> {
Mapper getMapper(Supplier<Key, ?, Supplied> supplier);
CombinerFactory getCombinerFactory();
ReducerFactory getReducerFactory();
Collator<Map.Entry, Result> getCollator();
}
The getMapper and getReducerFactory methods should return non-null values. getCombinerFactory and getCollator are optional operations and you do not need to implement them. You can decide to implement them depending on the use case you want to achieve.
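To make the contract concrete, the following is a hedged sketch of a custom aggregation that sums up Long values delivered by the Supplier. All class names (LongSumAggregation, SupplierConsumingSumMapper, LongSumReducerFactory, LongSumReducer) are inventions for this example, the optional combiner is skipped, and the MapReduce interfaces are assumed to match the ones used by the predefined aggregations:
import java.util.AbstractMap;
import java.util.Map;
import com.hazelcast.mapreduce.Collator;
import com.hazelcast.mapreduce.CombinerFactory;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
import com.hazelcast.mapreduce.aggregation.Aggregation;
import com.hazelcast.mapreduce.aggregation.Supplier;
class LongSumAggregation<Key, Value> implements Aggregation<Key, Long, Long> {
    @Override
    public Mapper getMapper( Supplier<Key, ?, Long> supplier ) {
        // Unchecked cast; the aggregation is always used with Value-typed entries
        return new SupplierConsumingSumMapper<Key, Value>( (Supplier<Key, Value, Long>) supplier );
    }
    @Override
    public CombinerFactory getCombinerFactory() {
        return null; // optional, skipped in this sketch
    }
    @Override
    public ReducerFactory getReducerFactory() {
        return new LongSumReducerFactory<Key>();
    }
    @Override
    public Collator<Map.Entry, Long> getCollator() {
        // Sums up the already reduced partial results from all members
        return new Collator<Map.Entry, Long>() {
            @Override
            public Long collate( Iterable<Map.Entry> values ) {
                long sum = 0;
                for ( Map.Entry entry : values ) {
                    sum += (Long) entry.getValue();
                }
                return sum;
            }
        };
    }
}
// The Mapper feeds every entry through the Supplier; a null result means the
// entry was filtered out by the Supplier and must not be emitted.
class SupplierConsumingSumMapper<Key, Value> implements Mapper<Key, Value, Key, Long> {
    private final Supplier<Key, Value, Long> supplier;
    SupplierConsumingSumMapper( Supplier<Key, Value, Long> supplier ) {
        this.supplier = supplier;
    }
    @Override
    public void map( Key key, Value value, Context<Key, Long> context ) {
        Long mapped = supplier.apply( new AbstractMap.SimpleEntry<Key, Value>( key, value ) );
        if ( mapped != null ) {
            context.emit( key, mapped );
        }
    }
}
class LongSumReducerFactory<Key> implements ReducerFactory<Key, Long, Long> {
    @Override
    public Reducer<Long, Long> newReducer( Key key ) {
        return new LongSumReducer();
    }
}
class LongSumReducer extends Reducer<Long, Long> {
    private long sum;
    @Override
    public void reduce( Long value ) {
        sum += value;
    }
    @Override
    public Long finalizeReduce() {
        return sum;
    }
}
Such a custom aggregation would then be used like the predefined ones, for example:
long totalSalaries = salaries.aggregate(
    Supplier.all( salaryYear -> (long) salaryYear.getAnnualSalary() ),
    new LongSumAggregation<String, SalaryYear>() );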