Apply Windowed Aggregation
Aggregation is the cornerstone of distributed stream processing. Most of the useful things we can achieve with stream processing need one or the other form of aggregation.
By definition aggregation takes a finite set of data and produces a result from it. In order to make it work with streaming data, we need some way to break up the stream into finite chunks. This is what windowing does.
In this tutorial, you will use windowed aggregations to monitor a financial exchange’s most actively traded stocks.
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources |
---|---|
A Hazelcast cluster running in client/server mode |
Step 1. Create a New Java Project
We’ll assume you’re using an IDE. Create a blank Java project named
trade-monitor
and copy the Gradle or Maven file
into it:
plugins {
id 'com.github.johnrengelman.shadow' version '5.2.0'
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories.mavenCentral()
dependencies {
implementation 'com.hazelcast:hazelcast:5.0.5'
implementation 'com.hazelcast.samples.jet:hazelcast-jet-examples-trade-source:5.0.5'
}
jar {
enabled = false
dependsOn(shadowJar{archiveClassifier.set("")})
manifest.attributes 'Main-Class': 'org.example.TradeMonitor'
}
shadowJar {
dependencies {
exclude(dependency('com.hazelcast:hazelcast:5.0.5'))
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>trade-monitor</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>com.hazelcast.samples.jet</groupId>
<artifactId>hazelcast-jet-examples-trade-source</artifactId>
<version>5.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.example.TradeMonitor</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.hazelcast:hazelcast</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Step 2. Define the Pipeline
The next thing we need to do is to write the data pipeline, using the Jet API.
We will define a pipeline doing the following:
-
read an unbounded stream of trades
-
compute the number of trades in the past minute, for each stock monitored (every 5 seconds)
-
compute the top 10 stocks with most trades from the previous results (every 5 seconds)
-
format and log the final results (every 5 seconds)
Add following class to your project.
package org.example;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.samples.jet.tradesource.Trade;
import com.hazelcast.samples.jet.tradesource.TradeSource;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.List;
import static com.hazelcast.function.ComparatorEx.comparing;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.aggregate.AggregateOperations.topN;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import static com.hazelcast.jet.pipeline.WindowDefinition.tumbling;
import static java.util.concurrent.TimeUnit.SECONDS;
public class TradeMonitor {
private static final int TRADES_PER_SEC = 5000;
private static final long MONITORING_INTERVAL = SECONDS.toMillis(60);
private static final long REPORTING_INTERVAL = SECONDS.toMillis(5);
public static void main(String[] args) {
Pipeline pipeline = definePipeline();
submitForExecution(pipeline);
}
private static Pipeline definePipeline() {
Pipeline pipeline = Pipeline.create();
StreamStage<Trade> source = pipeline.readFrom(TradeSource.tradeStream(TRADES_PER_SEC))
.withNativeTimestamps(0);
StreamStage<KeyedWindowResult<String, Long>> tradeCounts = source
.groupingKey(Trade::getTicker)
.window(sliding(MONITORING_INTERVAL, REPORTING_INTERVAL))
.aggregate(counting());
StreamStage<WindowResult<List<KeyedWindowResult<String, Long>>>> topN = tradeCounts
.window(tumbling(REPORTING_INTERVAL))
.aggregate(topN(10, comparing(KeyedWindowResult::result)));
topN.map(wrList -> format(wrList.result()))
.writeTo(Sinks.logger());
return pipeline;
}
private static String format(List<KeyedWindowResult<String, Long>> results) {
StringBuilder sb = new StringBuilder("Most active stocks in past minute:");
for (int i = 0; i < results.size(); i++) {
KeyedWindowResult<String, Long> result = results.get(i);
sb.append(String.format("\n\t%2d. %5s - %d trades", i + 1, result.getKey(), result.getValue()));
}
return sb.toString();
}
private static void submitForExecution(Pipeline pipeline) {
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(pipeline, new JobConfig().setName("trade-monitor"));
}
}
Step 3. Package the Pipeline into a JAR
Now we need to submit this code to Hazelcast for execution. Since Hazelcast runs on our machine as a standalone cluster in a standalone process we need to give it all the code that we have written.
For this reason we create a JAR containing everything we need. All we need to do is to run the build command:
Step 4. Submit the Job for Execution
Assuming our cluster is still running all we need to issue is following command:
bin/hz-cli submit build/libs/trade-monitor-1.0-SNAPSHOT.jar
bin/hz-cli submit target/trade-monitor-1.0-SNAPSHOT.jar
The output you should be seeing in the Hazelcast member’s log is one such message every 5 seconds:
... Most active stocks in past minute:
1. AXDX - 55 trades
2. MTBC - 53 trades
3. ARIS - 52 trades
4. ASUR - 51 trades
5. CSBR - 50 trades
6. ARII - 50 trades
7. FTXD - 50 trades
8. MSDIW - 49 trades
9. SGEN - 49 trades
10. LILAK - 49 trades