Pipeline API Tutorial - The Machine Shop

Use the Hazelcast Pipeline API to build an application for modifying machine shop operations based on real time telemetry.


ACME operates thousands of machines, each reporting several datapoints per second. Measurements include things like bit temperature and RPM. Breakage is expensive, both in replacement parts and in downtime. Maintenance schedules can prevent some loss, but we want to go beyond that to monitor operations in real time. If excessive bit temperatures are caught in time, breakage can be averted by immediately reducing the cutting speed.

Each machine has its own parameters for acceptable bit temperature, which are stored in a machine_profiles IMap. Our Pipeline will do this by sending "green" / "orange" / "red" signals to the machines via the machine_controls IMap. A schematic of the lab is shown below.


Before You Begin

You will need the following installed on your system:

Step 1: Validate the Lab Environment

  1. Download the repo from https://github.com/hazelcast-guides/stream-processing-fundamentals/

  2. Run the following commands to start your lab environment.

    mvn install
    docker compose up -d
    docker compose ps

    You should see four services up and running. You may also see a fifth service called refdata_loader which exits after it has loaded data into the machine_profiles map.

    Each service is described briefly below.




    A Hazelcast node


    The Hazelcast Management Center. Accessible at http://localhost:8080.


    Emulates traffic from a set of machines. Writes to machine_events. Listens to machine_controls.


    A Python program that listens to the machine_events map and displays temperature data graphically. Accessible at http://localhost:8050

  3. Access the UI at http://localhost:8050. Specify a Location and Block to see a live display of the current temperature of a subset of machines. Valid locations are "San Antonio" and "Los Angeles". Valid blocks are "A" and "B".

Machines in block "A" are very likely to run hot. You should see something similar to the image below.


Step 2: Get to Know the Data

