Apply a Custom Transform with Python
Hazelcast lets you write a function in Python and use it to transform the data flowing through a data pipeline. The function must take a list of strings and return the transformed list of strings. The Python transform works only on macOS and Linux.
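As a minimal sketch of that contract, the function below accepts a list of strings and returns a list of strings. The upper-casing logic is purely illustrative, and the one-output-per-input, same-order behavior is an assumption made here to keep the example simple, not something stated above.

```python
def transform_list(input_list):
    # input_list is a list of str; the return value is also a list of str.
    # Assumption: one output item per input item, in the same order.
    return [item.strip().upper() for item in input_list]


print(transform_list(["alpha", " beta "]))  # ['ALPHA', 'BETA']
```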
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources |
---|---|
Python 3.5-3.7 | |
A Hazelcast cluster running in client/server mode | |
A Hazelcast Full Distribution | |
Step 1. Write a Python Function
Here is the function we want to apply:
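import numpy as np

def transform_list(input_list):
    # Items arrive as strings, so convert them to numbers first.
    num_list = [float(it) for it in input_list]
    sqrt_list = np.sqrt(num_list)
    # Return one formatted string per input item.
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]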
Save this code to take_sqrt.py in a directory of your choosing; we’ll call it <python_src>. Since our code uses numpy, we need a requirements file that names it:
numpy
Save this as requirements.txt in the <python_src> directory.
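Before submitting anything to Hazelcast, it can help to check the formatting logic locally. The sketch below is a dependency-free stand-in, not the tutorial's file: `transform_list_stdlib` is a hypothetical name, and `math.sqrt` replaces `np.sqrt` so that it runs without installing numpy.

```python
import math


def transform_list_stdlib(input_list):
    # Hypothetical stand-in for take_sqrt.transform_list:
    # math.sqrt replaces np.sqrt, the rest mirrors the tutorial code.
    num_list = [float(it) for it in input_list]
    sqrt_list = [math.sqrt(x) for x in num_list]
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]


print(transform_list_stdlib(["0", "2", "4"]))
# ['sqrt(0) = 0.00', 'sqrt(2) = 1.41', 'sqrt(4) = 2.00']

# %d drops the fractional part, so the label for "2.5" reads sqrt(2).
print(transform_list_stdlib(["2.5"]))
# ['sqrt(2) = 1.58']
```

The second call shows a side effect of the `%d` format: it is harmless for the whole numbers this tutorial generates, but the label would be misleading for fractional input.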
Step 2. Create a New Java Project
We’ll assume you’re using an IDE. Create a blank Java project named tutorial-python and copy the Gradle or Maven file into it. The Gradle build file is shown first, followed by the Maven POM:
plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1.7'
    implementation 'com.hazelcast.jet:hazelcast-jet-python:5.1.7'
}

jar.manifest.attributes 'Main-Class': 'org.example.JetJob'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>tutorial-python</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-python</artifactId>
            <version>5.1.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.JetJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Step 3. Apply the Python Function to a Pipeline
This code generates a stream of numbers and lets Python take their square roots. Make sure to set the right path in the .setBaseDir line:
package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.*;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.python.PythonServiceConfig;

import static com.hazelcast.jet.python.PythonTransforms.mapUsingPython;

public class JetJob {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // Emit 10 items per second; each item is the sequence number as a string.
        pipeline.readFrom(TestSources.itemStream(10, (ts, seq) -> String.valueOf(seq)))
                .withoutTimestamps()
                // Run the Python module take_sqrt from <python_src> on the items.
                .apply(mapUsingPython(new PythonServiceConfig()
                        .setBaseDir("<python_src>")
                        .setHandlerModule("take_sqrt")))
                .setLocalParallelism(1)
                .writeTo(Sinks.logger());

        // The job name is used later to cancel the job.
        JobConfig cfg = new JobConfig().setName("python-function");
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline, cfg);
    }
}
You can run this code from your IDE and it will work, but it will create its own Hazelcast member. To run it on the Hazelcast member you already started, build the project with Gradle or Maven and submit the resulting JAR with bin/hz-cli, which is in the distribution you downloaded earlier. The Gradle commands come first, followed by the Maven commands:
gradle build
bin/hz-cli submit build/libs/tutorial-python-1.0-SNAPSHOT.jar
mvn package
bin/hz-cli submit target/tutorial-python-1.0-SNAPSHOT.jar
Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline, like this:
15:41:58.411 [ INFO] ... sqrt(0) = 0.00
15:41:58.411 [ INFO] ... sqrt(1) = 1.00
15:41:58.411 [ INFO] ... sqrt(2) = 1.41
15:41:58.411 [ INFO] ... sqrt(3) = 1.73
15:41:58.411 [ INFO] ... sqrt(4) = 2.00
15:41:58.412 [ INFO] ... sqrt(5) = 2.24
15:41:58.412 [ INFO] ... sqrt(6) = 2.45
15:41:58.412 [ INFO] ... sqrt(7) = 2.65
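To see where these lines come from, here is a rough Python-only simulation of the job. It is not how Hazelcast invokes the function internally: `simulate_pipeline` and the batch size are hypothetical, `math.sqrt` stands in for numpy, and the batching itself is an assumption suggested by the function taking a list rather than a single item.

```python
import math


def transform_list(input_list):
    # Stand-in for take_sqrt.py using math.sqrt, so the sketch has no dependencies.
    nums = [float(it) for it in input_list]
    return ["sqrt(%d) = %.2f" % (x, math.sqrt(x)) for x in nums]


def simulate_pipeline(item_count, batch_size):
    # Hypothetical driver: mimics the source emitting sequence numbers as
    # strings (String.valueOf(seq)) and handing them to Python in batches.
    items = [str(seq) for seq in range(item_count)]
    output = []
    for start in range(0, len(items), batch_size):
        output.extend(transform_list(items[start:start + batch_size]))
    return output


for line in simulate_pipeline(item_count=8, batch_size=3):
    print(line)
```

Because the function treats every item independently, the result is the same whatever batch size is used, which is a useful property for a function that may receive lists of varying length.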
Once you’re done with it, cancel the job:
bin/hz-cli cancel python-function