This is a prerelease version.


Get Started with Stream Processing (Client/Server)

This tutorial shows you how to use a Hazelcast cluster as a data processing engine for your client applications. At the end of this tutorial, you’ll know how to build a data pipeline in Java and submit it as a job to your Hazelcast cluster.

Before You Begin

To complete this tutorial, you need the following:

  • A full Hazelcast distribution (Docker or binary)

  • JDK 8 to 11

  • Maven

Step 1. Set Up the Project

First, you need to set up a Java project that you can later package and submit to a Hazelcast cluster.

  1. Check that you have Maven installed.

    mvn -v

    If Maven is installed, you should see some information about the Maven installation, which looks similar to the following:

    Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
    Maven home: /usr/local/Cellar/maven/3.8.1/libexec
    Java version: 16.0.1, vendor: Homebrew, runtime: /usr/local/Cellar/openjdk/16.0.1/libexec/openjdk.jdk/Contents/Home
    Default locale: en_GB, platform encoding: UTF-8
    OS name: "mac os x", version: "10.15.7", arch: "x86_64", family: "mac"
  2. Create the following structure in a project directory of your choice.

    📄 pom.xml
    📂 src
      📂 main
        📂 java
          📂 org
            📂 example
              📄 EvenNumberStream.java
  3. Add the following to your pom.xml file to set your project’s name, version, and its dependencies on external libraries such as Hazelcast.

    Replace the ${jdk.version} placeholder with your JDK version.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>hz-example</artifactId>
        <version>0.1.0</version>
    
        <repositories>
            <repository>
                <id>snapshot-repository</id>
                <name>Maven2 Snapshot Repository</name>
                <url>https://oss.sonatype.org/content/repositories/snapshots</url>
                <releases>
                    <enabled>false</enabled>
                </releases>
            </repository>
        </repositories>
    
        <dependencies>
            <dependency>
                <groupId>com.hazelcast</groupId>
                <artifactId>hazelcast</artifactId>
                <version>5.1-SNAPSHOT</version>
            </dependency>
        </dependencies>
    
    
        <properties>
            <maven.compiler.source>${jdk.version}</maven.compiler.source>
            <maven.compiler.target>${jdk.version}</maven.compiler.target>
        </properties>
    
    </project>

Step 2. Build your Stream Processing Pipeline

With Hazelcast, you can specify data processing steps using the Jet API for Java. These steps form a pipeline, which you can submit to a Hazelcast cluster as a job.

The general pattern of a data processing pipeline is to read data from a data source, process (or transform) it, and write the results to a data sink. You can visualize these steps as a linear process:

readFromSource → transform → writeToSink
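As a rough illustration of this linear shape, the following plain-Java sketch chains a source, a filter, and a sink. It does not use the Hazelcast Jet API: the class LinearPipelineSketch and its run() method are hypothetical, an Iterable stands in for the source, a java.util.function.Predicate stands in for the transform, and a Consumer stands in for the sink. Unlike a Jet pipeline, it runs in a single thread and only handles a finite source.

```java
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, stdlib-only model of source -> transform -> sink; not the Jet API.
class LinearPipelineSketch {

    // Reads each item from the source, applies the transform (a filter here),
    // and passes the items that survive to the sink.
    static <T> void run(Iterable<T> source, Predicate<T> transform, Consumer<T> sink) {
        for (T item : source) {            // readFromSource
            if (transform.test(item)) {    // transform
                sink.accept(item);         // writeToSink
            }
        }
    }

    public static void main(String[] args) {
        // Keeps only even numbers and prints them, one per line.
        run(Arrays.asList(0L, 1L, 2L, 3L, 4L), n -> n % 2 == 0, System.out::println);
    }
}
```

Running main() prints 0, 2, and 4, one per line. In the real pipeline, the source, transform, and sink are separate stages that Hazelcast can distribute across the cluster.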

In this step, you create a pipeline that reads a stream of incrementing numbers from a test data source and prints only even numbers, using the console as a sink.

  1. Add the following to your EvenNumberStream.java file.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EvenNumberStream {
  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create(); (1)
    pipeline.readFrom(TestSources.itemStream(10)) (2)
      .withoutTimestamps() (3)
      .filter(event -> event.sequence() % 2 == 0) (4)
      .setName("filter out odd numbers") (5)
      .writeTo(Sinks.logger()); (6)

    HazelcastInstance hz = Hazelcast.bootstrappedInstance(); (7)

    hz.getJet().newJob(pipeline); (8)
  }
}
1 Initialize an empty pipeline.
2 Read from the dummy data source. The itemStream(10) method emits 10 SimpleEvent objects per second, each containing an increasing sequence number.
3 Tell Hazelcast that you do not plan to use timestamps to process the data. Timestamps are useful for time-sensitive processes such as aggregating streaming data over time windows. In this example, you aren’t aggregating data.
4 Filter out any odd numbers from the stream. The filter() method receives the SimpleEvent objects from the dummy source and keeps only those whose sequence number is even.
5 Set the name of this processing stage. Naming a processing stage makes it easier to recognize in the DAG view of Management Center.
6 Send the result of the streaming process to the console. A pipeline without any sinks is not valid.
7 Create a bootstrapped Hazelcast instance. This bootstrapped instance allows you to submit your pipeline as a packaged class to a running cluster, using the hz-cli submit command.
8 Pass your pipeline to the bootstrapped Jet engine.
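To make callouts 2 and 4 more concrete, the sketch below models what a source emitting 10 events per second produces and which of those events the filter keeps. FakeEvent is a hypothetical stand-in for SimpleEvent, and spacing the events exactly 1000 / itemsPerSecond milliseconds apart is an assumption made for illustration, not a description of how TestSources.itemStream() is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SimpleEvent: a timestamp and a sequence number.
final class FakeEvent {
    private final long timestamp;
    private final long sequence;

    FakeEvent(long timestamp, long sequence) {
        this.timestamp = timestamp;
        this.sequence = sequence;
    }

    long timestamp() { return timestamp; }
    long sequence() { return sequence; }
}

class ItemStreamSketch {

    // Models the events a source emitting itemsPerSecond items would produce in
    // the given number of seconds, assuming evenly spaced emissions (an assumption).
    static List<FakeEvent> emit(int itemsPerSecond, int seconds, long startMillis) {
        List<FakeEvent> events = new ArrayList<>();
        long total = (long) itemsPerSecond * seconds;
        for (long seq = 0; seq < total; seq++) {
            events.add(new FakeEvent(startMillis + seq * 1000 / itemsPerSecond, seq));
        }
        return events;
    }

    // Same predicate as the tutorial's filter stage: keep even sequence numbers.
    static List<FakeEvent> keepEven(List<FakeEvent> events) {
        List<FakeEvent> kept = new ArrayList<>();
        for (FakeEvent event : events) {
            if (event.sequence() % 2 == 0) {
                kept.add(event);
            }
        }
        return kept;
    }
}
```

For one second at 10 items per second, emit(10, 1, 0) returns events numbered 0 to 9, spaced 100 ms apart, and keepEven() keeps the five events numbered 0, 2, 4, 6, and 8.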

Each method such as readFrom() or writeTo() results in a pipeline stage. The stage resulting from a writeTo() operation is called a sink stage and you can’t attach more stages to it. All other stages are called compute stages and expect you to attach further stages to them.
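One way to see why nothing can follow a sink stage is to give the two kinds of stage different types. In the hypothetical sketch below, ComputeStageSketch offers filter() and writeTo(), but writeTo() returns a SinkStageSketch that has no methods for attaching stages, so the compiler rejects any attempt to chain another step after the sink. The class names are invented for illustration and are not Hazelcast's stage interfaces; also, unlike a real pipeline, each step here runs eagerly as soon as it is declared.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: compute stages can be extended, a sink stage cannot.
final class ComputeStageSketch<T> {
    private final List<T> items;       // data flowing out of this stage
    private final List<String> names;  // names of the stages in the chain so far

    private ComputeStageSketch(List<T> items, List<String> names) {
        this.items = items;
        this.names = names;
    }

    // Starts a chain from a finite source.
    static <T> ComputeStageSketch<T> readFrom(List<T> source) {
        return new ComputeStageSketch<>(new ArrayList<>(source), Collections.singletonList("source"));
    }

    // Returns another compute stage, so further stages can be attached.
    ComputeStageSketch<T> filter(Predicate<T> keep) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            if (keep.test(item)) {
                kept.add(item);
            }
        }
        return new ComputeStageSketch<>(kept, append(names, "filter"));
    }

    // Returns a terminal stage whose type offers no way to attach more stages.
    SinkStageSketch writeTo(List<T> sink) {
        sink.addAll(items);
        return new SinkStageSketch(append(names, "sink"));
    }

    private static List<String> append(List<String> names, String next) {
        List<String> copy = new ArrayList<>(names);
        copy.add(next);
        return copy;
    }
}

// Terminal stage: it can report the finished chain but has no filter() or writeTo().
final class SinkStageSketch {
    private final List<String> names;

    SinkStageSketch(List<String> names) {
        this.names = Collections.unmodifiableList(names);
    }

    List<String> stageNames() {
        return names;
    }
}
```

For example, readFrom(Arrays.asList(1, 2, 3, 4)).filter(n -> n % 2 == 0).writeTo(out) leaves 2 and 4 in out, while appending .filter(...) after writeTo(out) does not compile.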

Step 3. Start a Hazelcast Member

In this step, you start a local single-member cluster to which you can submit your pipeline as a job.

If you use Docker, follow these steps:

  1. Create a new Docker network called hazelcast-network.

    docker network create hazelcast-network

    Docker networks make it easier for you to connect to your cluster and add other services that need to communicate with your cluster.

  2. Execute the Docker run command to start a member.

    docker run \
        -it \
        --network hazelcast-network \
        --rm \
        -p 5701:5701 hazelcast/hazelcast:5.1-SNAPSHOT

    Docker parameters:

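    • -it: Starts an interactive session, allowing you to stop the member with Ctrl+C.

    • --rm: Tells Docker to remove the container automatically when it exits.

    • --network: Connects the container to the hazelcast-network network, so that other containers on that network, such as the CLI and Management Center, can reach the member.

    • -p: Publishes the member's port 5701 on port 5701 of your host machine. If you start more than one member container, each one must publish port 5701 under a different host machine port.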

If you use the binary distribution, run the following command from the Hazelcast home directory:

Mac and Linux
bin/hz-start
Windows
bin/hz-start.bat

Step 4. Submit your Job to the Member

After building a pipeline, you can deploy it to your member by packaging the code into a JAR file and submitting it to the member as a job. After you submit the JAR file, the cluster optimizes the execution plan and starts running the job for you.

  1. Package your Java code into a JAR file.

    mvn package
  2. Submit your JAR file to the member, using the hz-cli submit command.

    If you use Docker, replace the following placeholders:

    • $PATH_TO_TARGET: The absolute path to your target directory.

    • $MEMBER_IP: The IP address of the member to which to submit the job.

    docker run -it --network hazelcast-network -v $PATH_TO_TARGET:/jars --rm hazelcast/hazelcast:5.1-SNAPSHOT hz-cli -t $MEMBER_IP submit -c org.example.EvenNumberStream /jars/hz-example-0.1.0.jar

    If you use the binary distribution, run the command from the Hazelcast home directory and replace the $PATH_TO_JAR_FILE placeholder with the absolute or relative path to your packaged JAR file.

    Mac and Linux
    bin/hz-cli submit --class org.example.EvenNumberStream $PATH_TO_JAR_FILE
    Windows
    bin/hz-cli.bat submit --class org.example.EvenNumberStream $PATH_TO_JAR_FILE

    In the console of your Hazelcast member, you should see that a new job has been submitted and is running on your cluster.

    To avoid having to specify your main class in the hz-cli submit command, you can set the Main-Class attribute in the MANIFEST.MF file of your JAR.
  3. To see a list of running jobs on your cluster, execute the list-jobs command.

    If you use Docker:

    docker run -it --network hazelcast-network --rm hazelcast/hazelcast:5.1-SNAPSHOT hz-cli -t $MEMBER_IP list-jobs

    If you use the binary distribution:

    Mac and Linux
    bin/hz-cli list-jobs
    Windows
    bin/hz-cli.bat list-jobs

    You should see output similar to the following:

    ID                  STATUS             SUBMISSION TIME         NAME
    03de-e38d-3480-0001 RUNNING            2020-02-09T16:30:26.843 N/A

    Each job has a unique cluster-wide ID. You can use this ID to manage the job.

    A job with a streaming source will run indefinitely until explicitly canceled or the cluster is shut down. Even if you kill the client application, the job keeps running on the cluster.
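The Main-Class attribute mentioned in the submit step is a standard entry in a JAR file's META-INF/MANIFEST.MF. The stdlib-only sketch below builds a small JAR in memory whose manifest names org.example.EvenNumberStream and then reads the attribute back, to show what the entry looks like. ManifestSketch and its methods are hypothetical helpers; in a Maven project you would typically let a packaging plugin, such as maven-jar-plugin, write this entry for you when you run mvn package.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

// Hypothetical helpers showing the Main-Class manifest attribute; stdlib only.
class ManifestSketch {

    // Builds an in-memory JAR that contains only a manifest naming the main class.
    static byte[] jarWithMainClass(String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        Attributes attributes = manifest.getMainAttributes();
        // Manifest-Version is required; without it, the main attributes are not written.
        attributes.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attributes.put(Attributes.Name.MAIN_CLASS, mainClass);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // The JarOutputStream constructor writes META-INF/MANIFEST.MF as the first entry.
        try (JarOutputStream jar = new JarOutputStream(bytes, manifest)) {
            // A real JAR would also contain compiled classes such as org/example/EvenNumberStream.class.
        }
        return bytes.toByteArray();
    }

    // Reads the Main-Class attribute back from the JAR's manifest.
    static String readMainClass(byte[] jarBytes) throws IOException {
        try (JarInputStream jar = new JarInputStream(new ByteArrayInputStream(jarBytes))) {
            Manifest manifest = jar.getManifest();
            return manifest == null ? null
                    : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] jar = jarWithMainClass("org.example.EvenNumberStream");
        System.out.println(readMainClass(jar));
    }
}
```

Running main() prints org.example.EvenNumberStream, the class name that would otherwise be passed with --class.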

Step 5. Monitor your Jobs in Management Center

With Management Center, you can monitor the status of your jobs and manage the lifecycle of existing jobs in your cluster.

  1. Start Management Center.

    If you use Docker:

    docker run \
        --network hazelcast-network \
        -p 8080:8080 hazelcast/management-center:latest-snapshot

    If you use the binary distribution:

    Mac and Linux
    management-center/bin/start.sh
    Windows
    management-center/bin/start.bat
  2. In a web browser, go to localhost:8080 and enable dev mode.

    Enabling dev mode in Management Center

  3. Enter your cluster’s name (dev) and IP address.

    An active cluster in Management Center

  4. In the left menu of Management Center, go to Streaming > Jobs.

    You should see that your job is running.

    A job that is running on the cluster

  5. Click the job ID to open a detailed view of your job.

    You should see a directed acyclic graph (DAG) in the center of the page. This graph is a visual representation of how Hazelcast optimizes your jobs for distributed execution. You can learn more about this concept in How Hazelcast Models and Executes Jobs.

    You can click any node on the graph to see more information about how your cluster is executing it.

    Clicking a node on the graph opens a modal window that displays its data processing details

  6. To cancel your job, click Cancel.

    Cancel button at the top of the Jobs page

    In the console of the Hazelcast member, you should see a message that the job was canceled, along with the time it started and how long it ran.

    Execution of job '062d-d578-9240-0001', execution 062d-d578-df80-0001 got terminated, reason=java.util.concurrent.CancellationException
    	Start time: 2021-05-13T16:31:14.410
    	Duration: 00:02:48.318
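Canceling is what ends this job, because a streaming source never signals that its data is finished. The thread-based sketch below models that lifecycle in plain Java: the hypothetical StreamingJobSketch reads an endless sequence of numbers, counts the even ones in place of writing them to a sink, and stops only when cancel() is called. It does not use or reproduce Hazelcast's Job API, and it models only explicit cancellation, not cluster shutdown.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a job reading an unbounded source; not Hazelcast's Job API.
class StreamingJobSketch {
    private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
    private final AtomicLong writtenCount = new AtomicLong();
    private final Thread worker = new Thread(this::process, "streaming-job-sketch");

    // The source has no last item, so only a cancellation request ends this loop.
    private void process() {
        long sequence = 0;
        while (!cancelRequested.get()) {
            if (sequence % 2 == 0) {
                writtenCount.incrementAndGet(); // stands in for writing to the sink
            }
            sequence++;
        }
    }

    void start() {
        worker.start();
    }

    // Requests cancellation and waits until the worker thread has finished.
    void cancel() {
        cancelRequested.set(true);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker.isAlive();
    }

    long evenItemsWritten() {
        return writtenCount.get();
    }
}
```

After start(), isRunning() stays true for as long as you leave the job alone; after cancel() returns, isRunning() is false and evenItemsWritten() stops changing.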

Complete Code Sample

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class EvenNumberStream {
  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(TestSources.itemStream(10))
      .withoutTimestamps()
      .filter(event -> event.sequence() % 2 == 0)
      .setName("filter out odd numbers")
      .writeTo(Sinks.logger());

    HazelcastInstance hz = Hazelcast.bootstrappedInstance();

    hz.getJet().newJob(pipeline);
  }
}

Next Steps

Explore all the built-in sources and sinks that you can plug into your own pipelines.