Save the state of a Jet job with Platform Operator

Learn how to save the state of your data pipelines and initiate a data pipeline from an existing state using the Hazelcast Platform Operator.

Overview

This tutorial guides you through the process of saving the state of your data pipelines and creating new data pipelines initialized from the saved state. Follow the steps below:

  • Deploy a Hazelcast cluster with the configured Jet engine

  • Initiate the Jet pipeline

  • Export a Snapshot from the Jet pipeline

  • Start a new Jet pipeline initialized from the Snapshot

The tutorial should take approximately 10 minutes to complete.

Prerequisites

Before you begin, make sure that you have a running Kubernetes cluster with the Hazelcast Platform Operator deployed, a Hazelcast Enterprise license key, and cloud storage (a bucket) that holds your packaged Jet pipeline JAR.

If you’re unsure about how to submit Jet jobs using the Hazelcast Platform Operator, follow the Jet tutorial first. This also covers the steps for deploying a custom Jet pipeline JAR to Cloud storage and provides guidance on managing Jet job resources using the Hazelcast Platform Operator.
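The JetJob resources used later in this tutorial reference a bucket URI and a Kubernetes secret that holds the credentials for that bucket. As a minimal sketch, for an AWS S3 bucket such a secret might be created as shown below; the exact literal keys the operator expects depend on your cloud provider, so check the Jet tutorial and the operator documentation:

kubectl create secret generic <SECRET-NAME> \
    --from-literal=region=<region> \
    --from-literal=access-key-id=<access-key-id> \
    --from-literal=secret-access-key=<secret-access-key>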

Start the Hazelcast cluster

Since exporting Snapshots is an Enterprise-only feature, you need to set up a Hazelcast Enterprise cluster, which is why you must create a License Key secret first.

  1. Create a secret with your Hazelcast Enterprise License, which will be referenced in the Hazelcast resource definition later:

    kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
  2. Create a Hazelcast cluster with the Jet engine configured.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Hazelcast
    metadata:
      name: hazelcast
    spec:
      clusterSize: 3
      licenseKeySecretName: hazelcast-license-key
      jet:
        enabled: true
        resourceUploadEnabled: true
    EOF
    If you want to specify the Hazelcast version explicitly, use Hazelcast Enterprise 5.3 or later; earlier versions do not support submitting Jet jobs using the Hazelcast Platform Operator.
  3. Now, verify the cluster’s status to ensure that it is up and running.

    $ kubectl get hazelcast hazelcast
    NAME        STATUS    MEMBERS
    hazelcast   Running   3/3

Run the data pipeline

In this tutorial, we walk through a practical example built around a 'transaction' map that stores transaction data. Hazelcast Jet retrieves these transactions from the map and processes them. First, we submit a Jet job that processes transactions from the 'transaction' map; to keep the example simple, the pipeline only logs them. Then we explore how to save the state of the Jet job using the Hazelcast Platform Operator.

Before submitting the Jet job, you need to create the map from which the Jet job will fetch records. To use the map as a data source for a Jet job, you must enable the Event Journal on it.

  1. Execute the following command to create the Map, which will serve as the source for the Jet job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Map
    metadata:
      name: transaction
    spec:
      hazelcastResourceName: hazelcast
      eventJournal:
        capacity: 9000
    EOF
    It is important to configure the event journal in the map if it is being used as a data source for a Data Pipeline.

    You are all set to submit the Jet job that processes transactions from the map. The pipeline simply retrieves transaction entries from the map's event journal and logs them; its Java source (a sketch is shown after this list) must be packaged as a JAR and deployed to cloud storage. If you are unfamiliar with submitting jobs with the Hazelcast Platform Operator, refer to the Jet tutorial. The JetJob resource below references that JAR:

    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v1
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV1
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
  2. Submit the Jet job:

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v1
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV1
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF
  3. Check the status of the Jet job you have submitted:

    $ kubectl get jetjob transaction-v1
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   Running   741632319877545985   2023-08-09T12:22:04Z

    As new entries are added to the 'transaction' map, the Jet job automatically retrieves and processes them. To observe the processed transactions, examine the member logs. In the log example below, three transactions with keys 'transaction-1', 'transaction-2', and 'transaction-3' are processed; the entry values themselves are not important in this context.

    $ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v1/Log
    {"time":"2023-08-09T12:24:59,753", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{\"description\": \"Online Purchase\", \"amount\": 75.99, \"transactionDate\": \"2023-08-09T15:30:00Z\"}' "}
    {"time":"2023-08-09T12:33:32,784", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{\"description\": \"Grocery Shopping\", \"amount\": 42.75, \"transactionDate\": \"2023-08-10T10:15:00Z\"}' "}
    {"time":"2023-08-09T12:33:44,997", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#0] [Job V1] transaction:'transaction-3' payload:'{\"description\": \"Restaurant Dinner\", \"amount\": 120.50, \"transactionDate\": \"2023-08-11T20:00:00Z\"}' "}

Save the state of the data pipeline

In data pipelines, saving and using computation process states is vital for accurate and reliable data processing. Jet’s Snapshot feature lets you save and restore these processing states. A snapshot captures the state of a running Jet job at a specific time, giving you a reliable record of ongoing computations and processed data.

  1. Export a Snapshot from the Jet job:

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJobSnapshot
    metadata:
      name: snapshot-transaction
    spec:
      jetJobResourceName: transaction-v1
      cancelJob: true
    EOF
  2. Check the status of the exported JetJobSnapshot:

    $ kubectl get jetjobsnapshot snapshot-transaction
    NAME                   STATE      CREATIONTIME
    snapshot-transaction   Exported   2023-08-09T13:07:51Z
    By setting the 'spec.cancelJob' field to 'true', the Jet job named 'transaction-v1' is cancelled once the snapshot has been exported. This is particularly useful before submitting a new version of an existing data pipeline: the snapshot preserves the job's current state first and then halts the ongoing job.
  3. The Jet job should no longer be in the Running state. Because the job was cancelled as part of the snapshot export, its status is reported as ExecutionFailed, which is expected here. Use the following command to verify this:

    $ kubectl get jetjob transaction-v1
    NAME             STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   ExecutionFailed   741632319877545985   2023-08-09T12:22:04Z   2023-08-09T13:07:51Z

Submit job initialized from Snapshot

When creating a new version of a data pipeline, it’s essential to initialize the new pipeline from the current state of the old one. Without this initialization, the new pipeline would start with an empty state and lack information about its predecessor’s state. This situation could result in data loss or duplicate processing, which is not desirable, particularly for critical pipelines. To ensure proper initialization and prevent these issues, we can rely on the Snapshot.

  1. Continuing with the example, we will now create a new version of the previous Jet job. For simplicity, the new version is almost identical to the old one: it takes entries from the 'transaction' map and logs them.

    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v2
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV2
      initialSnapshotResourceName: snapshot-transaction
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
  2. In contrast to the previous Jet job definition, we will set the 'initialSnapshotResourceName' field to refer to the Snapshot exported in the preceding step. Execute the following command to submit the new Jet job:

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v2
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV2
      initialSnapshotResourceName: snapshot-transaction
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF

    This Jet job, named 'transaction-v2', will seamlessly resume processing entries from the state at which we exported the snapshot. In this way, transaction entries are processed only once within the pipeline.

  3. Now check the status of the new Jet job:

    $ kubectl get jetjob transaction-v2
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v2   Running   741650518446702593   2023-08-09T13:34:22Z

    When you review the logs after submitting the pipeline, you will see only transaction entries added after the snapshot was exported. This means that the new version of the pipeline, 'transaction-v2', does not reprocess transactions already handled by the first version, 'transaction-v1'.

    $ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v2/Log
    {"time":"2023-08-09T12:45:11,364", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#1] [Job V2] transaction:'transaction-4' payload:'{\"description\": \"Movie Tickets\", \"amount\": 25.00, \"transactionDate\": \"2023-08-12T18:45:00Z\"}' "}
    {"time":"2023-08-09T12:47:53,791", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#0] [Job V2] transaction:'transaction-5' payload:'{\"description\": \"Gasoline Refill\", \"amount\": 50.30, \"transactionDate\": \"2023-08-13T09:00:00Z\"}' "}

    If the new version of the Jet job, 'transaction-v2', were not initialized from the exported snapshot 'snapshot-transaction', it would begin processing from the start of the map's event journal. This is undesirable, because the previous version of the pipeline has already processed some of those transaction entries.

    If you want your Snapshots to survive outages or restarts, create a Hazelcast cluster with persistence enabled (see the sketch after this list).
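
As a minimal sketch, enabling persistence on the Hazelcast resource might look like the following; the exact persistence fields depend on your operator version, so check the Hazelcast Platform Operator documentation:

apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  clusterSize: 3
  licenseKeySecretName: hazelcast-license-key
  jet:
    enabled: true
    resourceUploadEnabled: true
  persistence:
    pvc:
      accessModes: ["ReadWriteOnce"]
      requestStorage: 8Gi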

Clean up

To clean up all the resources you created during the tutorial, remove the custom resources and secrets by running the following commands:

kubectl delete $(kubectl get hazelcast -o name)
kubectl delete secret hazelcast-license-key
Deleting the Hazelcast CR also deletes the Jet jobs and Jet job Snapshots linked to it.

Summary

We’ve seen how saving the current state of a data pipeline and initializing a new pipeline from that snapshot can be essential, and we’ve covered how to manage the state of your data pipelines using the Hazelcast Platform Operator.