Run a Data Pipeline Using Jet with Hazelcast Platform Operator

Learn how to run a data pipeline in Hazelcast using Jet.

Context

In this tutorial, you’ll do the following:

  • Build and deploy a custom Jet pipeline implementation to cloud storage.

  • Deploy Hazelcast with the Jet engine configured.

  • Start the Jet pipeline.

  • Check the Jet pipeline.

  • Cancel the Jet pipeline.

Before You Begin

You need the following:

Step 1. Deploy the Jet Pipeline JAR to Cloud Storage

In this step, you’ll build the Jet pipeline JAR from the tutorial’s GitHub repository and upload it to your cloud provider’s storage.

  1. Clone the sample project.

    Using HTTPS:

    git clone https://github.com/hazelcast-guides/hazelcast-platform-operator-jet.git
    cd hazelcast-platform-operator-jet

    Or, using SSH:

    git clone git@github.com:hazelcast-guides/hazelcast-platform-operator-jet.git
    cd hazelcast-platform-operator-jet

    The sample code for this tutorial is in the jet-pipeline/src/main/java/org/example/run/ directory:

  2. Build the Jet pipeline JAR.

    mvn package -f ./jet-pipeline

  3. Upload the Jet pipeline JAR to a bucket or container in your cloud provider’s storage, replacing the placeholder values.

    Amazon S3:

    aws s3 cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar s3://<BUCKET_NAME>

    Google Cloud Storage:

    gsutil cp jet-pipeline/target/jet-pipeline-run-1.0.0.jar gs://<BUCKET_NAME>

    Azure Blob Storage:

    az storage blob upload --account-name <ACCOUNT_NAME> --container-name <CONTAINER_NAME> --name jet-pipeline-run-1.0.0.jar --file jet-pipeline/target/jet-pipeline-run-1.0.0.jar

  4. Create a secret for your cloud storage by running the command for your cloud provider. Remember to replace the placeholder values.

    AWS:

    kubectl create secret generic <SECRET-NAME> --from-literal=region=<region> \
        --from-literal=access-key-id=<access-key-id> \
        --from-literal=secret-access-key=<secret-access-key>

    GCP:

    kubectl create secret generic <SECRET-NAME> --from-file=google-credentials-path=<service_account_json_file>

    Azure:

    kubectl create secret generic <SECRET-NAME> \
        --from-literal=storage-account=<storage-account> \
        --from-literal=storage-key=<storage-key>
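If you script these steps or switch between providers, you can wrap the upload commands in a small shell helper. The following is a sketch, not part of the sample project: the functions upload_jar, bucket_uri, and run_or_print and the DRY_RUN variable are hypothetical. It assumes you run it from the repository root, and that for Azure the bucketURI used in Step 3 names the container (azblob://<CONTAINER_NAME>), matching the shape of the sample URIs in that step.

```shell
#!/usr/bin/env bash
# Hypothetical helpers around the Step 1 upload commands (not part of the sample project).
# Run from the repository root. Set DRY_RUN=1 to print commands instead of running them.

JAR_PATH="jet-pipeline/target/jet-pipeline-run-1.0.0.jar"

# Run "$@", or just print it when DRY_RUN=1.
run_or_print() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# upload_jar PROVIDER TARGET [CONTAINER]
#   s3 | gcs : TARGET is the bucket name
#   abs      : TARGET is the storage account name, CONTAINER the container name
upload_jar() {
  local provider=$1 target=$2 container=${3:-}
  # Outside dry-run mode, fail early if the Maven build has not produced the JAR.
  if [ "${DRY_RUN:-0}" != "1" ] && [ ! -f "$JAR_PATH" ]; then
    echo "missing $JAR_PATH; run 'mvn package -f ./jet-pipeline' first" >&2
    return 1
  fi
  case "$provider" in
    s3)  run_or_print aws s3 cp "$JAR_PATH" "s3://$target" ;;
    gcs) run_or_print gsutil cp "$JAR_PATH" "gs://$target" ;;
    abs) run_or_print az storage blob upload --account-name "$target" \
           --container-name "$container" --name "$(basename "$JAR_PATH")" \
           --file "$JAR_PATH" ;;
    *)   echo "unknown provider '$provider' (expected s3, gcs or abs)" >&2
         return 1 ;;
  esac
}

# bucket_uri PROVIDER NAME
# Prints the bucketURI for the JetJob in Step 3; for abs, NAME is the container.
bucket_uri() {
  case "$1" in
    s3)  echo "s3://$2" ;;
    gcs) echo "gs://$2" ;;
    abs) echo "azblob://$2" ;;
    *)   echo "unknown provider '$1' (expected s3, gcs or abs)" >&2
         return 1 ;;
  esac
}
```

For example, DRY_RUN=1 upload_jar s3 my-bucket prints the aws s3 cp command without running it, and bucket_uri s3 my-bucket prints s3://my-bucket, the form the bucketURI field takes in Step 3.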

Step 2. Start the Hazelcast Cluster

  1. Run this command to create a Hazelcast cluster with the Jet engine configured.

    cat <<EOF | kubectl apply -f -
    apiVersion: hazelcast.com/v1alpha1
    kind: Hazelcast
    metadata:
      name: my-hazelcast
    spec:
      clusterSize: 3
      jet:
        resourceUploadEnabled: true
    EOF

    If you specify the Hazelcast version explicitly (spec.version), it must be 5.3 or later: submitting JetJobs through the Hazelcast Platform Operator requires Hazelcast 5.3 or above.

  2. Now check the cluster status to make sure that it is up and running.

    kubectl get hazelcast my-hazelcast

    The output should show all three members running:

    NAME           STATUS    MEMBERS
    my-hazelcast   Running   3/3
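Instead of rerunning kubectl get by hand, you can poll until a resource reaches the state you expect. The sketch below defines a hypothetical wait_for_value function that reruns any command until its output matches a target value or a timeout expires. Reading the STATUS column through the .status.phase field is an assumption about the Hazelcast custom resource; confirm it with kubectl get hazelcast my-hazelcast -o yaml before relying on it.

```shell
#!/usr/bin/env bash
# wait_for_value WANT TIMEOUT_SECONDS CMD [ARGS...]
# Hypothetical polling helper: reruns CMD every 2 seconds until its output
# equals WANT (returns 0) or TIMEOUT_SECONDS have passed (returns 1).
wait_for_value() {
  local want=$1 timeout=$2
  shift 2
  local start=$SECONDS got
  while true; do
    # Ignore errors (for example, the resource not existing yet) and retry.
    got=$("$@" 2>/dev/null)
    if [ "$got" = "$want" ]; then
      echo "reached '$want'"
      return 0
    fi
    if (( SECONDS - start >= timeout )); then
      echo "timed out after ${timeout}s; last value was '$got'" >&2
      return 1
    fi
    sleep 2
  done
}
```

For this step, wait_for_value Running 300 kubectl get hazelcast my-hazelcast -o jsonpath='{.status.phase}' waits up to five minutes for the cluster; the same call against jetjob my-jet-job can wait for the Jet job in Step 3. On kubectl 1.23 or later, kubectl wait --for=jsonpath='{.status.phase}'=Running hazelcast/my-hazelcast --timeout=5m is a built-in alternative under the same field assumption.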

Step 3. Run the Data Pipeline

In this step, you’ll start, check and cancel the Jet job that runs the data pipeline.
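The manifests that start and cancel the job in this step differ only in spec.state, so if you script the tutorial, one function can generate both. This is a sketch with a hypothetical jetjob_manifest function; its defaults for the JAR name and the Hazelcast resource name are the values used in this tutorial, and it does not validate the state value.

```shell
#!/usr/bin/env bash
# jetjob_manifest NAME STATE BUCKET_URI SECRET_NAME [JAR_NAME] [HAZELCAST_NAME]
# Hypothetical helper: prints a JetJob manifest like the ones in this step.
# STATE is Running to start the job or Canceled to cancel it; it is not validated.
jetjob_manifest() {
  local name=$1 state=$2 bucket_uri=$3 secret=$4
  local jar=${5:-jet-pipeline-run-1.0.0.jar}
  local hazelcast=${6:-my-hazelcast}
  cat <<EOF
apiVersion: hazelcast.com/v1alpha1
kind: JetJob
metadata:
  name: ${name}
spec:
  hazelcastResourceName: ${hazelcast}
  state: ${state}
  jarName: ${jar}
  bucketConfig:
    bucketURI: "${bucket_uri}"
    secretName: ${secret}
EOF
}

# Usage (replace the placeholders before running):
#   jetjob_manifest my-jet-job Running "<BUCKET-URI>" <SECRET-NAME> | kubectl apply -f -
#   jetjob_manifest my-jet-job Canceled "<BUCKET-URI>" <SECRET-NAME> | kubectl apply -f -
```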

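  1. Run the following command to start a Jet job from the JAR you uploaded in Step 1.

    Replace <BUCKET-URI> with the URI of the bucket or container that holds the JAR, and <SECRET-NAME> with the name of the secret you created. Use the URI scheme for your cloud provider, as in these sample URIs:

    • AWS: s3://hazelcast-jet-pipeline

    • GCP: gs://hazelcast-jet-pipeline

    • Azure: azblob://hazelcast-jet-pipeline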

    cat <<EOF | kubectl apply -f -
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: my-jet-job
    spec:
      hazelcastResourceName: my-hazelcast
      state: Running
      jarName: jet-pipeline-run-1.0.0.jar
      bucketConfig:
        bucketURI: "<BUCKET-URI>"
        secretName: <SECRET-NAME>
    EOF

  2. Check the status of the Jet job.

    kubectl get jetjob my-jet-job -w

    The output looks similar to the following. Wait until the status changes to Running, then press Ctrl+C to stop watching.

    NAME         STATUS    ID                   SUBMISSIONTIME         COMPLETIONTIME
    my-jet-job   Starting  0
    my-jet-job   Running   732187341048774657   2023-07-14T10:51:06Z

  3. To see the pipeline output in the Hazelcast member logs, run the following command.

    kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | grep sequence

    The output shows the events that the pipeline writes to its logger sink, one per second:

    {"time":"2023-07-14T10:51:06,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:06.000, sequence=0) "}
    {"time":"2023-07-14T10:51:07,675", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:07.000, sequence=1) "}
    {"time":"2023-07-14T10:51:08,681", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:08.000, sequence=2) "}
    {"time":"2023-07-14T10:51:09,679", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:09.000, sequence=3) "}
    {"time":"2023-07-14T10:51:10,682", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:10.000, sequence=4) "}
    {"time":"2023-07-14T10:51:11,678", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:11.000, sequence=5) "}
    {"time":"2023-07-14T10:51:12,677", "logger": "com.hazelcast.jet.impl.connector.WriteLoggerP", "level": "INFO", "msg": "[172.17.0.6]:5702 [dev] [5.3.1] [my-jet-job/loggerSink#0] SimpleEvent(timestamp=10:51:12.000, sequence=6) "}
    ....
    ....

  4. Cancel the Jet job by setting spec.state to Canceled. Use either of the following options.

    a) Reapply the JetJob manifest with the state set to Canceled, using kubectl apply:

    cat <<EOF | kubectl apply -f -
    apiVersion: hazelcast.com/v1alpha1
    kind: JetJob
    metadata:
      name: my-jet-job
    spec:
      hazelcastResourceName: my-hazelcast
      state: Canceled
      jarName: jet-pipeline-run-1.0.0.jar
      bucketConfig:
        bucketURI: "<BUCKET-URI>"
        secretName: <SECRET-NAME>
    EOF

    b) Change only the state field, using kubectl patch:

    kubectl patch jetjob my-jet-job -p '{"spec":{"state":"Canceled"}}' --type=merge

  5. Check the Jet job status to confirm that the job was canceled.

    kubectl get jetjob my-jet-job

    The output shows the Jet job status as ExecutionFailed. This is expected: Jet reports a canceled job as failed, and the failure text records the cause.

    NAME         STATUS            ID                   SUBMISSIONTIME         COMPLETIONTIME
    my-jet-job   ExecutionFailed   732229926463209474   2023-07-14T13:40:19Z   2023-07-14T14:07:51Z

  6. Run the following command to see the failure text, which explains why the job stopped.

    kubectl get jetjob my-jet-job -o custom-columns=:.status.failureText

    The output contains a CancellationByUserException, which confirms that the job stopped because you canceled it rather than because of an error:

    com.hazelcast.jet.impl.exception.CancellationByUserException
          at com.hazelcast.jet.impl.MasterJobContext.createCancellationException(MasterJobContext.java:211)
          at com.hazelcast.jet.impl.MasterJobContext.getErrorFromResponses(MasterJobContext.java:653)
          at com.hazelcast.jet.impl.MasterJobContext.lambda$invokeStartExecution$12(MasterJobContext.java:576)
          at com.hazelcast.jet.impl.MasterContext.lambda$invokeOnParticipant$3(MasterContext.java:376)
          ....
          ....
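To check the run without reading the raw output, you can filter it in the shell. The sketch below defines two hypothetical helpers: extract_sequences pulls the sequence numbers out of the SimpleEvent log lines shown in item 3, and is_user_cancellation tests whether a failure text names CancellationByUserException, which separates a cancellation from a genuine failure when the status is ExecutionFailed. Both match on the log and exception text shown above, so they may need adjusting if that format changes.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking the run; both read from stdin.

# extract_sequences: print the sequence number of every SimpleEvent log line.
extract_sequences() {
  grep -o 'sequence=[0-9][0-9]*' | cut -d= -f2
}

# is_user_cancellation: succeed only if the failure text mentions
# CancellationByUserException, i.e. the job was canceled on request.
is_user_cancellation() {
  grep -q 'CancellationByUserException'
}

# Usage:
#   kubectl logs -l app.kubernetes.io/name=hazelcast -c hazelcast | extract_sequences | tail -n 5
#   kubectl get jetjob my-jet-job -o jsonpath='{.status.failureText}' \
#     | is_user_cancellation && echo "canceled by user"
```

The jsonpath query reads the same .status.failureText field as the custom-columns query in item 6.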

Step 4. Clean Up

To remove the secret and the Hazelcast custom resource you created during the tutorial, run the following commands:

kubectl delete secret <SECRET-NAME>
kubectl delete $(kubectl get hazelcast -o name)
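
Deleting the Hazelcast custom resource also deletes the JetJob resources linked to it. These commands do not remove the JAR from your cloud storage; delete it with your cloud provider’s tools if you no longer need it.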
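If you rerun the tutorial, a small cleanup function saves retyping these commands. This sketch defines a hypothetical cleanup_tutorial function with a DRY_RUN preview. It swaps the $(kubectl get hazelcast -o name) form for kubectl delete hazelcast --all, which likewise deletes every Hazelcast custom resource in the current namespace but does not fail when none are left, and adds --ignore-not-found to the secret deletion.

```shell
#!/usr/bin/env bash
# Hypothetical cleanup helper; set DRY_RUN=1 to print the commands instead of running them.

run_or_print() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# cleanup_tutorial SECRET_NAME
# Deletes the storage secret and every Hazelcast custom resource in the current
# namespace; the JetJobs linked to those resources are deleted with them.
cleanup_tutorial() {
  local secret=${1:-}
  if [ -z "$secret" ]; then
    echo "usage: cleanup_tutorial SECRET_NAME" >&2
    return 1
  fi
  run_or_print kubectl delete secret "$secret" --ignore-not-found || return 1
  # Unlike 'kubectl delete $(kubectl get hazelcast -o name)', --all does not
  # fail when no Hazelcast resources are left.
  run_or_print kubectl delete hazelcast --all
}
```

For example, DRY_RUN=1 cleanup_tutorial <SECRET-NAME> lists the two kubectl commands so you can review them before running cleanup_tutorial for real.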