Saving the State of a Jet Job with Hazelcast Platform Operator
Learn how to save the state of your data pipelines and initiate a data pipeline from an existing state using the Hazelcast Platform Operator.
Context
This tutorial guides you through saving the state of your data pipelines and creating new data pipelines initialized from the saved state. We will follow these steps:
- Deploy a Hazelcast cluster with the Jet engine configured.
- Initiate the Jet pipeline.
- Export a Snapshot from the Jet pipeline.
- Start a new Jet pipeline initialized from the Snapshot.
Before you Begin
Make sure you have the following:
If you’re unsure about how to submit JetJobs using the Hazelcast Platform Operator, it’s recommended to start by following the Jet tutorial first. That tutorial also covers the steps for deploying a custom Jet pipeline JAR to Cloud storage and provides guidance on managing JetJob resources using the Hazelcast Platform Operator.
Step 1. Start the Hazelcast Cluster
Since Snapshot is an Enterprise-only feature, you need to set up a Hazelcast Enterprise cluster. This is why it is necessary to create a License Key secret first.
- Run the following command to create a secret with your Hazelcast Enterprise License, which will be referenced in the Hazelcast resource definition later:
kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
- Run the following command to create a Hazelcast cluster with the Jet engine configured.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  clusterSize: 3
  licenseKeySecretName: hazelcast-license-key
  jet:
    enabled: true
    resourceUploadEnabled: true
EOF
If you want to explicitly specify the Hazelcast version, note that you should use Hazelcast Enterprise version 5.3 or above for submitting JetJobs using the Hazelcast Platform Operator.
- Now, verify the cluster's status to ensure that it is up and running:
$ kubectl get hazelcast hazelcast
NAME        STATUS    MEMBERS
hazelcast   Running   3/3
Step 2. Run the Data Pipeline
In this tutorial, we will walk through a practical example involving a 'transaction' map designed to store crucial transaction data. Our Hazelcast Jet setup will retrieve these transactions from the map and process them. First, we will submit a Jet Job that processes transactions from the 'transaction' map; to keep the example simple, the pipeline will only log the transactions. Then we will explore how to save the state of the Jet Job using the Hazelcast Platform Operator.
Before submitting the Jet Job, you need to create a map from which the Jet Job will fetch the records. To utilize the map as a data source for a Jet Job, it’s necessary to enable the Event Journal in the map.
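Conceptually, the Event Journal is a bounded, ring-buffer-like structure: its 'capacity' limits how many events are retained, and once the journal is full, the oldest events are overwritten. A minimal plain-Java sketch of that behavior (purely illustrative, no Hazelcast dependency; the class and method names are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class JournalSketch {

    // A bounded journal: once capacity is reached, appending a new event
    // drops the oldest one, just like a full event journal overwrites old events.
    static final class BoundedJournal {
        private final int capacity;
        private final Deque<String> events = new ArrayDeque<>();

        BoundedJournal(int capacity) {
            this.capacity = capacity;
        }

        void append(String event) {
            if (events.size() == capacity) {
                events.removeFirst(); // oldest event is overwritten
            }
            events.addLast(event);
        }

        Deque<String> events() {
            return events;
        }
    }

    public static void main(String[] args) {
        BoundedJournal journal = new BoundedJournal(3);
        for (int i = 1; i <= 5; i++) {
            journal.append("transaction-" + i);
        }
        // Only the 3 most recent events survive; a consumer that falls too far
        // behind would miss transaction-1 and transaction-2 entirely.
        System.out.println(journal.events());
    }
}
```

This is why the map's 'eventJournal.capacity' (9000 in the resource below) should be sized so the journal is not exhausted before the pipeline catches up.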
- Execute the following command to create the Map, which will serve as the source for the Jet Job.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Map
metadata:
  name: transaction
spec:
  hazelcastResourceName: hazelcast
  eventJournal:
    capacity: 9000
EOF
It is important to configure the event journal in the map if it is being used as a data source for a data pipeline. You are now ready to submit the Jet Job that processes transactions from the map. Below is the Java source code for the pipeline; for the sake of simplicity, it just retrieves transaction entries from the map and logs them. The code should be packaged and deployed to cloud storage. If you are unfamiliar with submitting JetJobs with the Hazelcast Platform Operator, you can also refer to the Jet Tutorial.
package org.examples.jet.snapshot;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class JobV1 {

    public static void main(String[] args) {
        var p = Pipeline.create();
        var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST);
        var loggerSink = Sinks.logger();

        p.readFrom(transactionSource)
                .withIngestionTimestamps()
                .setName("Emit Transactions")
                .map(e -> {
                    System.out.printf("The transaction '%s' is being executed in 'job-v1'\n", e.getKey());
                    // execute the transaction
                    return String.format("[Job V1] transaction:'%s' payload:'%s'", e.getKey(), e.getValue());
                })
                .setName("Apply Transactions")
                .writeTo(loggerSink)
                .setName("Log Transactions");

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(p);
    }
}
- Run the following command to submit the Jet Job.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v1
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV1
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
EOF
- Run the following command to check the status of the JetJob you have submitted.
$ kubectl get jetjob transaction-v1
NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v1   Running   741632319877545985   2023-08-09T12:22:04Z
As new entries are added to the 'transaction' map, the Jet Job will automatically retrieve and process them. To observe the executed transactions, examine the logs. In the provided log example below, three transactions are processed with keys 'transaction-1', 'transaction-2', and 'transaction-3'. The entry values are not a concern in this context.
$ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v1/Log
{"time":"2023-08-09T12:24:59,753", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#1] [Job V1] transaction:'transaction-1' payload:'{\"description\": \"Online Purchase\", \"amount\": 75.99, \"transactionDate\": \"2023-08-09T15:30:00Z\"}' "}
{"time":"2023-08-09T12:33:32,784", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#2] [Job V1] transaction:'transaction-2' payload:'{\"description\": \"Grocery Shopping\", \"amount\": 42.75, \"transactionDate\": \"2023-08-10T10:15:00Z\"}' "}
{"time":"2023-08-09T12:33:44,997", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v1/Log Transactions#0] [Job V1] transaction:'transaction-3' payload:'{\"description\": \"Restaurant Dinner\", \"amount\": 120.50, \"transactionDate\": \"2023-08-11T20:00:00Z\"}' "}
Step 3. Save the State of the Data Pipeline
In data pipelines, saving and using computation process states is vital for accurate and reliable data processing. Jet’s Snapshot feature lets you save and restore these processing states. A snapshot captures the state of a running Jet job at a specific time, giving you a reliable record of ongoing computations and processed data.
- Run the following command to export a Snapshot from the Jet Job.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJobSnapshot
metadata:
  name: snapshot-transaction
spec:
  jetJobResourceName: transaction-v1
  cancelJob: true
EOF
- Run the following command to check the status of the exported JetJobSnapshot.
$ kubectl get jetjobsnapshot snapshot-transaction
NAME                   STATE      CREATIONTIME
snapshot-transaction   Exported   2023-08-09T13:07:51Z
By configuring the 'spec.cancelJob' field to 'true', the Jet Job named 'transaction-v1' will be canceled after the JetJobSnapshot is exported. This setting is particularly useful before submitting a new version of an existing data pipeline: the snapshot preserves the job's current state and then halts the ongoing job.
- The Jet Job should no longer be in the Running state. You can verify this by using the following command.
$ kubectl get jetjob transaction-v1
NAME             STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v1   ExecutionFailed   741632319877545985   2023-08-09T12:22:04Z   2023-08-09T13:07:51Z
Step 4. Submit a Job Initialized from the Snapshot
When creating a new version of a data pipeline, it’s essential to initialize the new pipeline from the current state of the old one. Without this initialization, the new pipeline would start with an empty state and lack information about its predecessor’s state. This situation could result in data loss or duplicate processing, which is not desirable, particularly for critical pipelines. To ensure proper initialization and prevent these issues, we can rely on the Snapshot.
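The exactly-once handover between pipeline versions can be illustrated with a tiny plain-Java sketch (illustrative only, no Hazelcast dependency; all names here are hypothetical). The 'snapshot' is reduced to just a journal offset, whereas a real Jet snapshot also captures operator state such as partial aggregations:

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotSketch {

    // The journal of transaction keys, in arrival order.
    static final List<String> JOURNAL = new ArrayList<>();

    // Process entries starting at 'offset'; the returned offset plays the
    // role of the exported snapshot's state.
    static int process(String jobName, int offset, List<String> processed) {
        for (int i = offset; i < JOURNAL.size(); i++) {
            processed.add("[" + jobName + "] " + JOURNAL.get(i));
        }
        return JOURNAL.size();
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        JOURNAL.add("transaction-1");
        JOURNAL.add("transaction-2");
        JOURNAL.add("transaction-3");

        // v1 runs, then is cancelled; its "snapshot" is the offset it reached.
        int snapshot = process("Job V1", 0, processed);

        // New entries arrive after v1 has stopped.
        JOURNAL.add("transaction-4");
        JOURNAL.add("transaction-5");

        // v2 initialized from the snapshot: no entry is processed twice.
        // Initializing v2 with offset 0 instead would re-process 1 through 3.
        process("Job V2", snapshot, processed);
        System.out.println(processed);
    }
}
```

Each of the five transactions appears in 'processed' exactly once, three tagged by v1 and two by v2, which is the behavior the 'initialSnapshotResourceName' field gives the real pipeline below.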
- Continuing with the example, we will now create a new version of the previous Jet Job. To keep the example simple, the new version is very similar to the old one: it takes entries from the 'transaction' map and logs them.
package org.examples.jet.snapshot;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class JobV2 {

    public static void main(String[] args) {
        var p = Pipeline.create();
        var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST);
        var loggerSink = Sinks.logger();

        p.readFrom(transactionSource)
                .withIngestionTimestamps()
                .setName("Emit Transactions")
                .map(e -> {
                    System.out.printf("The transaction '%s' is being executed in 'job-v2'\n", e.getKey());
                    // execute the transaction
                    return String.format("[Job V2] transaction:'%s' payload:'%s'", e.getKey(), e.getValue());
                })
                .setName("Apply Transactions")
                .writeTo(loggerSink)
                .setName("Log Transactions");

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(p);
    }
}
- Differing from the previous JetJob definition, we will set the 'initialSnapshotResourceName' field to refer to the Snapshot exported in the preceding step. Execute the following command to submit the new Jet Job.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: transaction-v2
spec:
  hazelcastResourceName: hazelcast
  state: Running
  jarName: jet-pipelines-1.0-SNAPSHOT.jar
  mainClass: org.examples.jet.snapshot.JobV2
  initialSnapshotResourceName: snapshot-transaction
  bucketConfig:
    bucketURI: '<BUCKET-URI>'
    secretName: '<SECRET-NAME>'
EOF
This Jet Job, named 'transaction-v2', seamlessly resumes processing entries from the state at which we exported the snapshot. In this way, each transaction entry is processed only once within the pipeline.
- Now check the status of the new Jet Job.
$ kubectl get jetjob transaction-v2
NAME             STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
transaction-v2   Running   741650518446702593   2023-08-09T13:34:22Z
When you review the logs after submitting the pipeline, you will see only the transaction entries that were put into the map after the Snapshot was exported. This means the new version of the transaction pipeline, named 'transaction-v2', does not execute the transactions already executed by the first version of the pipeline, 'transaction-v1'.
$ kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep transaction-v2/Log
{"time":"2023-08-09T12:45:11,364", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#1] [Job V2] transaction:'transaction-4' payload:'{\"description\": \"Movie Tickets\", \"amount\": 25.00, \"transactionDate\": \"2023-08-12T18:45:00Z\"}' "}
{"time":"2023-08-09T12:47:53,791", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[10.36.0.10]:5702 [dev] [5.4.0] [transaction-v2/Log Transactions#0] [Job V2] transaction:'transaction-5' payload:'{\"description\": \"Gasoline Refill\", \"amount\": 50.30, \"transactionDate\": \"2023-08-13T09:00:00Z\"}' "}
If the new version of the Jet Job, 'transaction-v2', were not initialized from the exported Snapshot 'snapshot-transaction', it would begin data processing from the start of the map. This is undesirable, as the previous version of the pipeline has already processed some of the transaction entries.
If you want your Snapshots to persist across outages or restarts, it is enough to create a Hazelcast cluster with persistence enabled.
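As a rough sketch only, enabling persistence means adding a 'persistence' section to the Hazelcast resource. The field names below are an assumption based on the operator's persistence configuration and may differ between operator versions, so check the Persistence section of the operator documentation for your version before using them:

```yaml
apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  clusterSize: 3
  licenseKeySecretName: hazelcast-license-key
  jet:
    enabled: true
    resourceUploadEnabled: true
  # Hypothetical persistence sketch; verify field names against your operator version.
  persistence:
    pvc:
      accessModes: ["ReadWriteOnce"]
      requestStorage: 8Gi
```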
Step 5. Cleaning Up
To clean up all the resources you created during the tutorial, and to remove the custom resources and secrets, run the following command:
kubectl delete $(kubectl get hazelcast -o name)
kubectl delete secret hazelcast-license-key
Deleting the Hazelcast CR will also delete the Jet Jobs and Jet Job Snapshots linked to it.