A newer version of Hazelcast Platform is available.

View latest

Apply a Custom Transform with Python

Hazelcast allows you to write a function in Python and use it to transform the data flowing through a data pipeline. You are expected to write a function that takes a list of strings and returns the transformed list of strings. Python transform works only on macOS and Linux systems.

Before You Begin

To complete this tutorial, you need the following:

Prerequisites Useful resources

Python 3.5-3.7

A Hazelcast cluster running in client/server mode

A Hazelcast Full Distribution

Step 1. Write a Python Function

Here is the function we want to apply:

import numpy as np

def transform_list(input_list):
    num_list = [float(it) for it in input_list]
    sqrt_list = np.sqrt(num_list)
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]

Save this code to take_sqrt.py in a directory of your choosing, we’ll call it <python_src>. Since our code uses numpy, we need a requirements file that names it:

numpy

Save this as requirements.txt in the <python_src> directory.

Step 2. Create a New Java Project

We’ll assume you’re using an IDE. Create a blank Java project named tutorial-python and copy the Gradle or Maven file into it:

  • Gradle

  • Maven

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.0.5'
    implementation 'com.hazelcast.jet:hazelcast-jet-python:5.0.5'
}

jar.manifest.attributes 'Main-Class': 'org.example.JetJob'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>tutorial-python</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>5.0.5</version>
  </dependency>
  <dependency>
      <groupId>com.hazelcast.jet</groupId>
      <artifactId>hazelcast-jet-python</artifactId>
      <version>5.0.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>org.example.JetJob</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Step 3. Apply the Python Function to a Pipeline

This code generates a stream of numbers and lets Python take their square roots. Make sure to set the right path in the .setBaseDir line:

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.*;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.python.PythonServiceConfig;

import static com.hazelcast.jet.python.PythonTransforms.mapUsingPython;

public class JetJob {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(TestSources.itemStream(10, (ts, seq) -> String.valueOf(seq)))
    .withoutTimestamps()
    .apply(mapUsingPython(new PythonServiceConfig()
            .setBaseDir("<python_src>")
            .setHandlerModule("take_sqrt")))
    .setLocalParallelism(1)
    .writeTo(Sinks.logger());

    JobConfig cfg = new JobConfig().setName("python-function");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}

You may run this code from your IDE and it will work, but it will create its own Hazelcast member. bin/hz-cli directory is in the distribution which is downloaded before. To run it on the Hazelcast member you already started, use the command line like this:

  • Gradle

  • Maven

gradle build
bin/hz-cli submit build/libs/tutorial-python-1.0-SNAPSHOT.jar
mvn package
bin/hz-cli submit target/tutorial-python-1.0-SNAPSHOT.jar

Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline, like this:

15:41:58.411 [ INFO] ... sqrt(0) = 0.00
15:41:58.411 [ INFO] ... sqrt(1) = 1.00
15:41:58.411 [ INFO] ... sqrt(2) = 1.41
15:41:58.411 [ INFO] ... sqrt(3) = 1.73
15:41:58.411 [ INFO] ... sqrt(4) = 2.00
15:41:58.412 [ INFO] ... sqrt(5) = 2.24
15:41:58.412 [ INFO] ... sqrt(6) = 2.45
15:41:58.412 [ INFO] ... sqrt(7) = 2.65

Once you’re done with it, cancel the job:

bin/hz-cli cancel python-function