Run a Data Pipeline Using Jet with Hazelcast Platform Operator
Learn how to run a data pipeline in Hazelcast using Jet.
Context
In this tutorial, you’ll do the following:
-
Build and deploy a custom Jet pipeline implementation to cloud storage.
-
Deploy Hazelcast with the Jet engine configured.
-
Start the Jet pipeline.
-
Check the Jet pipeline.
-
Cancel the Jet pipeline.
Before You Begin
Step 1. Deploy the Jet Pipeline JAR to Cloud Storage
In this step, you’ll build the Jet pipeline JAR from the tutorial’s GitHub repository, and upload it to your cloud provider.
-
Clone the sample project, using either HTTPS:
git clone https://github.com/hazelcast-guides/hazelcast-platform-operator-jet.git
cd hazelcast-platform-operator-jet
or SSH:
git clone git@github.com:hazelcast-guides/hazelcast-platform-operator-jet.git
cd hazelcast-platform-operator-jet
The sample code for this tutorial is in the jet-pipeline/src/main/java/org/example/run/ directory.
-
Build the Jet pipeline JAR.
mvn package -f ./jet-pipeline
-
Upload the Jet pipeline JAR to a bucket or blob container of your cloud provider, replacing the placeholder values.
AWS:
aws s3 cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar s3://<BUCKET_NAME>
GCP:
gsutil cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar gs://<BUCKET_NAME>
Azure:
az storage blob upload --account-name <ACCOUNT_NAME> --container-name <CONTAINER_NAME> --file jet-pipeline/target/jet-pipeline-run-1.0.0.jar
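The build and upload steps can be wrapped in a small bash helper. This is a sketch only: upload_jar and run are hypothetical names, and the helper issues the same three upload commands shown above. Setting DRY_RUN prints the command instead of running it, which is a cheap way to check the placeholders before anything is uploaded.

```shell
#!/usr/bin/env bash
# Sketch only: upload_jar and run are hypothetical helper names.
# They wrap the same aws / gsutil / az upload commands as the tutorial.

JAR_PATH="jet-pipeline/target/jet-pipeline-run-1.0.0.jar"

# Print the command instead of running it when DRY_RUN is set.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: upload_jar aws <bucket-name>
#        upload_jar gcp <bucket-name>
#        upload_jar azure <account-name> <container-name>
upload_jar() {
  local provider="$1"
  # Refuse to upload before "mvn package -f ./jet-pipeline" has produced the JAR.
  if [ -z "${DRY_RUN:-}" ] && [ ! -f "$JAR_PATH" ]; then
    echo "JAR not found: $JAR_PATH (run the Maven build first)" >&2
    return 1
  fi
  case "$provider" in
    aws)   run aws s3 cp "$JAR_PATH" "s3://$2" ;;
    gcp)   run gsutil cp "$JAR_PATH" "gs://$2" ;;
    azure) run az storage blob upload --account-name "$2" \
             --container-name "$3" --file "$JAR_PATH" ;;
    *)     echo "unknown provider: $provider" >&2; return 1 ;;
  esac
}
```

For example, DRY_RUN=1 upload_jar gcp <BUCKET_NAME> prints the gsutil command, and upload_jar gcp <BUCKET_NAME> runs it.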
-
Create a secret for your cloud storage by running one of the following commands. Remember to replace the placeholder values.
AWS:
kubectl create secret generic <SECRET-NAME> --from-literal=region=<region> \
    --from-literal=access-key-id=<access-key-id> \
    --from-literal=secret-access-key=<secret-access-key>
GCP:
kubectl create secret generic <SECRET-NAME> --from-file=google-credentials-path=<service_account_json_file>
Azure:
kubectl create secret generic <SECRET-NAME> \
    --from-literal=storage-account=<storage-account> \
    --from-literal=storage-key=<storage-key>
To learn about other authentication methods, see the Authorization Methods to Access Cloud Storage documentation.
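If your AWS credentials are already exported in the standard AWS CLI variables (AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), the AWS secret can be built from them rather than typed inline. A sketch, assuming bash; create_aws_secret and run are hypothetical helper names, and the key names region, access-key-id and secret-access-key are the ones from the AWS command above.

```shell
#!/usr/bin/env bash
# Sketch only: create_aws_secret and run are hypothetical helper names.

# Print the command instead of running it when DRY_RUN is set.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: create_aws_secret <secret-name> [extra kubectl flags...]
create_aws_secret() {
  local name="$1"
  shift
  # Fail early instead of creating a secret with empty values.
  if [ -z "${AWS_REGION:-}" ] || [ -z "${AWS_ACCESS_KEY_ID:-}" ] ||
     [ -z "${AWS_SECRET_ACCESS_KEY:-}" ]; then
    echo "set AWS_REGION, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY first" >&2
    return 1
  fi
  # Key names match the ones used in the tutorial's AWS command.
  run kubectl create secret generic "$name" \
    --from-literal=region="$AWS_REGION" \
    --from-literal=access-key-id="$AWS_ACCESS_KEY_ID" \
    --from-literal=secret-access-key="$AWS_SECRET_ACCESS_KEY" \
    "$@"
}
```

Passing --dry-run=client -o yaml as extra flags makes kubectl print the Secret manifest without creating it, for example create_aws_secret <SECRET-NAME> --dry-run=client -o yaml. Drop those flags to create the secret.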
Step 2. Start the Hazelcast Cluster
-
Create a secret with your Hazelcast Enterprise license key.
kubectl create secret generic hazelcast-license-key --from-literal=license-key=<hz-license-key>
Starting with version 5.13, Hazelcast Platform Operator supports creating only Hazelcast Enterprise clusters. Even if a feature is available in Hazelcast Open Source, the operator requires a license key to run a cluster.
-
Run this command to create a Hazelcast cluster with the Jet engine configured.
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: my-hazelcast
spec:
  licenseKeySecretName: hazelcast-license-key
  clusterSize: 3
  jet:
    resourceUploadEnabled: true
EOF
If you specify the Hazelcast version explicitly, use Hazelcast 5.3 or later, which is required to submit JetJobs with Hazelcast Platform Operator.
-
Now check the cluster status to make sure that it is up and running.
$ kubectl get hazelcast my-hazelcast
NAME           STATUS    MEMBERS
my-hazelcast   Running   3/3
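The 5.3 version requirement for JetJobs can be checked before the cluster is created, if you pin the version. The sketch below assumes the Hazelcast custom resource accepts the version in a spec.version field and that a sort supporting -V (as in GNU coreutils) is available; version_at_least and hazelcast_manifest are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Sketch only: version_at_least and hazelcast_manifest are hypothetical names.
# Assumption: the Hazelcast CR takes the image version in spec.version.

# Succeeds when version $1 is greater than or equal to version $2.
version_at_least() {
  [ "$(printf '%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

# Print the tutorial's Hazelcast manifest with the version pinned.
hazelcast_manifest() {
  local version="$1"
  if ! version_at_least "$version" 5.3; then
    echo "JetJobs need Hazelcast 5.3 or later, got $version" >&2
    return 1
  fi
  cat <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: my-hazelcast
spec:
  licenseKeySecretName: hazelcast-license-key
  clusterSize: 3
  version: "$version"
  jet:
    resourceUploadEnabled: true
EOF
}
```

For example, hazelcast_manifest 5.3.1 | kubectl apply -f - applies the same cluster definition as above with the version pinned, while hazelcast_manifest 5.2.0 fails before anything is sent to the cluster.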
Step 3. Run the Data Pipeline
In this step, you’ll start, check and cancel the Jet job that runs the data pipeline.
-
Run the following command to start a Jet job from the pipeline JAR you uploaded in Step 1.
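Replace <BUCKET-URI> with the URI of the bucket that holds the JAR, for example the sample bucket for your cloud provider, and <SECRET-NAME> with the name of the secret you created in Step 1:
-
AWS: s3://hazelcast-jet-pipeline
-
GCP: gs://hazelcast-jet-pipeline
-
Azure: azblob://hazelcast-jet-pipeline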
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: my-jet-job
spec:
  hazelcastResourceName: my-hazelcast
  state: Running
  jarName: jet-pipeline-run-1.0.0.jar
  bucketConfig:
    bucketURI: "<BUCKET-URI>"
    secretName: <SECRET-NAME>
EOF
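The start manifest and the cancel manifest later in this step differ only in spec.state, so both can be rendered by one function. A sketch, assuming bash; bucket_uri and jetjob_manifest are hypothetical helpers, the provider labels aws, gcp and azure are chosen for this example, and only the two states used in this tutorial are accepted.

```shell
#!/usr/bin/env bash
# Sketch only: bucket_uri and jetjob_manifest are hypothetical helper names.

# Map a provider label to the bucket URI scheme used in this tutorial.
bucket_uri() {
  case "$1" in
    aws)   echo "s3://$2" ;;
    gcp)   echo "gs://$2" ;;
    azure) echo "azblob://$2" ;;
    *)     echo "unknown provider: $1" >&2; return 1 ;;
  esac
}

# Usage: jetjob_manifest <Running|Canceled> <bucket-uri> <secret-name>
jetjob_manifest() {
  local state="$1" uri="$2" secret_name="$3"
  # Only the two states this tutorial uses are accepted by the sketch.
  case "$state" in
    Running|Canceled) ;;
    *) echo "unsupported state: $state" >&2; return 1 ;;
  esac
  cat <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: my-jet-job
spec:
  hazelcastResourceName: my-hazelcast
  state: $state
  jarName: jet-pipeline-run-1.0.0.jar
  bucketConfig:
    bucketURI: "$uri"
    secretName: $secret_name
EOF
}
```

For example, jetjob_manifest Running "$(bucket_uri aws hazelcast-jet-pipeline)" <SECRET-NAME> | kubectl apply -f - starts the job, and the same call with Canceled instead of Running cancels it.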
-
Check the status of the Jet job.
kubectl get jetjob my-jet-job -w
The output looks similar to the following. Wait until the Running status is displayed, then press Ctrl+C to stop watching.
NAME         STATUS     ID                   SUBMISSIONTIME         COMPLETIONTIME
my-jet-job   Starting   0
my-jet-job   Running    732187341048774657   2023-07-14T10:51:06Z
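Watching with -w has to be stopped by hand; in a script, polling until the status reaches Running is often more convenient. A generic polling helper, sketched under the assumption that the STATUS column is read from the resource's .status.phase field; wait_for and POLL_INTERVAL are hypothetical names.

```shell
#!/usr/bin/env bash
# Sketch only: wait_for and POLL_INTERVAL are hypothetical names.

# Poll a command until its output equals the expected value.
# Usage: wait_for <expected> <timeout-seconds> <command> [args...]
wait_for() {
  local expected="$1" timeout="$2" interval="${POLL_INTERVAL:-5}" waited=0 actual=""
  shift 2
  while :; do
    # Errors (for example, the resource not existing yet) count as "not ready".
    actual="$("$@" 2>/dev/null)"
    [ "$actual" = "$expected" ] && return 0
    if [ "$waited" -ge "$timeout" ]; then
      echo "timed out after ${timeout}s; last value: '$actual'" >&2
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
}
```

For example, wait_for Running 300 kubectl get jetjob my-jet-job -o jsonpath='{.status.phase}' returns once the job is running, and the same call with hazelcast my-hazelcast can replace the cluster status check in Step 2.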
-
To see the output of the Jet pipeline in the logs, run the following command.
kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep sequence
The logs of the Jet pipeline are displayed in the output:
{"time":"2023-07-14T10:51:06,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:06.000, sequence=0) "}
{"time":"2023-07-14T10:51:07,675", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:07.000, sequence=1) "}
{"time":"2023-07-14T10:51:08,681", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:08.000, sequence=2) "}
{"time":"2023-07-14T10:51:09,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:09.000, sequence=3) "}
{"time":"2023-07-14T10:51:10,682", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:10.000, sequence=4) "}
{"time":"2023-07-14T10:51:11,678", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:11.000, sequence=5) "}
{"time":"2023-07-14T10:51:12,677", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:12.000, sequence=6) "}
....
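To confirm that the pipeline is still producing events, you can extract the sequence numbers from these log lines and compare the highest value between two runs a few seconds apart. A sketch; extract_sequences and latest_sequence are hypothetical helper names. When kubectl logs is given a label selector it shows only the last 10 lines per container by default, so the usage below adds --tail=-1 to read the full log.

```shell
#!/usr/bin/env bash
# Sketch only: extract_sequences and latest_sequence are hypothetical names.

# Read log lines on stdin and print each "sequence=N" value, one per line.
extract_sequences() {
  grep -o 'sequence=[0-9][0-9]*' | cut -d= -f2
}

# Print the highest sequence number seen on stdin (numeric, not lexical, order).
latest_sequence() {
  extract_sequences | sort -n | tail -n 1
}
```

For example, run kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast --tail=-1 | latest_sequence twice, a few seconds apart; a larger second value indicates that the job is still emitting events.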
-
Now, cancel the Jet job by setting spec.state to Canceled. Use one of the following options.
a) Cancel the JetJob with the kubectl apply command:
kubectl apply -f - <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: my-jet-job
spec:
  hazelcastResourceName: my-hazelcast
  state: Canceled
  jarName: jet-pipeline-run-1.0.0.jar
  bucketConfig:
    bucketURI: "<BUCKET-URI>"
    secretName: <SECRET-NAME>
EOF
b) Cancel the JetJob with the kubectl patch command:
kubectl patch jetjob my-jet-job -p '{"spec":{"state":"Canceled"}}' --type=merge
-
Check the Jet job status to confirm that the job was canceled.
kubectl get jetjob my-jet-job
The output shows the Jet job status as ExecutionFailed, even though the job was canceled by the user.
NAME         STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
my-jet-job   ExecutionFailed   732229926463209474   2023-07-14T13:40:19Z   2023-07-14T14:07:51Z
-
Run the following command to see a more detailed message for the status of the Jet job.
kubectl get jetjob my-jet-job -o custom-columns=:.status.failureText
The failure text is displayed in the output.
com.hazelcast.jet.impl.exception.CancellationByUserException
    at com.hazelcast.jet.impl.MasterJobContext.createCancellationException(MasterJobContext.java:211)
    at com.hazelcast.jet.impl.MasterJobContext.getErrorFromResponses(MasterJobContext.java:653)
    at com.hazelcast.jet.impl.MasterJobContext.lambda$invokeStartExecution$12(MasterJobContext.java:576)
    at com.hazelcast.jet.impl.MasterContext.lambda$invokeOnParticipant$3(MasterContext.java:376)
    ....
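Because a user cancellation and a genuine failure can both end in the ExecutionFailed status, a script has to look at the failure text to tell them apart. A sketch of that check; classify_jetjob is a hypothetical helper, its output labels are arbitrary, and reading the status from .status.phase is an assumption about the JetJob status fields.

```shell
#!/usr/bin/env bash
# Sketch only: classify_jetjob is a hypothetical helper name.

# Usage: classify_jetjob <status> <failure-text>
# Prints "canceled-by-user" or "failed" for ExecutionFailed, else the status itself.
classify_jetjob() {
  local status="$1" failure_text="$2"
  if [ "$status" = "ExecutionFailed" ]; then
    case "$failure_text" in
      *CancellationByUserException*) echo "canceled-by-user" ;;
      *) echo "failed" ;;
    esac
  else
    echo "$status"
  fi
}
```

For example: classify_jetjob "$(kubectl get jetjob my-jet-job -o jsonpath='{.status.phase}')" "$(kubectl get jetjob my-jet-job -o jsonpath='{.status.failureText}')".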
Step 4. Clean Up
To clean up the resources you created during the tutorial, delete the Hazelcast custom resource and the secrets by running the following commands:
kubectl delete $(kubectl get hazelcast -o name)
kubectl delete secret <SECRET-NAME> hazelcast-license-key
Deleting the Hazelcast CR also deletes the Jet jobs linked to it.
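If you run the tutorial more than once, a cleanup that tolerates already-deleted resources is convenient. A sketch, assuming bash and the resource names used in this tutorial; cleanup and run are hypothetical helper names. The --ignore-not-found flag of kubectl delete makes each command succeed even if the resource is already gone.

```shell
#!/usr/bin/env bash
# Sketch only: cleanup and run are hypothetical helper names.

# Print the command instead of running it when DRY_RUN is set.
run() {
  if [ -n "${DRY_RUN:-}" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: cleanup <bucket-secret-name>
cleanup() {
  if [ -z "${1:-}" ]; then
    echo "usage: cleanup <bucket-secret-name>" >&2
    return 1
  fi
  # Delete the job and cluster first, then the secrets they referenced.
  run kubectl delete jetjob my-jet-job --ignore-not-found
  run kubectl delete hazelcast my-hazelcast --ignore-not-found
  run kubectl delete secret "$1" hazelcast-license-key --ignore-not-found
}
```

For example, DRY_RUN=1 cleanup <SECRET-NAME> lists the three delete commands, and cleanup <SECRET-NAME> runs them.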