Apply a Custom Transform with Python
Hazelcast allows you to write a function in Python and use it to transform the data flowing through a data pipeline. You are expected to write a function that takes a list of strings and returns the transformed list of strings.
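As a minimal sketch of that contract, here is a function that takes a batch of strings and returns one result string per input item, in the same order. The upper-casing logic is purely illustrative and is not part of this tutorial; the function is named transform_list only to match the function used in the steps that follow.

```python
def transform_list(input_list):
    """Receive one batch of items as strings; return one result string per item."""
    # Illustrative transform (an assumption for this sketch): upper-case each item.
    return [item.upper() for item in input_list]


if __name__ == "__main__":
    batch = ["a", "b", "c"]
    print(transform_list(batch))
```

The output list has the same length as the input list, and each result sits at the same position as the item it was computed from.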
Before You Begin
To complete this tutorial, you need the following:
Prerequisites | Useful resources |
---|---|
Python 3.5-3.7 | |
A Hazelcast cluster running in client/server mode | |
Step 1. Write a Python Function
Here is the function we want to apply:
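import numpy as np

def transform_list(input_list):
    # Each item arrives as a string, so parse it into a number first.
    num_list = [float(it) for it in input_list]
    # np.sqrt takes the square root of every element in one call.
    sqrt_list = np.sqrt(num_list)
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]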
Save this code to take_sqrt.py in a directory of your choosing; we’ll call it <python_src>. Since our code uses numpy, we need a requirements file that names it:
numpy
Save this as requirements.txt in the <python_src> directory.
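If you want to check the function's logic on a machine where numpy is not installed, a dependency-free variant can stand in for it. The sketch below is our own assumption, not part of the tutorial's files: it swaps np.sqrt for the standard library's math.sqrt and keeps the same output format.

```python
import math


def transform_list(input_list):
    # Parse each incoming string as a number.
    num_list = [float(it) for it in input_list]
    # math.sqrt works on one value at a time, unlike np.sqrt on a whole list.
    sqrt_list = [math.sqrt(x) for x in num_list]
    return ["sqrt(%d) = %.2f" % (x, y) for (x, y) in zip(num_list, sqrt_list)]


if __name__ == "__main__":
    for line in transform_list(["0", "1", "2"]):
        print(line)
```

Running this file directly prints one formatted line per input item. The rest of the tutorial continues with the numpy version saved in take_sqrt.py.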
Step 2. Create a New Java Project
We’ll assume you’re using an IDE. Create a blank Java project named tutorial-python and copy the Gradle or Maven file into it:
Gradle (build.gradle):
plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1.5'
    implementation 'com.hazelcast.jet:hazelcast-jet-python:5.1.5'
}

jar.manifest.attributes 'Main-Class': 'org.example.JetJob'
Maven (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>tutorial-python</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-python</artifactId>
            <version>5.1.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.example.JetJob</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Step 3. Apply the Python Function to a Pipeline
This code generates a stream of numbers and lets Python take their square roots. Make sure to set the right path in the .setBaseDir line:
package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.*;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.python.PythonServiceConfig;

import static com.hazelcast.jet.python.PythonTransforms.mapUsingPython;

public class JetJob {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.itemStream(10, (ts, seq) -> String.valueOf(seq)))
                .withoutTimestamps()
                .apply(mapUsingPython(new PythonServiceConfig()
                        .setBaseDir("<python_src>")
                        .setHandlerModule("take_sqrt")))
                .setLocalParallelism(1)
                .writeTo(Sinks.logger());

        JobConfig cfg = new JobConfig().setName("python-function");
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline, cfg);
    }
}
You can run this code from your IDE, but it will then start its own Hazelcast member. To run it on the Hazelcast member you already started, build the JAR and submit it from the command line.
With Gradle:
gradle build
bin/hz-cli submit build/libs/tutorial-python-1.0-SNAPSHOT.jar
With Maven:
mvn package
bin/hz-cli submit target/tutorial-python-1.0-SNAPSHOT.jar
Now go to the window where you started Hazelcast. Its log output will contain the output from the pipeline, like this:
15:41:58.411 [ INFO] ... sqrt(0) = 0.00
15:41:58.411 [ INFO] ... sqrt(1) = 1.00
15:41:58.411 [ INFO] ... sqrt(2) = 1.41
15:41:58.411 [ INFO] ... sqrt(3) = 1.73
15:41:58.411 [ INFO] ... sqrt(4) = 2.00
15:41:58.412 [ INFO] ... sqrt(5) = 2.24
15:41:58.412 [ INFO] ... sqrt(6) = 2.45
15:41:58.412 [ INFO] ... sqrt(7) = 2.65
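If you want to confirm that the logged values are what you expect, you can compare them against square roots computed locally. The sketch below is only an illustration: the check_line helper and its regular expression are hypothetical names of our own, written against the log format shown above.

```python
import math
import re

# Matches the payload of a log line such as "... sqrt(7) = 2.65".
PATTERN = re.compile(r"sqrt\((\d+)\) = (\d+\.\d{2})")


def check_line(log_line):
    """Return True if the logged value equals sqrt(n) formatted to two decimals."""
    match = PATTERN.search(log_line)
    if match is None:
        return False
    n, logged = int(match.group(1)), match.group(2)
    return "%.2f" % math.sqrt(n) == logged


if __name__ == "__main__":
    print(check_line("15:41:58.412 [ INFO] ... sqrt(7) = 2.65"))
```

A line that does not contain a sqrt(...) entry is reported as False rather than raising an error, so you can feed the helper unrelated log lines as well.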
Once you’re done with it, cancel the job:
bin/hz-cli cancel python-function