Run a data pipeline using Jet with Platform Operator
Learn how to run a data pipeline in Hazelcast using Jet.
Overview
In this tutorial, you’ll do the following:
-
Build and deploy a custom Jet pipeline implementation to cloud storage
-
Deploy Hazelcast with the Jet engine configured
-
Start the Jet pipeline
-
Check the Jet pipeline
-
Cancel the Jet pipeline
The tutorial should take approximately 20 minutes to complete.
Prerequisites
Before you begin, make sure that you have:
-
A running Kubernetes cluster
-
Kubernetes command-line tool, kubectl
-
Maven command-line tool, mvn
-
The Hazelcast Code Samples repository https://github.com/hazelcast/hazelcast-code-samples cloned to your local machine
-
Blob storage and access credentials on one of the following cloud providers: AWS, GCP, Azure
Deploy the Jet pipeline JAR to cloud storage
In this step, you’ll build the Jet pipeline JAR from the tutorial’s GitHub repository, and upload it to your cloud provider.
-
Clone the sample project.
git clone https://github.com/hazelcast-guides/hazelcast-platform-operator-jet.git cd hazelcast-platform-operator-jet
git clone [email protected]:hazelcast-guides/hazelcast-platform-operator-jet.git cd hazelcast-platform-operator-jet
The sample code for this tutorial is in the
jet-pipeline/src/main/java/org/example/run/
directory: -
Build the Jet pipeline JAR.
mvn package -f ./jet-pipeline
-
Upload the Jet pipeline JAR to the storage blob/bucket of your cloud provider, replacing the placeholder values.
aws s3 cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar s3://<BUCKET_NAME>
gsutil cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar gs://<BUCKET_NAME>
az storage blob upload --account-name <ACCOUNT_NAME> --container-name <CONTAINER_NAME> --file jet-pipeline/target/jet-pipeline-run-1.0.0.jar
-
Create a secret for your cloud storage by running one of the following commands. Remember to replace the placeholder values.
kubectl create secret generic <SECRET-NAME> --from-literal=region=<region> \ --from-literal=access-key-id=<access-key-id> \ --from-literal=secret-access-key=<secret-access-key>
kubectl create secret generic <SECRET-NAME> --from-file=google-credentials-path=<service_account_json_file>
kubectl create secret generic <SECRET-NAME> \ --from-literal=storage-account=<storage-account> \ --from-literal=storage-key=<storage-key>
If you want to learn more about other authentication methods, check the Authorization Methods to Access Cloud Storage documentation. |
Start the Hazelcast cluster
-
Create a secret with your Hazelcast Enterprise License.
kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
Starting with version 5.13, Hazelcast Platform Operator only supports Hazelcast Enterprise cluster creation. Even if the feature is used in Hazelcast Open Source clusters, Hazelcast Platform Operator requires a license key to run a cluster. -
Create a Hazelcast cluster with the Jet engine configured.
kubectl apply -f - <<EOF apiVersion: hazelcast.com/v1alpha1 kind: Hazelcast metadata: name: my-hazelcast spec: licenseKeySecretName: hazelcast-license-key clusterSize: 3 jet: resourceUploadEnabled: true EOF
If you want to specify the Hazelcast version explicitly, you need to use Hazelcast 5.3 or above to submit Jet jobs with the Hazelcast Platform Operator. -
Now check the cluster status to make sure that it’s up and running.
$ kubectl get hazelcast my-hazelcast NAME STATUS MEMBERS my-hazelcast Running 3/3
Run the data pipeline
In this step, you’ll start, check and cancel the Jet job that runs the data pipeline.
-
Start the Jet job you have already deployed:
Replace the <BUCKET-URI> with the appropriate sample bucket URI for your cloud provider:
-
s3://hazelcast-jet-pipeline
-
gs://hazelcast-jet-pipeline
-
azblob://hazelcast-jet-pipeline
kubectl apply -f - <<EOF apiVersion: hazelcast.com/v1alpha1 kind: JetJob metadata: name: my-jet-job spec: hazelcastResourceName: my-hazelcast state: Running jarName: jet-pipeline-run-1.0.0.jar bucketConfig: bucketURI: "<BUCKET-URI>" secretName: <SECRET-NAME> EOF
-
-
Check status of the Jet job.
kubectl get jetjob my-jet-job -w
The output will look something like this. Wait until the
Running
status appears.NAME STATUS ID SUBMISSIONTIME COMPLETIONTIME my-jet-job Starting 0 my-jet-job Running 732187341048774657 2023-07-14T10:51:06Z
-
To see the output of the Jet pipeline in the logs, run the following command:
kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep sequence
The logs of the Jet pipeline are displayed in the output.
{"time":"2023-07-14T10:51:06,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:06.000, sequence=0) "} {"time":"2023-07-14T10:51:07,675", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:07.000, sequence=1) "} {"time":"2023-07-14T10:51:08,681", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:08.000, sequence=2) "} {"time":"2023-07-14T10:51:09,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:09.000, sequence=3) "} {"time":"2023-07-14T10:51:10,682", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:10.000, sequence=4) "} {"time":"2023-07-14T10:51:11,678", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:11.000, sequence=5) "} {"time":"2023-07-14T10:51:12,677", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:12.000, sequence=6) "} .... ....
-
Now, cancel the Jet job by setting the
spec.state
toCanceled
. Use one of the following options.a) Cancel the Jet job, using
kubectl apply
:kubectl apply -f - <<EOF apiVersion: hazelcast.com/v1alpha1 kind: JetJob metadata: name: my-jet-job spec: hazelcastResourceName: my-hazelcast state: Canceled jarName: jet-pipeline-run-1.0.0.jar bucketConfig: bucketURI: "<BUCKET-URI>" secretName: <SECRET-NAME> EOF
b) Cancel the Jet job, using
kubectl patch
:kubectl patch jetjob my-jet-job -p '{"spec":{"state":"Canceled"}}' --type=merge
-
Now check the Jet job status to make sure the job was cancelled.
kubectl get jetjob my-jet-job
The output shows the Jet job status as
ExecutionFailed
.NAME STATUS ID SUBMISSIONTIME COMPLETIONTIME my-jet-job ExecutionFailed 732229926463209474 2023-07-14T13:40:19Z 2023-07-14T14:07:51Z
-
See a more detailed message for the status of the Jet job:
kubectl get jetjob my-jet-job -o custom-columns=:.status.failureText
The failure text appears in the output.
com.hazelcast.jet.impl.exception.CancellationByUserException at com.hazelcast.jet.impl.MasterJobContext.createCancellationException(MasterJobContext.java:211) at com.hazelcast.jet.impl.MasterJobContext.getErrorFromResponses(MasterJobContext.java:653) at com.hazelcast.jet.impl.MasterJobContext.lambda$invokeStartExecution$12(MasterJobContext.java:576) at com.hazelcast.jet.impl.MasterContext.lambda$invokeOnParticipant$3(MasterContext.java:376) .... ....
Clean up
To clean up all the resources you created during the tutorial, and to remove the custom resources and secrets, run the following command:
kubectl delete secret <SECRET-NAME>
kubectl delete $(kubectl get hazelcast -o name)
Deleting the Hazelcast CR also deletes the Jet jobs linked to it. |
Summary
In this tutorial, you’ve learned how to:
-
Build and deploy a custom Jet pipeline implementation to cloud storage
-
Deploy Hazelcast with the Jet engine configured
-
Start the Jet pipeline
-
Check the Jet pipeline
-
Cancel the Jet pipeline