Data in Hazelcast clusters can be accessed via SQL. For more details, see https://docs.hazelcast.com/hazelcast/latest/sql/get-started-sql

  1. Open up the management center (http://localhost:8080) and click on the "SQL Browser" button.


  2. Run select * from machine_profiles in the SQL Browser.

    You will see that for each machine there is a serial number, information about the location and the warning and critical temperature limits for that particular machine.


  3. Run select * from machine_events


    This is the actual data coming from the machines.

    At any given time, only the latest event for each serial number appears in the map, however, the update events are all recorded into something called a map journal that can be used as input to a pipeline. You can verify that the map is constantly being updated by navigating to the machine_events map in Management Center and observing the number of put operations.


  4. In a terminal window, set up the Command Line Client to connect to the Hazelcast instance in Docker, then open a CLC connection.

    clc config add Docker cluster.name=dev cluster.address=localhost:5701
    clc -c Docker
  5. Run the same SQL queries as above.

From this point forward, you can use either Management Center or CLC to run SQL queries and monitor jobs. Management Center will also display the DAG created for submitted jobs.

Step 3: Learn About Data Formats


It is a best practice to avoid using POJOs in Pipelines if that POJO will be stored in a Hazelcast map. It can cause class loading problems.

When using Compact or Portable serialization, if a POJO is accessed in server-side code, it will be be deserialized as a GenericRecord if the POJO class is not on the server’s class path. Accordingly, server side code must be written to handle either a GenericRecord or a POJO depending on whether the POJO class is present.

In this lab, you will be working with MachineProfile objects and MachineEvents, both of which are defined in common/src/main/java. However, those classes are not deployed with your job, so you will need to access them using GenericRecord. An example of GenericRecord usage is shown below.

GenericRecord machineEvent;
short bitTemperature = machineEvent.getInt16("bitTemp");

For more information on GenericRecord and accessing domain objects without domain classes see https://docs.hazelcast.com/hazelcast/5.3/clusters/accessing-domain-objects.


As data proceeds through your pipeline its shape changes. For example, you may look up the warning and critical temperatures for a particular machine and send them along with the original event to the next stage in the pipeline. There is no need to create special container classes for situations like this, you can use Tuples instead. Here is an example.

// create a 3-tuple that consists of the serial number and bit temperature from the event
// and the warning temperature from the machine profile

GenericRecord p;
GenericRecord e;

Tuple3<String,Short,Short> newEvent =
        Tuple3.tuple3(e.getString("serialNum"), e.getShort("bitTemp"), p.getShort("warningTemp"));

// now, if we want to access fields from the 3-tuple, we use f0(), f1() and f2()
short bitTemp = newEvent.f1();

Step 4: Deploy Your First Job

  1. In your IDE, navigate to the monitoring-pipeline project. Open up the hazelcast.platform.labs.machineshop.TemperatureMonitorPipeline class and review the code there.

    The main method, shown below, is boilerplate that helps with deploying the job to a cluster. You do not need to change this.

        public static void main(String []args){
            Pipeline pipeline = createPipeline();
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("Temperature Monitor");
            HazelcastInstance hz = Hazelcast.bootstrappedInstance();
            hz.getJet().newJob(pipeline, jobConfig);

    You will do all of your work in the createPipeline method of this job. It always starts with creating a Pipeline object. You then build up the Pipeline by adding stages to it.

       public static Pipeline createPipeline(){
            Pipeline pipeline = Pipeline.create();
            // add your stages here
            return pipeline;
    We use the Shade plugin to bundle all project dependencies, other than Hazelcast, into a single jar. The Hazelcast classes should not be included because they are already on the server. Code with com.hazelcast package names cannot be deployed to a Hazelcast Cloud cluster.

    Currently, the createPipeline method contains only a source (reading from the machine_events map) and a sink, which simply logs the events to the console. This can be useful during debugging. In the next step, you’ll make a small change to the Pipeline and walk through a typical code/test cycle.

  2. Make a small change to the output format in the writeTo statement just so we can walk through building and deploying a pipeline. After you’ve made the change, you can deploy the pipeline using the commands below.

    cd monitoring-pipeline
    mvn package
    cd ..
    # use the submission script, passing the name of the cluster you want to run the job
    ./clc-scripts/submitjob.sh Docker
    # look for the logging statements in the Hazelcast logs
    docker compose logs --follow hz

    You should see something like this in the Hazelcast cluster member log:

stream-processing-fundamentals-hz-1  | 2023-02-01 21:11:44,357 [ INFO] [hz.hungry_lehmann.jet.blocking.thread-0] [c.h.j.i.c.WriteLoggerP]: []:5701 [dev] [5.2.1] [temp_monitor_161114/loggerSink#0] New Event SN=HYV569
stream-processing-fundamentals-hz-1  | 2023-02-01 21:11:44,370 [ INFO] [hz.hungry_lehmann.jet.blocking.thread-0] [c.h.j.i.c.WriteLoggerP]: []:5701 [dev] [5.2.1] [temp_monitor_161114/loggerSink#0] New Event SN=FXQ058
stream-processing-fundamentals-hz-1  | 2023-02-01 21:11:44,591 [ INFO] [hz.hungry_lehmann.jet.blocking.thread-0] [c.h.j.i.c.WriteLoggerP]: []:5701 [dev] [5.2.1] [temp_monitor_161114/loggerSink#0] New Event SN=RUO239
stream-processing-fundamentals-hz-1  | 2023-02-01 21:11:44,640 [ INFO] [hz.hungry_lehmann.jet.blocking.thread-0] [c.h.j.i.c.WriteLoggerP]: []:5701 [dev] [5.2.1] [temp_monitor_161114/loggerSink#0] New Event SN=DYQ714
  1. Inspect the running job using Management Center or CLC

  2. Cancel the job. The Hazelcast cluster will remain up and events will continue to flow.

./clc-scripts/canceljob.sh Docker

+ first job

  1. Pat yourself on the back! You’ve deployed your first pipeline.

Step 5: Finish the Pipeline

Continue building the pipeline following the instructions in TemperatureMonitorPipeline.java You may want to deploy and cancel the job multiple times while you are building the pipeline. When you are done, look at the UI. You should be able to tell that your job is now controlling the machines.

job done

You can also see machine control events in the event_generator log.

docker compose logs --follow event_generator
If at any point you get stuck, you can refer to the solution which you will find in the hazelcast.platform.labs.machineshop.solutions package.

Step 6: Deploy to Hazelcast Cloud

In this step, you will deploy your temperature monitoring Pipeline to a Hazelcast Cloud cluster. You will also connect the cloud cluster to the UI, refdata loader and event_generator.

  1. If you haven’t already done so, navigate to https://cloud.hazelcast.com, create an account, and create a new "Production" cluster. This will deploy a 3 node cluster.

  2. After the cluster is deployed, go to the CLI section of the "Quick Connection Guide" as shown below. Follow steps 2 and 3 to set up CLC for your cloud cluster.


  3. The previous step should have given output similar to the following:

    rmay@HZLCST-MBP-42 stream-processing-fundamentals % clc -c pr-tgouvd9r
    Hazelcast CLC v5.3.5 (c) 2023 Hazelcast Inc.
    * Participate in our survey at: https://forms.gle/rPFywdQjvib1QCe49
    * Type 'help' for help information. Prefix non-SQL commands with \
    Configuration : /Users/me/.hazelcast/configs/pr-tgouvd9r/config.yaml
    Log      INFO : /Users/me/.hazelcast/logs/2023-11-27.log

    We will need to tell Docker where to find the configuration files for your cluster. In the above output, the configuration files for this cluster are in the $HOME/.hazelcast/configs/pr-tgouvd9r directory.

    In the shell or Windows command line where you will run your Docker commands, set the VIRIDIAN_SECRETS_DIR environment variable to the configuration directory. See below for an example. Be sure to use your Viridian cluster name.

    export VIRIDIAN_SECRETS_DIR=$HOME/.hazelcast/configs/pr-tgouvd9r
  4. Start the refdata_loader, event_generator and ui using the viridian.compose setup. This pulls the environment variables you just set and allows the local Docker instances to talk to your Viridian cluster.

    docker compose -f viridian.compose.yaml up -d
  5. View the logs.

    docker compose -f viridian.compose.yaml logs --follow
  6. Use Management Center via the Viridian console to verify that your cluster is receiving traffic.

  7. Submit your job using the same script as before. This time you will need to pass in the name of your cluster. Use clc config list to see a list of known clusters. An example is shown below.

    clc-scripts % clc config list
     Configuration Name
     pr-tgouvd9r  << Suppose you want to use this one
    ./clc-scripts/sumbit-job.sh pr-tgouvd9r
  8. Verify that the job is running using Management Center via the Viridian console. Look under Stream Processing > Jobs.

  9. We’ll use CLC to view the logs from the Viridian cluster. First, you’ll need to set up the API key and secret strings. In the Viridian console, go to Account > Developer and set the API key. Do not navigate away from this screen.

  10. In your terminal window, open the CLC command line, specifying the name of your cluster. An example of the command is below.

    clc -c pr-tgouvd9r
  11. At the CLC prompt, log in to your cluster. Enter the API key and secret when prompted.

    \viridian login
  12. At the CLC prompt, view the logs for your cluster. You will need to specify the name of your cluster. An example is shown below. The output will be similar to the output you saw at the end of Step 4 above.

    \viridian stream-logs EdSrvs


This project contains many useful helpers. Please feel free to study it and use it as a template for your own projects.