Enrich a Stream
Streams usually contain information that changes with time such as prices, quantities, and sales. Sometimes the stream needs to be enriched with static, or infrequently changing data, such as labels, organizational structure or some characteristics.
In this tutorial, you build a pipeline that takes a stream of trades from a stock exchange and enriches it with a full company name for nicer presentation. Each trade contains a stock symbol, also known as ticker, which uniquely identifies the stock. You will use the ticker to look up the full company name in a replicated map.
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources |
---|---|
A Hazelcast cluster running in client/server mode |
Step 1. Create a New Java Project
We’ll assume you’re using an IDE. Create a blank Java project named
map-join-tutorial
and copy the Gradle or Maven file into it:
plugins {
id 'com.github.johnrengelman.shadow' version '5.2.0'
id 'java'
}
group 'org.example'
version '1.0-SNAPSHOT'
repositories.mavenCentral()
dependencies {
implementation 'com.hazelcast:hazelcast:5.0.5'
implementation 'com.hazelcast.samples.jet:hazelcast-jet-examples-trade-source:5.0.5'
}
jar {
enabled = false
dependsOn(shadowJar{archiveClassifier.set("")})
manifest.attributes 'Main-Class': 'org.example.JoinUsingMapJob'
}
shadowJar {
dependencies {
exclude(dependency('com.hazelcast:hazelcast:5.0.5'))
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>map-join-tutorial</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
</properties>
<dependencies>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.0.5</version>
</dependency>
<dependency>
<groupId>com.hazelcast.samples.jet</groupId>
<artifactId>hazelcast-jet-examples-trade-source</artifactId>
<version>5.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.example.JoinUsingMapJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.hazelcast:hazelcast</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
If you want to know more about packaging jobs, see Submitting Jobs.
Step 2. Load Company Names into a Map
Copy the nasdaqlisted.txt
file containing a
list of company names to src/main/resources
.
The following code creates a map containing company names from the list. How we create the map is not important for this tutorial, e.g. it could be loaded from a local file, S3 or be a result of another job.
package org.example;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import java.io.*;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toMap;
public class LoadNames {
public static void main(String[] args) {
HazelcastInstance instance = HazelcastClient.newHazelcastClient();
Map<String, String> namesMap = loadNames();
instance.getMap("companyNames").putAll(namesMap);
System.out.println(namesMap.size() + " names put to a map called 'companyNames'");
instance.shutdown();
}
private static Map<String, String> loadNames() {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
LoadNames.class.getResourceAsStream("/nasdaqlisted.txt"), UTF_8))) {
return reader.lines()
.skip(1)
.map(line -> line.split("\\|"))
.collect(toMap(parts -> parts[0], parts -> parts[1]));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Finally, run it from your IDE. You should see this in the among other logs:
3170 names put to a map called 'companyNames'
Step 3. Use the Map to Enrich the Trade Stream
This code takes a dummy source of trade data, enriches the trades with the company name and finally writes to log.
package org.example;
import com.hazelcast.jet.*;
import com.hazelcast.jet.config.*;
import com.hazelcast.samples.jet.tradesource.*;
import com.hazelcast.jet.pipeline.*;
import static com.hazelcast.jet.datamodel.Tuple4.tuple4;
public class JoinUsingMapJob {
public static final int TRADES_PER_SEC = 1;
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(TradeSource.tradeStream(TRADES_PER_SEC))
.withoutTimestamps()
.mapUsingIMap("companyNames", Trade::getTicker, (trade, name) ->
tuple4(trade.getTicker(), trade.getQuantity(), trade.getPrice(), name))
.writeTo(Sinks.logger(tuple -> String.format("%5s quantity=%4d, price=%d (%s)",
tuple.f0(), tuple.f1(), tuple.f2(), tuple.f3()
)));
HazelcastInstance instance = Hazelcast.bootstrappedInstance();
instance.getJet().newJob(pipeline, new JobConfig().setName("map-join-tutorial"));
instance.shutdown();
}
}
Submit the job to the Hazelcast cluster
gradle build
bin/hz-cli submit build/libs/map-join-tutorial-1.0-SNAPSHOT.jar
mvn package
bin/hz-cli submit target/map-join-tutorial-1.0-SNAPSHOT.jar
Now go to the window where you started Jet. Its log output will contain the output from the pipeline.
If you submit the job before loading the company names you will see null values. Once you run the LoadNames class you will immediately see company names. This is how you can react to changing data. You can restart the Hazelcast member to start with empty map to try this out.
Step 4. Clean up
-
Cancel the job
bin/hz-cli cancel map-join-tutorial
-
Shut down the Hazelcast cluster
bin/hz-stop
Next Steps
Learn more about reading data from maps and replicated maps in Connector Guides.