
Get Started with Stream Processing (Client/Server)

This tutorial shows you how to use a Hazelcast cluster as a data processing engine for your client applications. At the end of this tutorial, you’ll know how to build a data pipeline in Java and submit it as a job to your Hazelcast cluster.

Before You Begin

To complete this tutorial, you need the following:

• A local Hazelcast cluster (client/server mode) and an instance of Management Center running on your local network

Step 1. Write your Stream Processing Pipeline

With Hazelcast, you can specify data processing steps using the Java Jet API. With this API, you define a series of tasks that can be submitted to a Hazelcast cluster as a job.

The general pattern of a data processing pipeline is to read data from a data source, process (or transform) it, and write the results to a data sink. You can visualize these steps as a linear process:

readFromSource → transform → writeToSink
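
To make this pattern concrete, here is a minimal batch-style sketch in Java, using the test sources and sinks that ship with Hazelcast. The class name PatternSketch and the map() transform are illustrative only; the pipeline you build in this tutorial streams data and uses filter() instead:

    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.test.TestSources;

    public class PatternSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.items(1, 2, 3, 4)) // readFromSource
            .map(number -> number * 10)                  // transform
            .writeTo(Sinks.logger());                    // writeToSink
        // Submitting the pipeline to a cluster as a job is covered in the steps below.
      }
    }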

In this step, you create a pipeline that reads a stream of incrementing numbers from a dummy data source and prints only even numbers, using the console as a sink.

  1. Import the dependencies and create an App class.

    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.test.TestSources;
    
    public class App {
      public static void main(String[] args) {
      }
    }
  2. Write a buildEvenNumberStreamPipeline() method for your App class:

    private static Pipeline buildEvenNumberStreamPipeline() {
      Pipeline evenNumberStream = Pipeline.create(); (1)
      evenNumberStream.readFrom(TestSources.itemStream(10)) (2)
       .withoutTimestamps() (3)
       .filter(event -> event.sequence() % 2 == 0) (4)
       .setName("filter out odd numbers") (5)
       .writeTo(Sinks.logger()); (6)

      return evenNumberStream;
    }
    1 Initialize an empty pipeline.
    2 Read from the dummy data source. The itemStream() method emits 10 SimpleEvent objects per second, each containing an increasing sequence number.
    3 Tell Hazelcast that you do not plan on using timestamps to process the data.
    4 Filter out odd numbers, keeping only events whose sequence number is even. The filter() method receives the SimpleEvent objects from the dummy source.
    5 Set a descriptive name for this pipeline stage. Stage names appear in logs and in the DAG view in Management Center. To name the job itself, for tasks such as canceling it later, pass a JobConfig when you submit the pipeline; see the sketch at the end of this step.
    6 Send the result of the streaming process to the console. A pipeline without any sinks is not valid.

    Each method such as readFrom() or writeTo() results in a pipeline stage. The stage resulting from a writeTo() operation is called a sink stage and you can’t attach more stages to it. All other stages are called compute stages and expect you to attach further stages to them.
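
    For example, you could attach a further compute stage, such as a hypothetical map() that extracts the raw sequence numbers, between filter() and the sink, but nothing can be attached after writeTo():

    evenNumberStream.readFrom(TestSources.itemStream(10))
     .withoutTimestamps()
     .filter(event -> event.sequence() % 2 == 0) // compute stage
     .map(event -> event.sequence())             // another compute stage (hypothetical)
     .writeTo(Sinks.logger());                   // sink stage: no further stages can follow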

  3. To start the Jet engine, add this code to the bottom of your main() method:

    Pipeline evenNumberStream = buildEvenNumberStreamPipeline(); (1)
    
    HazelcastInstance hz = Hazelcast.bootstrappedInstance(); (2)
    
    try {
        hz.getJet().newJob(evenNumberStream); (3)
    } finally {
        hz.shutdown();
    }
    1 Create an instance of your pipeline.
    2 Create a bootstrapped Hazelcast member. This bootstrapped member allows you to submit your packaged classes to the cluster, using the hz-cli submit command.
    3 Pass your pipeline to the bootstrapped Jet engine.
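
    If you want to name the job itself, so that it is easier to identify and cancel later, you can pass a JobConfig when you submit the pipeline. This is an optional sketch; the job name even-number-stream is a hypothetical example:

    import com.hazelcast.jet.config.JobConfig;

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("even-number-stream"); // hypothetical job name
    hz.getJet().newJob(evenNumberStream, jobConfig); // replaces the newJob() call above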

Step 2. Submit your Job to the Cluster

After creating a pipeline, you can deploy it by packaging the code into a JAR file and submitting it to the cluster.

After you submit the JAR file, the cluster optimizes the execution plan and starts running the job for you.

  1. Package your Java code into a JAR file.

    If you’re using Maven, you can use the mvn package command to generate a JAR file.

    If you’re using Gradle, you can use the gradle build command to generate a JAR file.

  2. From the Hazelcast home folder, execute the hz-cli submit command.

    • Binary

    • Docker

    Replace the following placeholders:

    • $NAME_OF_MAIN_CLASS: The name of the main class in your packaged JAR file.

      To avoid having to specify your main class in the hz-cli submit command, you can set the Main-Class attribute in the MANIFEST.MF file. Both Maven and Gradle can be configured to do this; refer to their documentation.
    • $PATH_TO_JAR_FILE: The path to your packaged JAR file.

    • $MEMBER_IP: The IP address of a member in your cluster to which to submit the job.

    bin/hz-cli submit -c $NAME_OF_MAIN_CLASS $PATH_TO_JAR_FILE -t $MEMBER_IP

    Replace the following placeholders:

    • $PATH_TO_JAR_FILE: The path to the folder that contains your packaged JAR file.

    • $MEMBER_IP: The IP address of a member in your cluster to which to submit the job.

    • $NAME_OF_MAIN_CLASS: The name of the main class in your packaged JAR file.

      To avoid having to specify your main class in the hz-cli submit command, you can set the Main-Class attribute in the MANIFEST.MF file. Both Maven and Gradle can be configured to do this; refer to their documentation.
    • $NAME_OF_JAR_FILE: The name of your packaged JAR file.

    docker run -it -v $PATH_TO_JAR_FILE:/jars --rm hazelcast/hazelcast -t $MEMBER_IP submit -c $NAME_OF_MAIN_CLASS /jars/$NAME_OF_JAR_FILE

    Take a moment to learn which Docker parameters are included in this command:

    • -it: Starts an interactive session, allowing you to cancel the submit command with Ctrl+C.

    • -v: Mounts the folder that contains your JAR file ($PATH_TO_JAR_FILE) to the /jars folder inside the Docker container.

    • --rm: Tells Docker to remove the container from its local cache after it exits.

    In the console of your Hazelcast member, you should see that a new job has been submitted and it’s running on your cluster.

  3. To see a list of running jobs on your cluster, execute the list-jobs command:

    • Binary

    • Docker

    bin/hz-cli list-jobs
    docker run -it hazelcast/hazelcast -t 172.17.0.2 list-jobs

    You should see the following:

    ID                  STATUS             SUBMISSION TIME         NAME
    03de-e38d-3480-0001 RUNNING            2020-02-09T16:30:26.843 N/A

    Each job has a unique cluster-wide ID. You can use this ID to manage the job.

    A job with a streaming source will run indefinitely until explicitly canceled or the cluster is shut down. Even if you kill the client application, the job keeps running on the cluster.
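
    You can also list and cancel jobs programmatically from a Java client. The following is a minimal sketch, assuming a cluster that is reachable with default client settings and the hypothetical job name even-number-stream from the earlier JobConfig sketch:

    import com.hazelcast.client.HazelcastClient;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.jet.Job;

    HazelcastInstance client = HazelcastClient.newHazelcastClient();
    for (Job job : client.getJet().getJobs()) {
      System.out.println(job.getId() + " " + job.getStatus() + " " + job.getName());
    }
    Job job = client.getJet().getJob("even-number-stream"); // returns null if no job has this name
    if (job != null) {
      job.cancel(); // stops the streaming job on the cluster
    }
    client.shutdown();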

Step 3. Monitor your Jobs in Management Center

With Management Center, you can monitor the status of your jobs and manage the lifecycle of existing jobs in your cluster.

  1. In the left navigation panel of Management Center, go to Streaming > Jobs.

    You should see that your job is running.

    A job that is running on the cluster

  2. Click the job ID to open a detailed view of your job.

    You should see a graph (DAG) in the center of the page. This graph is a visual representation of how Hazelcast optimizes your jobs for distributed execution. You can learn more about this concept in How Hazelcast Models and Executes Jobs.

    You can click any node on the graph to see more information about how your cluster is executing it.

    Clicking a node on the graph opens a modal window that displays its data processing details

  3. To cancel your job, click Cancel.

    Cancel button at the top of the Jobs page

    In the console of the Hazelcast member, you should see that the job was canceled, along with its start time and how long it ran.

    Execution of job '062d-d578-9240-0001', execution 062d-d578-df80-0001 got terminated, reason=java.util.concurrent.CancellationException
    	Start time: 2021-05-13T16:31:14.410
    	Duration: 00:02:48.318

Next Steps

Learn more about how to use the Management Center.

Explore all the built-in sources and sinks that you can plug into your own pipelines.