Get Started with Stream Processing (Embedded)
This tutorial shows you how to use Hazelcast as a streaming engine that’s embedded in your Java application. By the end of this tutorial, you’ll know how to ingest data from a test source and filter it continuously to generate results in real time.
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources
---|---
An embedded Hazelcast cluster |
Maven |
Step 1. Set Up the Project
First, set up a Java project that you can later package and submit to a Hazelcast cluster.
- Check that you have Maven installed:
mvn -v
If Maven is installed, you should see some information about the Maven installation, which looks similar to the following:
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: /usr/local/Cellar/maven/3.8.1/libexec
Java version: 16.0.1, vendor: Homebrew, runtime: /usr/local/Cellar/openjdk/16.0.1/libexec/openjdk.jdk/Contents/Home
Default locale: en_GB, platform encoding: UTF-8
OS name: "mac os x", version: "10.15.7", arch: "x86_64", family: "mac"
- Create the following structure in a project directory of your choice. Because the EvenNumberStream class declares the package org.example, place the source file under src/main/java/org/example.

📄 pom.xml
📂 src
    📂 main
        📂 java
            📂 org
                📂 example
                    📄 EvenNumberStream.java
- Add the following to your pom.xml file to set your project’s name, version, and its dependencies on external libraries such as Hazelcast. Replace the ${jdk.version} placeholder with your JDK version.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hz-example</artifactId>
    <version>0.1.0</version>

    <repositories>
        <repository>
            <id>snapshot-repository</id>
            <name>Maven2 Snapshot Repository</name>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
            <releases>
                <enabled>false</enabled>
            </releases>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>6.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>${jdk.version}</maven.compiler.source>
        <maven.compiler.target>${jdk.version}</maven.compiler.target>
    </properties>
</project>
Step 2. Build Your Stream Processing Pipeline
With Hazelcast, you can specify data processing steps using the Jet API for Java. This API defines a series of tasks that can be submitted to a Hazelcast cluster as a job.
The general pattern of a data processing pipeline is to read data from a data source, process (or transform) it, and write the results to a data sink. You can visualize these steps as a linear process:
readFromSource → transform → writeToSink
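To make this pattern concrete before you build the streaming pipeline, here is a minimal sketch of the same three steps using a bounded (batch) test source. The class name PatternSketch and the doubling transform are illustrative only and are not part of this tutorial’s pipeline.

package org.example;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class PatternSketch {
    public static void main(String[] args) {
        // Illustrative sketch only: a bounded source, one transform, and a console sink.
        Pipeline pipeline = Pipeline.create();

        pipeline.readFrom(TestSources.items(1, 2, 3, 4)) // readFromSource: a fixed batch of test items
                .map(n -> n * 10)                        // transform: multiply each item by 10
                .writeTo(Sinks.logger());                // writeToSink: log each result to the console

        // Defining a pipeline does nothing by itself; it runs only when it is
        // submitted to a Hazelcast instance as a job, as you do later in this step.
    }
}

The streaming pipeline you build next follows the same readFrom → transform → writeTo shape, but reads from an unbounded source that never stops emitting items.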
In this step, you create a pipeline that reads a stream of incrementing numbers from a test data source and prints only even numbers, using the console as a sink.
- Add the following to your EvenNumberStream.java file:
package org.example;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
public class EvenNumberStream {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(); (1)

        pipeline.readFrom(TestSources.itemStream(10)) (2)
                .withoutTimestamps() (3)
                .filter(event -> event.sequence() % 2 == 0) (4)
                .setName("filter out odd numbers") (5)
                .writeTo(Sinks.logger()); (6)

        HazelcastInstance hz = Hazelcast.bootstrappedInstance(); (7)

        hz.getJet().newJob(pipeline); (8)
    }
}
1 | Initialize an empty pipeline.
2 | Read from the dummy data source. The itemStream() method emits 10 SimpleEvent objects per second, each containing an increasing sequence number.
3 | Tell Hazelcast that you do not plan on using timestamps to process the data. Timestamps are useful for time-sensitive processes such as aggregating streaming data. In this example, you aren’t aggregating data.
4 | Keep only the events whose sequence number is even, filtering out the odd ones. The filter() method receives the SimpleEvent objects from the dummy source.
5 | Set the name of this processing stage. Naming a processing stage makes it easier to recognize in the DAG view of Management Center.
6 | Send the result of the streaming process to the console. A pipeline without any sinks is not valid.
7 | Create a bootstrapped Hazelcast member. This bootstrapped member allows you to submit your pipeline as a packaged class to a running cluster, using the `hazelcast submit` command.
8 | Pass your pipeline to the bootstrapped Jet engine, which returns a handle to the running job (see the sketch after this list).
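The newJob() call returns a Job handle that you can use to monitor or stop the running job. The following is a minimal sketch of capturing that handle; the class name JobHandleSketch and the printed message are illustrative and not part of this tutorial’s pipeline.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class JobHandleSketch {
    public static void main(String[] args) {
        // Illustrative sketch only: a simplified pipeline without the filter stage.
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.itemStream(10))
                .withoutTimestamps()
                .writeTo(Sinks.logger());

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();

        // newJob() returns a handle that you can use to monitor the job.
        Job job = hz.getJet().newJob(pipeline);
        System.out.println("Submitted job " + job.getId() + " with status " + job.getStatus());

        // A streaming job runs until you stop it explicitly, for example with:
        // job.cancel();
    }
}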
Each method such as readFrom() or writeTo() results in a pipeline stage. The stage resulting from a writeTo() operation is called a sink stage, and you can’t attach more stages to it. All other stages are called compute stages and expect you to attach further stages to them.
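As a minimal sketch of attaching several compute stages before a single sink, the pipeline below adds a map() stage after the filter. The class name MultiStageSketch and the string formatting are illustrative additions, not part of this tutorial’s pipeline.

package org.example;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class MultiStageSketch {
    public static void main(String[] args) {
        // Illustrative sketch only: job submission is omitted here; in step 2 it is
        // done with Hazelcast.bootstrappedInstance() and getJet().newJob().
        Pipeline pipeline = Pipeline.create();

        pipeline.readFrom(TestSources.itemStream(10))             // source stage
                .withoutTimestamps()                              // declare that events carry no timestamps
                .filter(event -> event.sequence() % 2 == 0)       // compute stage: keep even sequence numbers
                .map(event -> "even number: " + event.sequence()) // compute stage: turn each event into a string
                .writeTo(Sinks.logger());                         // sink stage: no further stages can be attached
    }
}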
Step 3. Execute the Application
To start a Hazelcast member and run your pipeline code, use Maven to compile and execute your EvenNumberStream class:
mvn compile exec:java -Dexec.mainClass="org.example.EvenNumberStream"
In the console, you’ll see a Hazelcast member start and form a cluster. Then, you’ll see the output of your job, where the sequence numbers are all even:
11:28:24.039 [INFO] [loggerSink#0] (timestamp=11:28:24.000, sequence=0)
11:28:24.246 [INFO] [loggerSink#0] (timestamp=11:28:24.200, sequence=2)
11:28:24.443 [INFO] [loggerSink#0] (timestamp=11:28:24.400, sequence=4)
11:28:24.647 [INFO] [loggerSink#0] (timestamp=11:28:24.600, sequence=6)
11:28:24.846 [INFO] [loggerSink#0] (timestamp=11:28:24.800, sequence=8)
11:28:25.038 [INFO] [loggerSink#0] (timestamp=11:28:25.000, sequence=10)
11:28:25.241 [INFO] [loggerSink#0] (timestamp=11:28:25.200, sequence=12)
11:28:25.443 [INFO] [loggerSink#0] (timestamp=11:28:25.400, sequence=14)
11:28:25.643 [INFO] [loggerSink#0] (timestamp=11:28:25.600, sequence=16)
You may also notice that Hazelcast prints its execution plan (in DOT format) for your job, which looks like the following:
digraph DAG {
    "itemStream" [localParallelism=1];
    "filter out odd numbers" [localParallelism=8];
    "loggerSink" [localParallelism=1];
    "itemStream" -> "filter out odd numbers" [queueSize=1024];
    "filter out odd numbers" -> "loggerSink" [queueSize=1024];
}
This plan describes how the Jet service optimizes your job for distributed execution. You can learn more about this concept in Jet: How Hazelcast Models and Executes Jobs.
To visualize an execution plan, you can paste the DOT output into a tool such as http://viz-js.com.
Complete Code Sample
package org.example;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EvenNumberStream {
    public static void main(String[] args) {
        Pipeline evenNumberStream = Pipeline.create();

        evenNumberStream.readFrom(TestSources.itemStream(10))
                .withoutTimestamps()
                .filter(event -> event.sequence() % 2 == 0)
                .setName("filter out odd numbers")
                .writeTo(Sinks.logger());

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();

        hz.getJet().newJob(evenNumberStream);
    }
}
For more code samples, see this Hazelcast GitHub repository.
Next Steps
Explore all the built-in sources and sinks that you can plug into your own pipelines.