Saving the State of a Jet Job with Hazelcast Platform Operator

Learn how to save the state of your data pipelines and initiate a data pipeline from an existing state using the Hazelcast Platform Operator.

Context

This tutorial guides you through saving the state of your data pipelines and creating new data pipelines initialized from the saved state. It covers the following steps:

  • Deploy a Hazelcast cluster with the configured Jet engine.

  • Initiate the Jet pipeline.

  • Export a Snapshot from the Jet pipeline.

  • Start a new Jet pipeline initialized from the Snapshot.

Before you Begin

Make sure you have the following:

If you’re unsure about how to submit JetJobs using the Hazelcast Platform Operator, it’s recommended to start by following the Jet tutorial first. That tutorial also covers the steps for deploying a custom Jet pipeline JAR to Cloud storage and provides guidance on managing JetJob resources using the Hazelcast Platform Operator.

Step 1. Start the Hazelcast Cluster

Snapshot is an Enterprise-only feature, so you need an Enterprise Hazelcast cluster. For that reason, you must create a License Key secret first.

  1. Run the following command to create a secret with your Hazelcast Enterprise License, which will be referenced in the Hazelcast resource definition later:

    kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
  2. Run the following command to create a Hazelcast cluster with the Jet engine configured.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Hazelcast
    metadata:
      name: hazelcast
    spec:
      clusterSize: 3
      licenseKeySecretName: hazelcast-license-key
      jet:
        enabled: true
        resourceUploadEnabled: true
    EOF

    If you specify the Hazelcast version explicitly, use Hazelcast Enterprise version 5.3 or above, which is needed for submitting JetJobs using the Hazelcast Platform Operator.
  3. Now, verify the cluster’s status to ensure that it is up and running.

    $ kubectl get hazelcast hazelcast
    NAME        STATUS    MEMBERS
    hazelcast   Running   3/3

Step 2. Run the Data Pipeline

This tutorial uses a practical example built around a 'transaction' map that stores transaction data. First, we will submit a Jet Job that retrieves transactions from the 'transaction' map and processes them. To keep the example simple, the pipeline only logs the transactions. Then we will save the state of the Jet Job using the Hazelcast Platform Operator.

Before submitting the Jet Job, you need to create a map from which the Jet Job will fetch the records. To utilize the map as a data source for a Jet Job, it’s necessary to enable the Event Journal in the map.
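
To make the role of the journal capacity more concrete, here is a minimal sketch of a bounded event journal written in plain Java. It is a simplified, single-node model and not Hazelcast code: the class `JournalModel` and its methods are hypothetical, and it ignores partitioning and replication. It only illustrates that a journal with a fixed capacity keeps the most recent events and that a reader asking for older events gets whatever is still retained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a bounded event journal (hypothetical, not the Hazelcast API).
class JournalModel {
    private final int capacity;
    private final Deque<String> events = new ArrayDeque<>();
    private long oldestSequence = 0; // sequence number of the oldest retained event

    JournalModel(int capacity) {
        this.capacity = capacity;
    }

    // Appends an event; when the journal is full, the oldest event is evicted.
    void append(String event) {
        if (events.size() == capacity) {
            events.removeFirst();
            oldestSequence++;
        }
        events.addLast(event);
    }

    // Returns the retained events whose sequence number is >= fromSequence.
    List<String> readFrom(long fromSequence) {
        long start = Math.max(fromSequence, oldestSequence);
        List<String> result = new ArrayList<>();
        long seq = oldestSequence;
        for (String e : events) {
            if (seq >= start) {
                result.add(e);
            }
            seq++;
        }
        return result;
    }

    long oldestSequence() {
        return oldestSequence;
    }

    public static void main(String[] args) {
        JournalModel journal = new JournalModel(3);
        for (int i = 1; i <= 5; i++) {
            journal.append("transaction-" + i);
        }
        // Only the three most recent events are still available to a reader.
        System.out.println(journal.readFrom(0));
    }
}
```

In this model, a reader that falls further behind than the capacity allows misses the evicted events, which is why the capacity should be sized for the expected write rate.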

  1. Execute the following command to create the Map, which will serve as the source for the Jet Job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Map
    metadata:
      name: transaction
    spec:
      hazelcastResourceName: hazelcast
      eventJournal:
        capacity: 9000
    EOF

    The event journal must be configured on the map when the map is used as a data source for a data pipeline.

    You are now ready to submit the Jet Job that processes transactions from the map. Below is the Java source code for the pipeline. To keep things simple, the pipeline only retrieves transaction entries from the map and logs them. The code should be packaged and deployed to cloud storage. If you are unfamiliar with submitting JetJobs with the Hazelcast Platform Operator, refer to the Jet tutorial.

    package org.examples.jet.snapshot;
    
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.jet.pipeline.JournalInitialPosition;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.Sources;
    
    public class JobV1 {
    
        public static void main(String[] args) {
            var p = Pipeline.create();
            var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST);
            var loggerSink = Sinks.logger();
            p.readFrom(transactionSource)
                    .withIngestionTimestamps()
                    .setName("Emit Transactions")
                    .map(e -> {
                        System.out.printf("The transaction '%s' is being executed in 'job-v1'\n", e.getKey());
                        // execute the transaction
                        return String.format("[Job V1] transaction:'%s' payload:'%s'", e.getKey(), e.getValue());
                    })
                    .setName("Apply Transactions")
                    .writeTo(loggerSink)
                    .setName("Log Transactions");
    
            HazelcastInstance hz = Hazelcast.bootstrappedInstance();
            hz.getJet().newJob(p);
        }
    
    }
  2. Run the following command to submit the Jet Job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v1
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV1
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF
  3. Run the following command to check the status of the JetJob you have submitted.

    $ kubectl get jetjob transaction-v1
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   Running   741632319877545985   2023-08-09T12:22:04Z

    As new entries are added to the 'transaction' map, the Jet Job will automatically retrieve and process them. To observe the executed transactions, examine the logs. In the provided log example below, three transactions are processed with keys 'transaction-1', 'transaction-2', and 'transaction-3'. The entry values are not a concern in this context.

    $ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v1/Log
    {"time":"2023-08-09T12:24:59,753", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{\"description\": \"Online Purchase\", \"amount\": 75.99, \"transactionDate\": \"2023-08-09T15:30:00Z\"}' "}
    {"time":"2023-08-09T12:33:32,784", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{\"description\": \"Grocery Shopping\", \"amount\": 42.75, \"transactionDate\": \"2023-08-10T10:15:00Z\"}' "}
    {"time":"2023-08-09T12:33:44,997", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#0] [Job V1] transaction:'transaction-3' payload:'{\"description\": \"Restaurant Dinner\", \"amount\": 120.50, \"transactionDate\": \"2023-08-11T20:00:00Z\"}' "}
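
If you want to check programmatically which transaction keys a job has logged, the small helper below may be useful. It is a sketch in plain Java that assumes the log message format produced by the `String.format` call in the pipeline above (`transaction:'<key>'`). The class name `TransactionLogKeys` is hypothetical, and the sample lines are abbreviated versions of the log output with the payload shortened to `{}`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that extracts transaction keys from pipeline log lines.
class TransactionLogKeys {
    // Matches the "transaction:'<key>'" fragment written by the pipeline.
    private static final Pattern KEY = Pattern.compile("transaction:'([^']+)'");

    // Returns the keys found in the given lines, in order; lines without a key are skipped.
    static List<String> extractKeys(List<String> logLines) {
        List<String> keys = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = KEY.matcher(line);
            if (m.find()) {
                keys.add(m.group(1));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Abbreviated sample lines; real lines carry the full JSON envelope and payload.
        List<String> lines = List.of(
                "[transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{}'",
                "[transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{}'");
        System.out.println(extractKeys(lines));
    }
}
```

Running the same extraction over the logs of two job versions and comparing the resulting key lists is one way to confirm that no transaction was processed twice.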

Step 3. Save the state of the Data Pipeline

Saving and restoring computation state is essential for accurate and reliable data processing in data pipelines. Jet’s Snapshot feature lets you save and restore this state. A snapshot captures the state of a running Jet job at a specific time, giving you a reliable record of ongoing computations and processed data.

  1. Run the following command to export a Snapshot from the Jet Job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJobSnapshot
    metadata:
      name: snapshot-transaction
    spec:
      jetJobResourceName: transaction-v1
      cancelJob: true
    EOF
  2. Run the following command to check the status of the exported JetJobSnapshot.

    $ kubectl get jetjobsnapshot snapshot-transaction
    NAME                   STATE      CREATIONTIME
    snapshot-transaction   Exported   2023-08-09T13:07:51Z

    Because the 'spec.cancelJob' field is set to 'true', the Jet Job named 'transaction-v1' is canceled after the JetJobSnapshot is applied. This setting is particularly useful before submitting a new version of an existing data pipeline: the ongoing job is halted only after its current state has been preserved.
  3. The Jet Job should not be in the Running state anymore. You can verify this by using the following command.

    $ kubectl get jetjob transaction-v1
    NAME             STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   ExecutionFailed   741632319877545985   2023-08-09T12:22:04Z   2023-08-09T13:07:51Z

Step 4. Submit Job initialized from Snapshot

When creating a new version of a data pipeline, it’s essential to initialize the new pipeline from the current state of the old one. Without this initialization, the new pipeline would start with an empty state and lack information about its predecessor’s state. This situation could result in data loss or duplicate processing, which is not desirable, particularly for critical pipelines. To ensure proper initialization and prevent these issues, we can rely on the Snapshot.
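
The difference between starting from a saved position and starting from scratch can be illustrated with a toy model in plain Java. This is only a sketch of the idea, not how Jet implements snapshots: the class `ResumeModel`, the list standing in for the journal, and the integer offset standing in for the snapshot are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of resuming a pipeline from a saved position (hypothetical, not the Jet API).
class ResumeModel {
    // Processes journal entries from startOffset onward and returns labelled results.
    static List<String> run(String jobName, List<String> journal, int startOffset) {
        List<String> processed = new ArrayList<>();
        for (int i = startOffset; i < journal.size(); i++) {
            processed.add(jobName + ":" + journal.get(i));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> journal = new ArrayList<>(
                List.of("transaction-1", "transaction-2", "transaction-3"));

        // The first version processes everything that is currently in the journal.
        System.out.println(run("v1", journal, 0));

        // The "snapshot" here is just the position the first version has reached.
        int snapshotOffset = journal.size();

        // New entries arrive after the snapshot was taken.
        journal.add("transaction-4");
        journal.add("transaction-5");

        // Initialized from the snapshot: only the new entries are processed.
        System.out.println(run("v2", journal, snapshotOffset));

        // Not initialized from the snapshot: the first three entries are processed again.
        System.out.println(run("v2", journal, 0));
    }
}
```

The last call shows the duplicate processing that an uninitialized new version would cause. A real snapshot stores the full processing state rather than a single offset.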

  1. Continuing with the example, we will now create a new version of the previous Jet Job. To keep the example simple, the new version is very similar to the old one: it takes entries from the 'transaction' map and logs them.

    package org.examples.jet.snapshot;
    
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.jet.pipeline.JournalInitialPosition;
    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.Sinks;
    import com.hazelcast.jet.pipeline.Sources;
    
    public class JobV2 {
    
        public static void main(String[] args) {
            var p = Pipeline.create();
            var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST);
            var loggerSink = Sinks.logger();
            p.readFrom(transactionSource)
                    .withIngestionTimestamps()
                    .setName("Emit Transactions")
                    .map(e -> {
                        System.out.printf("The transaction '%s' is being executed in 'job-v2'\n", e.getKey());
                        // execute the transaction
                        return String.format("[Job V2] transaction:'%s' payload:'%s'", e.getKey(), e.getValue());
                    })
                    .setName("Apply Transactions")
                    .writeTo(loggerSink)
                    .setName("Log Transactions");
    
            HazelcastInstance hz = Hazelcast.bootstrappedInstance();
            hz.getJet().newJob(p);
        }
    
    }
  2. Unlike the previous JetJob definition, this one sets the 'initialSnapshotResourceName' field to refer to the Snapshot exported in the preceding step. Run the following command to submit the new Jet Job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v2
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV2
      initialSnapshotResourceName: snapshot-transaction
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF

    This Jet Job, named 'transaction-v2', resumes processing entries from the state captured when the snapshot was exported. This way, each transaction entry is processed only once within the pipeline.

  3. Now check the status of the new Jet Job.

    $ kubectl get jetjob transaction-v2
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v2   Running   741650518446702593   2023-08-09T13:34:22Z

    When you review the logs after submitting the pipeline, you will see only the transaction entries that were put into the map after the Snapshot was exported. This means that the new version of the transaction pipeline, 'transaction-v2', does not execute the transactions that were already executed by the first version of the pipeline, 'transaction-v1'.

    $ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v2/Log
    {"time":"2023-08-09T12:45:11,364", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#1] [Job V2] transaction:'transaction-4' payload:'{\"description\": \"Movie Tickets\", \"amount\": 25.00, \"transactionDate\": \"2023-08-12T18:45:00Z\"}' "}
    {"time":"2023-08-09T12:47:53,791", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#0] [Job V2] transaction:'transaction-5' payload:'{\"description\": \"Gasoline Refill\", \"amount\": 50.30, \"transactionDate\": \"2023-08-13T09:00:00Z\"}' "}

    If the new version of the Jet Job, 'transaction-v2', were not initialized from the exported Snapshot 'snapshot-transaction', it would begin data processing from the start of the map. This is undesirable, as certain transaction entries have already been processed by the previous version of the pipeline.

    If you want your Snapshots to survive outages or restarts, create the Hazelcast cluster with persistence enabled.

Step 5. Cleaning Up

To clean up all the resources you created during the tutorial, including the custom resources and secrets, run the following commands:

    kubectl delete $(kubectl get hazelcast -o name)
    kubectl delete secret hazelcast-license-key

Deleting the Hazelcast CR will also delete the Jet Jobs and Jet Job Snapshots linked to it.

Summary

Saving the current state of your data pipeline and initializing new pipelines from that snapshot can be essential in some cases, as the example in this tutorial demonstrates. We have covered the process of managing the state of your data pipelines using the Hazelcast Platform Operator.