Get Started with Stream Processing (Embedded)

This tutorial shows you how to use Hazelcast as a streaming engine that’s embedded in your Java application. At the end of this tutorial, you’ll know how to ingest data from some test sources and filter it continuously to generate results in real time.

Before You Begin

To complete this tutorial, you need the following:

Prerequisites:

An embedded Hazelcast cluster

Maven

Step 1. Set Up the Project

First, you need to set up a Java project that you can later package and submit to your Hazelcast cluster.

  1. Check that you have Maven installed.

    mvn -v

    If Maven is installed, you should see some information about the Maven installation, which looks similar to the following:

    Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
    Maven home: /usr/local/Cellar/maven/3.8.1/libexec
    Java version: 16.0.1, vendor: Homebrew, runtime: /usr/local/Cellar/openjdk/16.0.1/libexec/openjdk.jdk/Contents/Home
    Default locale: en_GB, platform encoding: UTF-8
    OS name: "mac os x", version: "10.15.7", arch: "x86_64", family: "mac"
  2. Create the following structure in a project directory of your choice.

    📄 pom.xml
    📂 src
      📂 main
        📂 java
          📂 org
            📂 example
              📄 EvenNumberStream.java
  3. Add the following to your pom.xml file to set your project’s name, version, and its dependencies on external libraries such as Hazelcast.

    Replace the ${jdk.version} placeholder with your JDK version.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>hz-example</artifactId>
        <version>0.1.0</version>
    
        <dependencies>
            <dependency>
                <groupId>com.hazelcast</groupId>
                <artifactId>hazelcast</artifactId>
                <version>5.0</version>
            </dependency>
        </dependencies>
    
    
        <properties>
            <maven.compiler.source>${jdk.version}</maven.compiler.source>
            <maven.compiler.target>${jdk.version}</maven.compiler.target>
        </properties>
    
    </project>

Step 2. Build your Stream Processing Pipeline

With Hazelcast, you specify data processing steps using the Jet API for Java. A pipeline built with this API defines a series of tasks that can be submitted to a Hazelcast cluster as a job.

The general pattern of a data processing pipeline is to read data from a data source, process (or transform) it, and write the results to a data sink. You can visualize these steps as a linear process:

readFromSource → transform → writeToSink
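
The same three-step shape can be sketched in plain Java before any Hazelcast code is involved. This is only an analogy built on java.util.stream: the class and method names are hypothetical, the source is bounded rather than continuous, and nothing here uses the Jet API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical illustration of source -> transform -> sink; not Hazelcast code.
public class PipelineShapeSketch {

    // Source: a bounded stand-in for an endless stream of sequence numbers.
    static LongStream readFromSource(long count) {
        return LongStream.range(0, count);
    }

    // Transform: keep only the even numbers.
    static LongStream transform(LongStream numbers) {
        return numbers.filter(n -> n % 2 == 0);
    }

    // Sink: collect the results into a list instead of logging them.
    static List<Long> writeToSink(LongStream numbers) {
        return numbers.boxed().collect(Collectors.toList());
    }

    // Wire the three steps together in order.
    static List<Long> run(long count) {
        return writeToSink(transform(readFromSource(count)));
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints [0, 2, 4, 6, 8]
    }
}
```

The Hazelcast pipeline in this step follows the same order, except that the source never ends and the steps run as a job on the cluster.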

In this step, you create a pipeline that reads a stream of incrementing numbers from a test data source and prints only even numbers, using the console as a sink.

  1. Add the following to your EvenNumberStream.java file.

package org.example;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EvenNumberStream {
  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create(); // (1)
    pipeline.readFrom(TestSources.itemStream(10)) // (2)
      .withoutTimestamps() // (3)
      .filter(event -> event.sequence() % 2 == 0) // (4)
      .setName("filter out odd numbers") // (5)
      .writeTo(Sinks.logger()); // (6)

    HazelcastInstance hz = Hazelcast.bootstrappedInstance(); // (7)

    hz.getJet().newJob(pipeline); // (8)
  }
}
1 Initialize an empty pipeline.
2 Read from the dummy data source. The itemStream(10) call creates a source that emits 10 SimpleEvent objects per second, each containing a timestamp and an increasing sequence number.
3 Tell Hazelcast that you do not plan on using timestamps to process the data. Timestamps are useful for time-sensitive processes such as aggregating streaming data. In this example, you aren’t aggregating data.
4 Filter out any odd numbers from the stream, so that only events with an even sequence number remain. The filter() method receives the SimpleEvent objects from the dummy source.
5 Set the name of this processing stage. Naming a processing stage makes it easier to recognize in the DAG view of Management Center.
6 Send the result of the streaming process to the console. A pipeline without any sinks is not valid.
7 Create a bootstrapped Hazelcast member. This bootstrapped member allows you to submit your pipeline as a packaged class to a running cluster, using the hazelcast submit command.
8 Pass your pipeline to the bootstrapped Jet engine.

Each method such as readFrom() or writeTo() results in a pipeline stage. The stage resulting from a writeTo() operation is called a sink stage and you can’t attach more stages to it. All other stages are called compute stages and expect you to attach further stages to them.
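
To make the difference between compute stages and sink stages concrete, here is a toy model in plain Java. It is not Hazelcast's implementation: ToyComputeStage and ToySinkStage are hypothetical names invented for this sketch, and they process an in-memory list rather than a stream. The point is only that the sink type has no method for attaching another stage.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy model of compute and sink stages; hypothetical, not the Jet API.
public class ToyStages {

    // A sink stage is terminal: it exposes what was written, nothing more.
    static final class ToySinkStage<T> {
        private final List<T> written;

        ToySinkStage(List<T> written) {
            this.written = written;
        }

        List<T> written() {
            return written;
        }
    }

    // A compute stage can always be extended with a further stage.
    static final class ToyComputeStage<T> {
        private final List<T> items;

        ToyComputeStage(List<T> items) {
            this.items = items;
        }

        // Attaching a filter yields another compute stage.
        ToyComputeStage<T> filter(Predicate<T> keep) {
            List<T> kept = new ArrayList<>();
            for (T item : items) {
                if (keep.test(item)) {
                    kept.add(item);
                }
            }
            return new ToyComputeStage<>(kept);
        }

        // Attaching a sink ends the chain.
        ToySinkStage<T> writeTo(List<T> sink) {
            sink.addAll(items);
            return new ToySinkStage<>(sink);
        }
    }

    static List<Long> evens(List<Long> input) {
        return new ToyComputeStage<>(input)
                .filter(n -> n % 2 == 0)
                .writeTo(new ArrayList<>())
                .written();
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(0L, 1L, 2L, 3L, 4L))); // prints [0, 2, 4]
    }
}
```

Calling filter() on the value returned by writeTo() would not compile in this model, which mirrors the rule that nothing can follow a sink stage.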

Step 3. Execute the Application

To start a Hazelcast member and run your pipeline code, use Maven to compile and execute your EvenNumberStream class.

mvn compile exec:java -Dexec.mainClass="org.example.EvenNumberStream"

In the console, you’ll see the member start and form a cluster. Then, you’ll see the output of your job, where the sequence numbers are all even:

11:28:24.039 [INFO] [loggerSink#0] (timestamp=11:28:24.000, sequence=0)
11:28:24.246 [INFO] [loggerSink#0] (timestamp=11:28:24.200, sequence=2)
11:28:24.443 [INFO] [loggerSink#0] (timestamp=11:28:24.400, sequence=4)
11:28:24.647 [INFO] [loggerSink#0] (timestamp=11:28:24.600, sequence=6)
11:28:24.846 [INFO] [loggerSink#0] (timestamp=11:28:24.800, sequence=8)
11:28:25.038 [INFO] [loggerSink#0] (timestamp=11:28:25.000, sequence=10)
11:28:25.241 [INFO] [loggerSink#0] (timestamp=11:28:25.200, sequence=12)
11:28:25.443 [INFO] [loggerSink#0] (timestamp=11:28:25.400, sequence=14)
11:28:25.643 [INFO] [loggerSink#0] (timestamp=11:28:25.600, sequence=16)
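
The gap of roughly 200 ms between log lines follows from the source rate: itemStream(10) emits ten events per second, so consecutive sequence numbers are about 100 ms apart and every second one passes the filter. The following plain-Java sketch reproduces that arithmetic. Its class and method names are hypothetical, and it does not call Hazelcast.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the timing seen in the log output.
public class EvenEventTiming {

    // Milliseconds between consecutive events for a given rate.
    static long periodMillis(int itemsPerSecond) {
        return 1000L / itemsPerSecond;
    }

    // Offsets, in ms from the first event, of the events with even sequence numbers.
    static List<Long> evenEventOffsets(int itemsPerSecond, int eventCount) {
        long period = periodMillis(itemsPerSecond);
        List<Long> offsets = new ArrayList<>();
        for (long sequence = 0; sequence < eventCount; sequence++) {
            if (sequence % 2 == 0) {
                offsets.add(sequence * period);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(evenEventOffsets(10, 10)); // prints [0, 200, 400, 600, 800]
    }
}
```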

You may also notice that Hazelcast prints its execution plan (in DOT format) for your job, which looks like the following:

digraph DAG {
	"itemStream" [localParallelism=1];
	"filter out odd numbers" [localParallelism=8];
	"loggerSink" [localParallelism=1];
	"itemStream" -> "filter out odd numbers" [queueSize=1024];
	"filter out odd numbers" -> "loggerSink" [queueSize=1024];
}

This plan describes how the Jet service optimizes your job for distributed execution. You can learn more about this concept in How Hazelcast Models and Executes Jobs.

To visualize these execution plans, you can use tools such as http://viz-js.com. For example, this execution plan looks like this:

digraph DAG {
	"itemStream"
	"filter out odd numbers"
	"loggerSink"
	"itemStream" -> "filter out odd numbers"
	"filter out odd numbers" -> "loggerSink"
}
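
For a linear pipeline such as this one, the DOT text has one vertex statement per stage and one edge statement per pair of adjacent stages. The following sketch builds that text from a list of stage names. It only illustrates the format: the class and method names are hypothetical, and this is not how Hazelcast generates its plan.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that writes a linear chain of stages as DOT text.
public class LinearDagDot {

    static String toDot(List<String> stages) {
        StringBuilder dot = new StringBuilder("digraph DAG {\n");
        // One vertex statement per stage.
        for (String stage : stages) {
            dot.append("\t\"").append(stage).append("\"\n");
        }
        // One edge statement per pair of adjacent stages.
        for (int i = 0; i + 1 < stages.size(); i++) {
            dot.append("\t\"").append(stages.get(i)).append("\" -> \"")
               .append(stages.get(i + 1)).append("\"\n");
        }
        return dot.append("}\n").toString();
    }

    public static void main(String[] args) {
        // Prints the same simplified graph as shown above.
        System.out.print(toDot(Arrays.asList(
                "itemStream", "filter out odd numbers", "loggerSink")));
    }
}
```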

Complete Code Sample

package org.example;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EvenNumberStream {
  public static void main(String[] args) {

    Pipeline evenNumberStream = Pipeline.create();
    evenNumberStream.readFrom(TestSources.itemStream(10))
      .withoutTimestamps()
      .filter(event -> event.sequence() % 2 == 0)
      .setName("filter out odd numbers")
      .writeTo(Sinks.logger());

    HazelcastInstance hz = Hazelcast.bootstrappedInstance();

    hz.getJet().newJob(evenNumberStream);
  }
}
For more code samples, see this Hazelcast GitHub repository.

Next Steps

Explore all the built-in sources and sinks that you can plug into your own pipelines.