A newer version of Platform is available.

View latest

Enrich a Stream

Streams usually contain information that changes with time such as prices, quantities, and sales. Sometimes the stream needs to be enriched with static, or infrequently changing data, such as labels, organizational structure or some characteristics.

In this tutorial, you build a pipeline that takes a stream of trades from a stock exchange and enriches it with a full company name for nicer presentation. Each trade contains a stock symbol, also known as ticker, which uniquely identifies the stock. You will use the ticker to look up the full company name in a replicated map.

Before You Begin

To complete this tutorial, you need the following:

Prerequisites Useful resources

A Hazelcast cluster running in client/server mode

Step 1. Create a New Java Project

We’ll assume you’re using an IDE. Create a blank Java project named map-join-tutorial and copy the Gradle or Maven file into it:

  • Gradle

  • Maven

plugins {
    id 'com.github.johnrengelman.shadow' version '5.2.0'
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1.7'
    implementation 'com.hazelcast.samples.jet:hazelcast-jet-examples-trade-source:5.1.7'
}

jar {
    enabled = false
    dependsOn(shadowJar{archiveClassifier.set("")})
    manifest.attributes 'Main-Class': 'org.example.JoinUsingMapJob'
}

shadowJar {
    dependencies {
        exclude(dependency('com.hazelcast:hazelcast:5.1.7'))
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>map-join-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.samples.jet</groupId>
            <artifactId>hazelcast-jet-examples-trade-source</artifactId>
            <version>5.1.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.JoinUsingMapJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.hazelcast:hazelcast</exclude>
                                </excludes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

If you want to know more about packaging jobs, see Submitting Jobs.

Step 2. Load Company Names into a Map

Copy the nasdaqlisted.txt file containing a list of company names to src/main/resources.

The following code creates a map containing company names from the list. How we create the map is not important for this tutorial, e.g. it could be loaded from a local file, S3 or be a result of another job.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

import java.io.*;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toMap;

public class LoadNames {

    public static void main(String[] args) {
        HazelcastInstance instance = HazelcastClient.newHazelcastClient();

        Map<String, String> namesMap = loadNames();
        instance.getMap("companyNames").putAll(namesMap);

        System.out.println(namesMap.size() + " names put to a map called 'companyNames'");

        instance.shutdown();
    }

    private static Map<String, String> loadNames() {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                LoadNames.class.getResourceAsStream("/nasdaqlisted.txt"), UTF_8))) {
            return reader.lines()
                    .skip(1)
                    .map(line -> line.split("\\|"))
                    .collect(toMap(parts -> parts[0], parts -> parts[1]));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Finally, run it from your IDE. You should see this in the among other logs:

3170 names put to a map called 'companyNames'

Step 3. Use the Map to Enrich the Trade Stream

This code takes a dummy source of trade data, enriches the trades with the company name and finally writes to log.

package org.example;

import com.hazelcast.jet.*;
import com.hazelcast.jet.config.*;
import com.hazelcast.samples.jet.tradesource.*;
import com.hazelcast.jet.pipeline.*;

import static com.hazelcast.jet.datamodel.Tuple4.tuple4;

public class JoinUsingMapJob {

    public static final int TRADES_PER_SEC = 1;

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        pipeline.readFrom(TradeSource.tradeStream(TRADES_PER_SEC))
         .withoutTimestamps()
         .mapUsingIMap("companyNames", Trade::getTicker, (trade, name) ->
             tuple4(trade.getTicker(), trade.getQuantity(), trade.getPrice(), name))
         .writeTo(Sinks.logger(tuple -> String.format("%5s quantity=%4d, price=%d (%s)",
             tuple.f0(), tuple.f1(), tuple.f2(), tuple.f3()
         )));

        HazelcastInstance instance = Hazelcast.bootstrappedInstance();
        instance.getJet().newJob(pipeline, new JobConfig().setName("map-join-tutorial"));
        instance.shutdown();
    }

}

Submit the job to the Hazelcast cluster

  • Gradle

  • Maven

gradle build
bin/hz-cli submit build/libs/map-join-tutorial-1.0-SNAPSHOT.jar
mvn package
bin/hz-cli submit target/map-join-tutorial-1.0-SNAPSHOT.jar

Now go to the window where you started Jet. Its log output will contain the output from the pipeline.

If you submit the job before loading the company names you will see null values. Once you run the LoadNames class you will immediately see company names. This is how you can react to changing data. You can restart the Hazelcast member to start with empty map to try this out.

Step 4. Clean up

  1. Cancel the job

    bin/hz-cli cancel map-join-tutorial
  2. Shut down the Hazelcast cluster

    bin/hz-stop

Next Steps

Learn more about reading data from maps and replicated maps in Connector Guides.