A newer version of Platform is available.

View latest

Apply Windowed Aggregation

Aggregation is the cornerstone of distributed stream processing. Most of the useful things we can achieve with stream processing need one or the other form of aggregation.

By definition aggregation takes a finite set of data and produces a result from it. In order to make it work with streaming data, we need some way to break up the stream into finite chunks. This is what windowing does.

In this tutorial, you will use windowed aggregations to monitor a financial exchange’s most actively traded stocks.

Before You Begin

To complete this tutorial, you need the following:

Prerequisites Useful resources

A Hazelcast cluster running in client/server mode

Step 1. Create a New Java Project

We’ll assume you’re using an IDE. Create a blank Java project named trade-monitor and copy the Gradle or Maven file into it:

  • Gradle

  • Maven

plugins {
    id 'com.github.johnrengelman.shadow' version '5.2.0'
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1.7'
    implementation 'com.hazelcast.samples.jet:hazelcast-jet-examples-trade-source:5.1.7'
}

jar {
    enabled = false
    dependsOn(shadowJar{archiveClassifier.set("")})
    manifest.attributes 'Main-Class': 'org.example.TradeMonitor'
}

shadowJar {
    dependencies {
        exclude(dependency('com.hazelcast:hazelcast:5.1.7'))
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>trade-monitor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet</artifactId>
            <version>5.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.samples.jet</groupId>
            <artifactId>hazelcast-jet-examples-trade-source</artifactId>
            <version>5.1.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.TradeMonitor</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.hazelcast:hazelcast</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Step 2. Define the Pipeline

The next thing we need to do is to write the data pipeline, using the Jet API.

We will define a pipeline doing the following:

  • read an unbounded stream of trades

  • compute the number of trades in the past minute, for each stock monitored (every 5 seconds)

  • compute the top 10 stocks with most trades from the previous results (every 5 seconds)

  • format and log the final results (every 5 seconds)

Add following class to your project.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.samples.jet.tradesource.Trade;
import com.hazelcast.samples.jet.tradesource.TradeSource;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamStage;

import java.util.List;

import static com.hazelcast.function.ComparatorEx.comparing;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.aggregate.AggregateOperations.topN;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import static com.hazelcast.jet.pipeline.WindowDefinition.tumbling;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TradeMonitor {

    private static final int TRADES_PER_SEC = 5000;
    private static final long MONITORING_INTERVAL = SECONDS.toMillis(60);
    private static final long REPORTING_INTERVAL = SECONDS.toMillis(5);

    public static void main(String[] args) {
        Pipeline pipeline = definePipeline();
        submitForExecution(pipeline);
    }

    private static Pipeline definePipeline() {
        Pipeline pipeline = Pipeline.create();

        StreamStage<Trade> source = pipeline.readFrom(TradeSource.tradeStream(TRADES_PER_SEC))
                .withNativeTimestamps(0);

        StreamStage<KeyedWindowResult<String, Long>> tradeCounts = source
                .groupingKey(Trade::getTicker)
                .window(sliding(MONITORING_INTERVAL, REPORTING_INTERVAL))
                .aggregate(counting());

        StreamStage<WindowResult<List<KeyedWindowResult<String, Long>>>> topN = tradeCounts
                .window(tumbling(REPORTING_INTERVAL))
                .aggregate(topN(10, comparing(KeyedWindowResult::result)));

        topN.map(wrList -> format(wrList.result()))
            .writeTo(Sinks.logger());

        return pipeline;
    }

    private static String format(List<KeyedWindowResult<String, Long>> results) {
        StringBuilder sb = new StringBuilder("Most active stocks in past minute:");
        for (int i = 0; i < results.size(); i++) {
            KeyedWindowResult<String, Long> result = results.get(i);
            sb.append(String.format("\n\t%2d. %5s - %d trades", i + 1, result.getKey(), result.getValue()));
        }
        return sb.toString();
    }

    private static void submitForExecution(Pipeline pipeline) {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline, new JobConfig().setName("trade-monitor"));
    }

}

Step 3. Package the Pipeline into a JAR

Now we need to submit this code to Hazelcast for execution. Since Hazelcast runs on our machine as a standalone cluster in a standalone process we need to give it all the code that we have written.

For this reason we create a JAR containing everything we need. All we need to do is to run the build command:

  • Gradle

  • Maven

gradle build

This will produce a JAR file called trade-monitor-1.0-SNAPSHOT.jar in the build/libs directory of our project.

mvn package

This will produce a JAR file called trade-monitor-1.0-SNAPSHOT.jar in the target directory or our project.

Step 4. Submit the Job for Execution

Assuming our cluster is still running all we need to issue is following command:

  • Gradle

  • Maven

bin/hz-cli submit build/libs/trade-monitor-1.0-SNAPSHOT.jar
bin/hz-cli submit target/trade-monitor-1.0-SNAPSHOT.jar

The output you should be seeing in the Hazelcast member’s log is one such message every 5 seconds:

... Most active stocks in past minute:
     1.  AXDX - 55 trades
     2.  MTBC - 53 trades
     3.  ARIS - 52 trades
     4.  ASUR - 51 trades
     5.  CSBR - 50 trades
     6.  ARII - 50 trades
     7.  FTXD - 50 trades
     8. MSDIW - 49 trades
     9.  SGEN - 49 trades
    10. LILAK - 49 trades

Step 5. Clean up

  1. Cancel the job.

    bin/hz-cli cancel trade-monitor
  2. Shut down the Hazelcast cluster.

    bin/hz-stop