Tutorial: Save the state of a Jet job

Learn how to save the state of your data pipelines and initiate a job from a snapshot using Operator.

Overview

This tutorial guides you through the process of saving the state of your data pipelines and creating new data pipelines initialized from the saved state. In this tutorial, you’ll:

  • Deploy a Hazelcast cluster with the Jet engine configured.

  • Initiate a pipeline as a Jet job.

  • Export a snapshot from the Jet job.

  • Start a new Jet job initialized from the snapshot.

The tutorial should take approximately 10 minutes to complete.

Prerequisites

Before you begin, make sure that you have:

If you’re unsure how to submit Jet jobs using Operator, follow the Jet tutorial first. It also covers deploying a custom Jet pipeline JAR to cloud storage and managing Jet job resources using Operator.

Start the Hazelcast cluster

  1. Create a secret with your Hazelcast Enterprise license, which will be referenced in the Hazelcast resource definition later:

    kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
  2. Create a Hazelcast cluster with the Jet engine configured.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Hazelcast
    metadata:
      name: hazelcast
    spec:
      clusterSize: 3
      licenseKeySecretName: hazelcast-license-key
      jet:
        enabled: true
        resourceUploadEnabled: true
    EOF
    If you specify the Hazelcast version explicitly, use Hazelcast Enterprise version 5.3 or above, which is required for submitting Jet jobs using Operator.
  3. Verify the cluster’s status to ensure that it is up and running.

    kubectl get hazelcast hazelcast
    NAME        STATUS    MEMBERS
    hazelcast   Running   3/3
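
The status check above can also be scripted when the cluster takes a while to start. The following is a minimal sketch, not part of Operator: the helper name `wait_for_status` is hypothetical, and the field path `{.status.phase}` is an assumption about where the Hazelcast resource stores the value shown in the STATUS column. Confirm the path with `kubectl get hazelcast hazelcast -o yaml` before relying on it.

```shell
# Poll a custom resource until a status field reaches the expected value.
# Usage: wait_for_status <kind> <name> <jsonpath> <expected> [timeout_seconds]
wait_for_status() {
  kind=$1; name=$2; path=$3; expected=$4; timeout=${5:-120}
  elapsed=0
  current=""
  while [ "$elapsed" -lt "$timeout" ]; do
    # -o jsonpath prints only the requested field; errors (for example,
    # the resource not existing yet) are silenced and treated as "not ready".
    current=$(kubectl get "$kind" "$name" -o "jsonpath=$path" 2>/dev/null)
    if [ "$current" = "$expected" ]; then
      echo "$kind/$name reached $expected"
      return 0
    fi
    sleep 2
    elapsed=$((elapsed + 2))
  done
  echo "timed out waiting for $kind/$name (last value: ${current:-none})" >&2
  return 1
}

# Example (the field path is an assumption, see the note above):
# wait_for_status hazelcast hazelcast '{.status.phase}' Running 180
```

Because the resource kind and field path are parameters, the same helper could be pointed at other resources in this tutorial once their status field paths are confirmed.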

Run the data pipeline

This tutorial walks through a practical example built around a 'transaction' map that stores transaction data. First, we’ll submit a Jet job that reads transactions from the map and processes them. To keep the example simple, the pipeline only logs the transactions. We’ll then save the state of the Jet job using Operator.

Before submitting the Jet job, you need to create the map from which the job will fetch records. For the map to serve as a data source for a Jet job, its event journal must be enabled.

  1. Create the Map, which will serve as the source for the Jet job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: Map
    metadata:
      name: transaction
    spec:
      hazelcastResourceName: hazelcast
      eventJournal:
        capacity: 9000
    EOF
  2. Define the Jet job that processes transactions from the map. The pipeline retrieves transaction entries from the map and logs them. Its code must be packaged as a JAR and deployed to cloud storage. If you are unfamiliar with submitting jobs with Operator, refer to the Jet tutorial.

    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v1
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV1
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
  3. Submit the Jet job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v1
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV1
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF
  4. Check the status of the Jet job you have submitted.

    kubectl get jetjob transaction-v1
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   Running   741632319877545985   2023-08-09T12:22:04Z

    As new entries are added to the 'transaction' map, the Jet job automatically retrieves and processes them. To see the executed transactions, check the logs. In the example below, three transactions are processed with keys 'transaction-1', 'transaction-2', and 'transaction-3'.

    kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v1/Log
    {"time":"2023-08-09T12:24:59,753", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{\"description\": \"Online Purchase\", \"amount\": 75.99, \"transactionDate\": \"2023-08-09T15:30:00Z\"}' "}
    {"time":"2023-08-09T12:33:32,784", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{\"description\": \"Grocery Shopping\", \"amount\": 42.75, \"transactionDate\": \"2023-08-10T10:15:00Z\"}' "}
    {"time":"2023-08-09T12:33:44,997", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#0] [Job V1] transaction:'transaction-3' payload:'{\"description\": \"Restaurant Dinner\", \"amount\": 120.50, \"transactionDate\": \"2023-08-11T20:00:00Z\"}' "}

Save the state of the data pipeline

In data pipelines, saving and restoring the state of a computation is vital for accurate and reliable data processing. Jet’s snapshot feature lets you save and restore this state. A snapshot captures the state of a running Jet job at a specific time, giving you a reliable record of ongoing computations and processed data.

  1. Export a Snapshot from the Jet job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJobSnapshot
    metadata:
      name: snapshot-transaction
    spec:
      jetJobResourceName: transaction-v1
      cancelJob: true
    EOF
  2. Check the status of the exported JetJobSnapshot.

    kubectl get jetjobsnapshot snapshot-transaction
    NAME                   STATE      CREATIONTIME
    snapshot-transaction   Exported   2023-08-09T13:07:51Z
    By configuring the 'spec.cancelJob' field to 'true', the Jet job named 'transaction-v1' is canceled after taking the snapshot. This setting is particularly useful before submitting a new version of an existing pipeline.
  3. The Jet job should not be in the Running state anymore. Use the following command to verify this.

    kubectl get jetjob transaction-v1
    NAME             STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v1   ExecutionFailed   741632319877545985   2023-08-09T12:22:04Z   2023-08-09T13:07:51Z
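
The 'spec.cancelJob' field used above decides whether the job is canceled once the snapshot is taken. As a minimal sketch, the manifest can be generated by a small shell function so that the snapshot name, job name, and cancel flag become parameters. The function name `jetjob_snapshot_manifest` and the snapshot name in the example are hypothetical. The sketch also assumes that setting 'cancelJob' to 'false' leaves the job running after the export; this tutorial only demonstrates 'true'.

```shell
# Print a JetJobSnapshot manifest for the given snapshot and Jet job.
# Usage: jetjob_snapshot_manifest <snapshot-name> <jetjob-name> [true|false]
# The third argument sets spec.cancelJob and defaults to false here.
jetjob_snapshot_manifest() {
  cat <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJobSnapshot
metadata:
  name: $1
spec:
  jetJobResourceName: $2
  cancelJob: ${3:-false}
EOF
}

# Example: export a snapshot without canceling the job (assumed behavior).
# jetjob_snapshot_manifest snapshot-transaction-live transaction-v1 false | kubectl apply -f -
```

Printing the manifest instead of applying it directly makes it easy to inspect the YAML before piping it to `kubectl apply -f -`.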

Submit a job from a snapshot

When creating a new version of a data pipeline, it’s essential to initialize the new pipeline from the latest state of the old one. Without this initialization, the new pipeline would start with no state and lack information about its predecessor’s state. This could result in data loss or duplicate processing, which is not desirable, particularly for critical pipelines.

  1. Continuing with the example, we will now create a new version of the previous Jet job. For simplicity, the new version is similar to the old one: it takes entries from the 'transaction' map and logs them.

    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v2
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV2
      initialSnapshotResourceName: snapshot-transaction
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
  2. Unlike the previous Jet job definition, this one sets the 'initialSnapshotResourceName' field to refer to the snapshot exported in the preceding section. Submit the new Jet job.

    kubectl apply -f - <<EOF
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: transaction-v2
    spec:
      hazelcastResourceName: hazelcast
      state: Running
      jarName: jet-pipelines-1.0-SNAPSHOT.jar
      mainClass: org.examples.jet.snapshot.JobV2
      initialSnapshotResourceName: snapshot-transaction
      bucketConfig:
        bucketURI: '<BUCKET-URI>'
        secretName: '<SECRET-NAME>'
    EOF

    This Jet job, named 'transaction-v2', resumes processing entries from the state exported in the snapshot, so entries already handled before the snapshot are not processed again.

  3. Check that the status of the new Jet job is Running.

    kubectl get jetjob transaction-v2
    NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    transaction-v2   Running   741650518446702593   2023-08-09T13:34:22Z

    When you review the logs after submitting the pipeline, you will only see entries that were added to the map after the snapshot was exported. The new version of the pipeline, 'transaction-v2', does not reprocess transactions that were already processed by the first version, 'transaction-v1'.

    kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v2/Log
    {"time":"2023-08-09T12:45:11,364", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#1] [Job V2] transaction:'transaction-4' payload:'{\"description\": \"Movie Tickets\", \"amount\": 25.00, \"transactionDate\": \"2023-08-12T18:45:00Z\"}' "}
    {"time":"2023-08-09T12:47:53,791", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#0] [Job V2] transaction:'transaction-5' payload:'{\"description\": \"Gasoline Refill\", \"amount\": 50.30, \"transactionDate\": \"2023-08-13T09:00:00Z\"}' "}

    If the new version of the Jet job is not initialized from the exported snapshot 'snapshot-transaction', it will begin data processing from the start of the map. This is undesirable because some transaction entries have already been processed by the previous version of the pipeline.

    If you want to make your snapshots persistent through outages or restarts, create a Hazelcast cluster with persistence enabled.
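
To confirm that 'transaction-v2' did not reprocess an entry, the log check above can be narrowed to one job and one transaction key. This is a minimal sketch based on the log format shown in this tutorial; the function name `job_log_lines` is hypothetical. It adds `--tail=-1` because `kubectl logs` returns only the last 10 lines per pod when a label selector is used, which can hide older entries.

```shell
# Print the log lines that a given Jet job wrote for a given transaction key.
# Usage: job_log_lines <jetjob-name> <transaction-key>
# Exits non-zero when the job logged nothing for that key.
job_log_lines() {
  # --tail=-1 requests the full log; with a label selector kubectl
  # otherwise shows only the last 10 lines of each pod.
  kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast --tail=-1 \
    | grep -F "[$1/Log" \
    | grep -F "transaction:'$2'"
}

# Example: 'transaction-1' was processed by v1, so v2 should log nothing for it.
# job_log_lines transaction-v2 transaction-1 || echo "not reprocessed by transaction-v2"
```

The two fixed-string filters match the job prefix (for example '[transaction-v2/Log') and the quoted key, so a key such as 'transaction-1' does not accidentally match 'transaction-10'.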

Clean up

To clean up all the resources you created during this tutorial, including the custom resources and the secret, run the following commands:

kubectl delete $(kubectl get hazelcast -o name)
kubectl delete secret hazelcast-license-key

Deleting the Hazelcast CR also deletes the Jet jobs and Jet job snapshots linked to it.

Summary

In this tutorial, we saved the state of a running data pipeline as a snapshot and initialized a new version of the pipeline from that snapshot, managing the whole process using Operator.