Apply a Custom Transform with Python

Hazelcast lets you write a function in Python and use it to transform the data flowing through a data pipeline. The function must take a list of strings and return the transformed list of strings.
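To make the list-in, list-out contract concrete, here is a minimal sketch that upper-cases every item. The function name transform_list matches the one used later in this tutorial; check_contract is a hypothetical helper added only for illustration. Its length check assumes the output must line up one-to-one with the input, which is how a map-style transform is normally expected to behave.

```python
def transform_list(input_list):
    """Return one output string per input string, in the same order."""
    return [item.upper() for item in input_list]


def check_contract(func, sample):
    """Hypothetical helper: check the list-of-strings in, list-of-strings out shape."""
    result = func(sample)
    assert isinstance(result, list), "must return a list"
    # Assumption: each output item corresponds to the input item at the same index.
    assert len(result) == len(sample), "one output per input"
    assert all(isinstance(item, str) for item in result), "items must be strings"
    return result


print(check_contract(transform_list, ["jet", "python"]))  # ['JET', 'PYTHON']
```

Running a check like this on a few sample inputs is a cheap way to catch a function that drops or reorders items before it ever reaches a pipeline.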

Before You Begin

To complete this tutorial, you need the following:

  • Python 3.5-3.7

  • A Hazelcast cluster running in client/server mode
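If you are unsure which interpreter is on your path, a quick check along these lines may help. This is only a sketch: is_supported is a hypothetical helper, and the bounds are simply the 3.5-3.7 range listed above.

```python
import sys

# Range taken from the prerequisites above.
MIN_VERSION = (3, 5)
MAX_VERSION = (3, 7)


def is_supported(version_info):
    """Hypothetical helper: True if major.minor falls within 3.5-3.7."""
    return MIN_VERSION <= tuple(version_info[:2]) <= MAX_VERSION


print("Python %d.%d supported: %s" % (
    sys.version_info[0], sys.version_info[1], is_supported(sys.version_info)))
```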

Step 1. Write a Python Function

Here is the function we want to apply:

import numpy as np

def transform_list(input_list):
    num_list = [float(it) for it in input_list]
    sqrt_list = np.sqrt(num_list)
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]

Save this code as take_sqrt.py in a directory of your choosing, which we’ll call <python_src>. Because the code uses numpy, it needs a requirements file that names it:

numpy

Save this as requirements.txt in the <python_src> directory.
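Before wiring the function into a pipeline, it can be worth calling it directly. The sketch below repeats the logic of take_sqrt.py but swaps np.sqrt for math.sqrt from the standard library, so that it runs even where numpy is not installed; that substitution is ours and is not part of the tutorial's code.

```python
import math


def transform_list(input_list):
    # Same logic as take_sqrt.py, with math.sqrt standing in for np.sqrt.
    num_list = [float(it) for it in input_list]
    sqrt_list = [math.sqrt(x) for x in num_list]
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]


for line in transform_list(["0", "1", "2", "3"]):
    print(line)
# sqrt(0) = 0.00
# sqrt(1) = 1.00
# sqrt(2) = 1.41
# sqrt(3) = 1.73
```

With numpy installed, you can run the same loop against the real module by replacing the function definition with from take_sqrt import transform_list, executed from the <python_src> directory.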

Step 2. Create a New Java Project

We’ll assume you’re using an IDE. Create a blank Java project named tutorial-python and copy the Gradle or Maven file into it:

  • Gradle (build.gradle)

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.0'
    implementation 'com.hazelcast.jet:hazelcast-jet-python:5.0'
}

jar.manifest.attributes 'Main-Class': 'org.example.JetJob'

  • Maven (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>tutorial-python</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>5.0</version>
    </dependency>
    <dependency>
      <groupId>com.hazelcast.jet</groupId>
      <artifactId>hazelcast-jet-python</artifactId>
      <version>5.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>org.example.JetJob</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Step 3. Apply the Python Function to a Pipeline

This code generates a stream of numbers and lets Python take their square roots. Make sure to set the right path in the .setBaseDir line:

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.*;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.python.PythonServiceConfig;

import static com.hazelcast.jet.python.PythonTransforms.mapUsingPython;

public class JetJob {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    // Emit 10 items per second; each item is the sequence number as a string.
    pipeline.readFrom(TestSources.itemStream(10, (ts, seq) -> String.valueOf(seq)))
            .withoutTimestamps()
            // Hand the items to the Python code in <python_src>/take_sqrt.py.
            .apply(mapUsingPython(new PythonServiceConfig()
                    .setBaseDir("<python_src>")
                    .setHandlerModule("take_sqrt")))
            .setLocalParallelism(1)
            .writeTo(Sinks.logger());

    // Name the job so that it can be cancelled by name later.
    JobConfig cfg = new JobConfig().setName("python-function");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}

You can run this class from your IDE and it will work, but it will then start its own Hazelcast member. To run it on the Hazelcast member you already started, submit it from the command line:

  • Gradle

gradle build
bin/hz-cli submit build/libs/tutorial-python-1.0-SNAPSHOT.jar

  • Maven

mvn package
bin/hz-cli submit target/tutorial-python-1.0-SNAPSHOT.jar

Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline, like this:

15:41:58.411 [ INFO] ... sqrt(0) = 0.00
15:41:58.411 [ INFO] ... sqrt(1) = 1.00
15:41:58.411 [ INFO] ... sqrt(2) = 1.41
15:41:58.411 [ INFO] ... sqrt(3) = 1.73
15:41:58.411 [ INFO] ... sqrt(4) = 2.00
15:41:58.412 [ INFO] ... sqrt(5) = 2.24
15:41:58.412 [ INFO] ... sqrt(6) = 2.45
15:41:58.412 [ INFO] ... sqrt(7) = 2.65
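Each log line above is one string returned by the Python function for one input item. As a rough offline imitation of that flow, the sketch below feeds sequence numbers to the function in batches. run_in_batches is a hypothetical driver, the batch size of 3 is arbitrary (Hazelcast decides the real batch boundaries), and math.sqrt again stands in for np.sqrt so that the example needs no extra packages.

```python
import math


def transform_list(input_list):
    # Stand-in for take_sqrt.py; math.sqrt replaces np.sqrt to avoid needing numpy.
    num_list = [float(it) for it in input_list]
    return ["sqrt(%d) = %.2f" % (x, math.sqrt(x)) for x in num_list]


def run_in_batches(items, batch_size):
    """Hypothetical driver: feed items to transform_list one batch at a time."""
    results = []
    for start in range(0, len(items), batch_size):
        results.extend(transform_list(items[start:start + batch_size]))
    return results


# Sequence numbers as strings, like String.valueOf(seq) in the Java source.
items = [str(seq) for seq in range(8)]

# The function treats each item independently, so the batch size does not change the output.
assert run_in_batches(items, 3) == run_in_batches(items, 8)
print("\n".join(run_in_batches(items, 3)))
```

Because the result is the same for any batch size, the function stays correct however the items happen to be grouped when they reach it.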

Once you’re done with it, cancel the job:

bin/hz-cli cancel python-function