Learn how to save the state of your data pipelines and initiate a job from a snapshot using Operator.
Overview
This tutorial guides you through the process of saving the state of your data pipelines and creating new data pipelines initialized from the saved state. In this tutorial, you’ll:
- Deploy a Hazelcast cluster with the Jet engine configured.
- Initiate a pipeline as a Jet job.
- Export a snapshot from the Jet job.
- Start a new Jet job initialized from the snapshot.
The tutorial should take approximately 10 minutes to complete.
Prerequisites
Before you begin, make sure that you have:
- A running Kubernetes cluster.
- The Kubernetes command-line tool, kubectl.
- The Maven command-line tool, mvn.
- A deployed Hazelcast Platform Operator.
- The Hazelcast Code Samples repository (https://github.com/hazelcast/hazelcast-code-samples) cloned to your local machine.
- Blob storage and access credentials on one of the following cloud providers: AWS, GCP, or Azure.
If you’re unsure how to submit Jet jobs using Operator, follow the Jet tutorial first. It also covers deploying a custom Jet pipeline JAR to cloud storage and managing Jet job resources using Operator.
Start the Hazelcast cluster
- Create a secret with your Hazelcast Enterprise license. The Hazelcast resource definition references this secret later.

```shell
kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
```
- Create a Hazelcast cluster with the Jet engine configured:

```shell
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  clusterSize: 3
  licenseKeySecretName: hazelcast-license-key
  jet:
    enabled: true
    resourceUploadEnabled: true
EOF
```

If you specify the Hazelcast version explicitly, use Hazelcast Enterprise 5.3 or later. Submitting Jet jobs using Operator requires it.
- Verify that the cluster is up and running:

```shell
kubectl get hazelcast hazelcast
```

```
NAME        STATUS    MEMBERS
hazelcast   Running   3/3
```
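If you automate these steps, for example in a CI job, you may want to wait until the cluster reports all its members before continuing. The following Python sketch checks output in the format of the `kubectl get hazelcast` command above. It assumes the column layout used in this tutorial (NAME, STATUS, MEMBERS). `cluster_ready` is a hypothetical helper and is not part of Operator or kubectl.

```python
def cluster_ready(get_output: str, name: str = "hazelcast") -> bool:
    """Return True if the named Hazelcast resource is Running with all members up.

    Assumes the column layout shown in this tutorial: NAME STATUS MEMBERS.
    """
    rows = [line.split() for line in get_output.strip().splitlines()]
    if not rows:
        return False
    header, records = rows[0], rows[1:]
    for row in records:
        record = dict(zip(header, row))
        if record.get("NAME") != name:
            continue
        # MEMBERS is reported as "<ready>/<desired>", for example "3/3".
        ready, sep, desired = record.get("MEMBERS", "").partition("/")
        return record.get("STATUS") == "Running" and sep == "/" and ready == desired
    return False


sample = """NAME        STATUS    MEMBERS
hazelcast   Running   3/3"""
print(cluster_ready(sample))  # True
```

In a script, you could call `cluster_ready` in a polling loop. Pass it the `stdout` of `subprocess.run(["kubectl", "get", "hazelcast", "hazelcast"], capture_output=True, text=True)` and stop once it returns `True`.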
Run the data pipeline
In this tutorial, we walk through a practical example built around a 'transaction' map that stores transaction data. The Jet job retrieves transactions from the map and processes them. First, we submit a Jet job that processes transactions from the 'transaction' map. To keep the example simple, the pipeline only logs the transactions. We then save the state of the Jet job using Operator.
Before submitting the Jet job, you need to create the map from which the Jet job fetches records. To use a map as a data source for a Jet job, you must enable the event journal on the map.
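A streaming Jet job that uses a map as its source does not scan the map itself. Instead, it reads the map's event journal, which is a bounded, ordered history of changes to the map. Once the configured capacity is reached (9000 in the Map resource below), the oldest events are overwritten.

The following Python sketch is a deliberately simplified model of that behavior, using a single buffer with sequence numbers. `EventJournal` and its methods are hypothetical names chosen for illustration and are not Hazelcast's API. The real journal is partitioned across the cluster and is read through Jet's map journal source.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class JournalEvent:
    sequence: int  # position in the journal, assigned in write order
    key: str
    value: str


class EventJournal:
    """Hypothetical single-partition model of a bounded map event journal."""

    def __init__(self, capacity: int):
        self._events = deque(maxlen=capacity)  # oldest events drop off first when full
        self._next_sequence = 0

    def record_put(self, key: str, value: str) -> None:
        """Append an event for a map put, assigning the next sequence number."""
        self._events.append(JournalEvent(self._next_sequence, key, value))
        self._next_sequence += 1

    def head_sequence(self) -> int:
        """Oldest sequence still retained, or the next sequence if the journal is empty."""
        return self._events[0].sequence if self._events else self._next_sequence

    def read_from(self, sequence: int) -> list[JournalEvent]:
        """Return the retained events at or after `sequence`."""
        return [event for event in self._events if event.sequence >= sequence]


journal = EventJournal(capacity=3)
for i in range(1, 5):
    journal.record_put(f"transaction-{i}", "{}")
print(journal.head_sequence())  # 1: the event for transaction-1 was overwritten
print([e.key for e in journal.read_from(0)])  # ['transaction-2', 'transaction-3', 'transaction-4']
```

In this model, a reader that falls further behind than the capacity cannot see the overwritten events. Choosing the capacity is therefore a trade-off between memory use and how far a job may lag behind.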
- Create the map that serves as the source for the Jet job:

```shell
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Map
metadata:
  name: transaction
spec:
  hazelcastResourceName: hazelcast
  eventJournal:
    capacity: 9000
EOF
```
- Next, define the Jet job that processes transactions from the map. This pipeline retrieves transaction entries from the map and logs them. The pipeline code must be packaged as a JAR and uploaded to your cloud storage bucket. If you are unfamiliar with submitting jobs with Operator, refer to the Jet tutorial.

```yaml
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v1
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV1
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
```
- Submit the Jet job:

```shell
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v1
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV1
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
EOF
```
- Check the status of the Jet job you submitted:

```shell
kubectl get jetjob transaction-v1
```

```
NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v1   Running   741632319877545985   2023-08-09T12:22:04Z
```

As new entries are added to the 'transaction' map, the Jet job retrieves and processes them automatically. To see the processed transactions, check the logs. In the example below, the job has processed three transactions, with the keys 'transaction-1', 'transaction-2', and 'transaction-3'.

```shell
kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v1/Log
```

```
{"time":"2023-08-09T12:24:59,753", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{\"description\": \"Online Purchase\", \"amount\": 75.99, \"transactionDate\": \"2023-08-09T15:30:00Z\"}' "}
{"time":"2023-08-09T12:33:32,784", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{\"description\": \"Grocery Shopping\", \"amount\": 42.75, \"transactionDate\": \"2023-08-10T10:15:00Z\"}' "}
{"time":"2023-08-09T12:33:44,997", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#0] [Job V1] transaction:'transaction-3' payload:'{\"description\": \"Restaurant Dinner\", \"amount\": 120.50, \"transactionDate\": \"2023-08-11T20:00:00Z\"}' "}
```
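When the logs grow long, it can help to extract just the processed entries. The following Python sketch parses log lines in the JSON layout shown above and pulls the key and payload out of the `msg` field. The regular expression assumes the `transaction:'<key>' payload:'<json>'` message format that this sample pipeline writes. If your pipeline logs a different format, you would need to adjust it. `processed_entries` is a hypothetical helper.

```python
import json
import re

# Matches the message the sample pipeline logs for each entry, for example:
#   transaction:'transaction-1' payload:'{"description": "Online Purchase", ...}'
ENTRY = re.compile(r"transaction:'(?P<key>[^']*)' payload:'(?P<payload>.*)'")


def processed_entries(log_lines):
    """Yield (key, payload_text) for each transaction entry found in the logs.

    Lines that are not JSON objects, or whose "msg" field does not match
    the pattern above, are skipped.
    """
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(record, dict):
            continue
        match = ENTRY.search(str(record.get("msg", "")))
        if match:
            yield match.group("key"), match.group("payload")
```

For example, you could pipe the output of the `kubectl logs` command above into a small script that iterates over `processed_entries(sys.stdin)`. The payload text is itself JSON, so `json.loads(payload)["amount"]` returns the transaction amount.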
Save the state of the data pipeline
In data pipelines, saving and reusing the state of a computation is vital for accurate and reliable data processing. Jet’s snapshot feature lets you save and restore this state. A snapshot captures the state of a running Jet job at a specific point in time, giving you a reliable record of ongoing computations and processed data.
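To make the idea concrete, the following Python sketch models a job whose state has two parts:

- its position in the source (the next journal entry to read)
- its computation state (here, the keys processed so far)

Exporting a snapshot copies that state at one moment. A new job initialized from the copy continues where the old one stopped. `ToyStreamingJob` and `JobState` are hypothetical classes for illustration only. Real Jet snapshots are taken, stored, and restored by the cluster.

```python
import copy
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobState:
    next_sequence: int = 0  # source position: index of the next journal entry to read
    processed: list[str] = field(default_factory=list)  # computation state


class ToyStreamingJob:
    """Hypothetical stand-in for a streaming job, used only to illustrate snapshots."""

    def __init__(self, initial_snapshot: Optional[JobState] = None):
        # Start from a copy of the snapshot if one is given, otherwise from empty state.
        if initial_snapshot is not None:
            self.state = copy.deepcopy(initial_snapshot)
        else:
            self.state = JobState()

    def poll(self, journal: list[str]) -> list[str]:
        """Process entries not yet consumed and return the keys handled in this call."""
        new_keys = journal[self.state.next_sequence:]
        self.state.processed.extend(new_keys)
        self.state.next_sequence += len(new_keys)
        return new_keys

    def export_snapshot(self) -> JobState:
        """Return an independent copy of the current state."""
        return copy.deepcopy(self.state)


journal = ["transaction-1", "transaction-2", "transaction-3"]
v1 = ToyStreamingJob()
v1.poll(journal)
snapshot = v1.export_snapshot()  # state after three entries

journal.append("transaction-4")
v2 = ToyStreamingJob(initial_snapshot=snapshot)
print(v2.poll(journal))  # ['transaction-4']
```

Because the snapshot is a copy, later processing by the old job does not change it. This is what allows a different job to be started from it.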
- Export a snapshot from the Jet job:

```shell
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJobSnapshot
metadata:
  name: snapshot-transaction
spec:
  jetJobResourceName: transaction-v1
  cancelJob: true
EOF
```
- Check the status of the exported JetJobSnapshot:

```shell
kubectl get jetjobsnapshot snapshot-transaction
```

```
NAME                   STATE      CREATIONTIME
snapshot-transaction   Exported   2023-08-09T13:07:51Z
```

Because 'spec.cancelJob' is set to 'true', the Jet job named 'transaction-v1' is canceled after the snapshot is taken. This setting is particularly useful before you submit a new version of an existing pipeline.
- The Jet job should no longer be in the Running state. Verify this with the following command:

```shell
kubectl get jetjob transaction-v1
```

```
NAME             STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v1   ExecutionFailed   741632319877545985   2023-08-09T12:22:04Z   2023-08-09T13:07:51Z
```

The status is ExecutionFailed because Jet reports a canceled job with the FAILED status.
Submit a job from a snapshot
When you create a new version of a data pipeline, it’s essential to initialize the new pipeline from the latest state of the old one. Without this initialization, the new pipeline starts with empty state and no knowledge of what its predecessor has already processed. This can result in data loss or duplicate processing, which is undesirable, particularly for critical pipelines.
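The following Python sketch compares three ways a new pipeline version could choose its starting position in the journal:

- from the oldest entry
- from the current end
- from the snapshot

The first two loosely mirror Jet's `START_FROM_OLDEST` and `START_FROM_CURRENT` journal positions for a job started without a snapshot. The journal contents and the `compare_start_positions` helper are hypothetical. The sketch only reports which entries would be processed twice or never.

```python
def compare_start_positions(journal, snapshot_sequence, restart_sequence):
    """Report duplicates and missed entries for each possible start position.

    journal: keys in the order they were written to the journal
    snapshot_sequence: next unread position recorded in the old job's snapshot
    restart_sequence: journal length at the moment the new version starts
    """
    already_processed = set(journal[:snapshot_sequence])
    start_positions = {
        "from oldest": 0,
        "from current": restart_sequence,
        "from snapshot": snapshot_sequence,
    }
    report = {}
    for name, start in start_positions.items():
        report[name] = {
            # Entries the old version handled that the new version reads again.
            "duplicates": [k for k in journal[start:] if k in already_processed],
            # Entries written after the snapshot that the new version never reads.
            "missed": journal[snapshot_sequence:start],
        }
    return report


journal = ["transaction-1", "transaction-2", "transaction-3", "transaction-4"]
# transaction-4 was written after the snapshot but before the new version started.
for name, result in compare_start_positions(journal, 3, 4).items():
    print(name, result)
```

With this data, the three strategies behave differently:

- Starting from the oldest entry reprocesses transaction-1 through transaction-3.
- Starting from the current end skips transaction-4.
- Starting from the snapshot does neither.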
- Continuing with the example, we now create a new version of the previous Jet job. For simplicity, the new version is similar to the old one: it takes entries from the 'transaction' map and logs them.

```yaml
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v2
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV2
  initialSnapshotResourceName: snapshot-transaction
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
```
- Unlike the previous Jet job definition, this one sets the 'initialSnapshotResourceName' field to the snapshot exported in the preceding step. Submit the new Jet job:

```shell
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v2
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV2
  initialSnapshotResourceName: snapshot-transaction
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
EOF
```

The Jet job named 'transaction-v2' resumes processing from the state captured in the snapshot. As a result, each transaction entry is processed only once across both versions of the pipeline.
- Check that the new Jet job's status is Running:

```shell
kubectl get jetjob transaction-v2
```

```
NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v2   Running   741650518446702593   2023-08-09T13:34:22Z
```

When you review the logs after submitting the pipeline, you see only the transaction entries added after the snapshot was exported. The new version of the pipeline, 'transaction-v2', does not process transactions that the first version, 'transaction-v1', already handled.

```shell
kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v2/Log
```

```
{"time":"2023-08-09T12:45:11,364", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#1] [Job V2] transaction:'transaction-4' payload:'{\"description\": \"Movie Tickets\", \"amount\": 25.00, \"transactionDate\": \"2023-08-12T18:45:00Z\"}' "}
{"time":"2023-08-09T12:47:53,791", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#0] [Job V2] transaction:'transaction-5' payload:'{\"description\": \"Gasoline Refill\", \"amount\": 50.30, \"transactionDate\": \"2023-08-13T09:00:00Z\"}' "}
```

If the new version of the Jet job were not initialized from the exported snapshot 'snapshot-transaction', it would begin processing from the start of the map. This is undesirable because the previous version of the pipeline has already processed some of these transaction entries.
If you want your snapshots to persist through outages or restarts, create a Hazelcast cluster with persistence enabled.
Clean up
To clean up all the resources you created during this tutorial, including the custom resources and the license secret, run the following commands:

```shell
kubectl delete $(kubectl get hazelcast -o name)
kubectl delete secret hazelcast-license-key
```
Note: Deleting the Hazelcast CR also deletes the Jet jobs and Jet job snapshots linked to it.
Summary
In this tutorial, you saw why it is essential to save the current state of a data pipeline and to initialize new pipelines from that snapshot. You also saw how to manage the state of your data pipelines using Operator.