This is a prerelease version.

Get Started with Stream Processing (Embedded)

This tutorial shows you how to use Hazelcast as a streaming engine that’s embedded in your Java application. At the end of this tutorial, you’ll know how to ingest data from some test sources and filter it continuously to generate results in real time.

Before You Begin

To complete this tutorial, you need the following:

An embedded Hazelcast cluster

Step 1. Write a Stream Processing Pipeline

With Hazelcast, you specify data processing steps by using the Jet API. The pipeline that you build with this API describes a series of tasks that can be submitted to a Hazelcast cluster as a job.

The general pattern of a data processing pipeline is to read data from a data source, process (or transform) it, and write the results to a data sink. You can visualize these steps as a linear process:

readFromSource → transform → writeToSink

In this step, you create a pipeline that reads a stream of incrementing numbers from a dummy data source and prints only even numbers, using the console as a sink.
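
Before turning to the Jet API, the read, transform, and write shape can be sketched in plain Java. This is only an analogy, assuming a bounded run of numbers in place of the unbounded test source; LinearPipelineSketch and its methods are illustrative names and are not part of the Hazelcast API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Plain-Java analogy only: none of these names belong to the Hazelcast API.
class LinearPipelineSketch {

    // "Source": a bounded run of sequence numbers 0..count-1.
    // (The tutorial's test source is unbounded.)
    static LongStream readFromSource(long count) {
        return LongStream.range(0, count);
    }

    // "Transform": keep even sequence numbers and drop odd ones.
    static LongStream transform(LongStream in) {
        return in.filter(seq -> seq % 2 == 0);
    }

    // "Sink": collect the results so that they can be printed or inspected.
    static List<Long> writeToSink(LongStream in) {
        return in.boxed().collect(Collectors.toList());
    }

    // Chains the three steps in the same order as the diagram above.
    static List<Long> run(long count) {
        return writeToSink(transform(readFromSource(count)));
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints [0, 2, 4, 6, 8]
    }
}
```

The Hazelcast pipeline in this step follows the same order of steps, but it runs continuously as a job on the cluster instead of returning a finished list.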

Write a buildEvenNumberStreamPipeline() method for your App class:

private static Pipeline buildEvenNumberStreamPipeline() {
    Pipeline evenNumberStream = Pipeline.create(); // (1)
    evenNumberStream.readFrom(TestSources.itemStream(10)) // (2)
        .withoutTimestamps() // (3)
        .filter(event -> event.sequence() % 2 == 0) // (4)
        .setName("filter out odd numbers") // (5)
        .writeTo(Sinks.logger()); // (6)

    return evenNumberStream;
}
1 Initialize an empty pipeline.
2 Read from the dummy data source. The itemStream(10) call creates a source that emits 10 SimpleEvent objects per second, each containing an increasing sequence number.
3 Tell Hazelcast that you do not plan on using timestamps to process the data.
4 Filter out any odd numbers from the stream, keeping only the even ones. The filter() method receives the SimpleEvent objects from the dummy source.
5 Set the name of the filter stage. This name identifies the stage in the job's execution plan.
6 Send the result of the streaming process to the console. A pipeline without any sinks is not valid.

Each method such as readFrom() or writeTo() results in a pipeline stage. The stage resulting from a writeTo() operation is called a sink stage and you can’t attach more stages to it. All other stages are called compute stages and expect you to attach further stages to them.

Step 2. Submit the Job to your Cluster

After creating a pipeline, you run it by submitting it to Hazelcast as a job, using the JetService object.

  1. Add this code to your main() method:

    // (1)
    Config config = new Config();
    JetConfig jetConfig = config.getJetConfig();
    jetConfig.setEnabled(true);
    HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);

    Pipeline evenNumberStream = buildEvenNumberStreamPipeline(); // (2)

    try {
        hz.getJet().newJob(evenNumberStream).join(); // (3)
    } finally {
        hz.shutdown(); // (4)
    }
    1 Enable the Jet engine.
    2 Get a reference to the pipeline.
    3 Submit the job to the Hazelcast member. The join() call blocks until the job completes, fails, or is canceled.
    4 Shut down the member when the job is canceled.
  2. Execute the code.

In the console, you’ll see your members form a cluster. Then, you’ll see the output of your job, where the sequence numbers are all even:

11:28:24.039 [INFO] [loggerSink#0] (timestamp=11:28:24.000, sequence=0)
11:28:24.246 [INFO] [loggerSink#0] (timestamp=11:28:24.200, sequence=2)
11:28:24.443 [INFO] [loggerSink#0] (timestamp=11:28:24.400, sequence=4)
11:28:24.647 [INFO] [loggerSink#0] (timestamp=11:28:24.600, sequence=6)
11:28:24.846 [INFO] [loggerSink#0] (timestamp=11:28:24.800, sequence=8)
11:28:25.038 [INFO] [loggerSink#0] (timestamp=11:28:25.000, sequence=10)
11:28:25.241 [INFO] [loggerSink#0] (timestamp=11:28:25.200, sequence=12)
11:28:25.443 [INFO] [loggerSink#0] (timestamp=11:28:25.400, sequence=14)
11:28:25.643 [INFO] [loggerSink#0] (timestamp=11:28:25.600, sequence=16)
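
The 200-millisecond spacing of these log lines follows from the source rate: itemStream(10) emits 10 events per second, and the filter keeps every second event. The following plain-Java sketch works through that arithmetic, assuming a perfectly even emission rate (the log prefixes above show that real delivery times vary by a few milliseconds). EmissionTimingSketch and its methods are illustrative names, not Hazelcast API.

```java
// Illustrative arithmetic only: these names are not part of the Hazelcast API.
class EmissionTimingSketch {

    // Milliseconds between consecutive events for a given rate.
    static long periodMillis(int itemsPerSecond) {
        return 1000L / itemsPerSecond;
    }

    // Offset from the first event at which the event with this
    // sequence number is emitted, assuming an even rate.
    static long offsetMillis(long sequence, int itemsPerSecond) {
        return sequence * periodMillis(itemsPerSecond);
    }

    public static void main(String[] args) {
        int rate = 10; // matches TestSources.itemStream(10)
        // Only even sequence numbers pass the filter.
        for (long seq = 0; seq <= 6; seq += 2) {
            System.out.println("sequence=" + seq + " offset=" + offsetMillis(seq, rate) + " ms");
        }
    }
}
```

For a rate of 10, the offsets are 0, 200, 400, and 600 milliseconds, which matches the timestamp fields of the first four log lines.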

You may also notice that Hazelcast prints its execution plan (in DOT format) for your job, which looks like the following:

digraph DAG {
	"itemStream" [localParallelism=1];
	"filter out odd numbers" [localParallelism=8];
	"loggerSink" [localParallelism=1];
	"itemStream" -> "filter out odd numbers" [queueSize=1024];
	"filter out odd numbers" -> "loggerSink" [queueSize=1024];
}

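This output is a textual description of how the Jet service plans your job for distributed execution. Each vertex is a processing step with its local parallelism, and each edge is a queue that connects two steps. You can learn more about this concept in How Hazelcast Models and Executes Jobs.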

To visualize these execution plans, you can use tools such as http://viz-js.com. For example, a rendering of this execution plan shows the following graph:

digraph DAG {
	"itemStream"
	"filter out odd numbers"
	"loggerSink"
	"itemStream" -> "filter out odd numbers"
	"filter out odd numbers" -> "loggerSink"
}
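
To make the DOT format more concrete, the following plain-Java sketch builds the text of a simple linear graph from an ordered list of vertex names. It only illustrates the format. Hazelcast generates the execution plan itself, and DotSketch and toDot are hypothetical names that do not exist in the Hazelcast API.

```java
import java.util.Arrays;
import java.util.List;

// Illustration of the DOT text format only: not part of the Hazelcast API.
class DotSketch {

    // Builds a DOT digraph in which each vertex feeds the next one in the list.
    static String toDot(List<String> vertices) {
        StringBuilder sb = new StringBuilder("digraph DAG {\n");
        // One line per vertex.
        for (String v : vertices) {
            sb.append("\t\"").append(v).append("\";\n");
        }
        // One edge between each pair of neighboring vertices.
        for (int i = 0; i + 1 < vertices.size(); i++) {
            sb.append("\t\"").append(vertices.get(i)).append("\" -> \"")
              .append(vertices.get(i + 1)).append("\";\n");
        }
        return sb.append("}\n").toString();
    }

    public static void main(String[] args) {
        System.out.print(toDot(Arrays.asList(
                "itemStream", "filter out odd numbers", "loggerSink")));
    }
}
```

You can paste the printed text into a DOT viewer to see the three vertices connected in a line.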

Complete Code Sample

package org.example;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class App {
    public static void main(String[] args) {
        // Enable the Jet engine on the embedded member.
        Config config = new Config();
        JetConfig jetConfig = config.getJetConfig();
        jetConfig.setEnabled(true).setResourceUploadEnabled(true);
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);

        Pipeline evenNumberStream = buildEvenNumberStreamPipeline();

        try {
            // Submit the job and block until it completes, fails, or is canceled.
            hz.getJet().newJob(evenNumberStream).join();
        } finally {
            hz.shutdown();
        }
    }

    private static Pipeline buildEvenNumberStreamPipeline() {
        Pipeline evenNumberStream = Pipeline.create();
        evenNumberStream.readFrom(TestSources.itemStream(10))
            .withoutTimestamps()
            .filter(event -> event.sequence() % 2 == 0)
            .setName("filter out odd numbers")
            .writeTo(Sinks.logger());

        return evenNumberStream;
    }
}
For more code samples, see this Hazelcast GitHub repository.

Next Steps

Explore all the built-in sources and sinks that you can plug into your own pipelines.