You can implement custom, reusable stage extensions for the Pipeline API.
This tutorial is based on the PythonExtension available in Hazelcast and walks step by step
through how it was created.
Before you begin
PythonExtension already exists, but to recreate it step by step as described in this tutorial, you will need:
- A Java project with a dependency on the hazelcast and hazelcast-jet-python modules
- A Python version compatible with Jet (see the mapUsingPython requirements)
- A Python script with a function to invoke
You can reuse the setup from the Apply a Custom Transform with Python tutorial.
Step 1. Define supported stage types
The first step is to decide which stage types the extension applies to.
You do this by implementing the StageExtension interface of the appropriate Stage class.
StreamStage and BatchStage are the most common choices, and it is often relatively easy
to implement the extension for both kinds of stages, which makes it more flexible.
In some cases you may also want to support StreamStageWithKey and BatchStageWithKey.
public interface PythonExtension extends
        StreamStage.StageExtension<String, /*...*/>,
        BatchStage.StageExtension<String, /*...*/> {
}
The Python extension supports both stream and batch stages.
Because the Python transformation works only with String items, the item type is fixed to String here.
In a more general extension, the item type should be a generic type parameter.
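To illustrate the last point, here is a minimal, self-contained sketch of an extension whose item type is a generic parameter. The Stage, StageExtension, LabelStage and labelling() names are hypothetical stand-ins written for this example; they only mimic the shape of the pattern and are not the Hazelcast API.

```java
import java.util.List;
import java.util.stream.Collectors;

class GenericExtensionSketch {

    // Hypothetical stand-in for a pipeline stage; NOT the Hazelcast API.
    static final class Stage<T> {
        final List<T> items;

        Stage(List<T> items) {
            this.items = items;
        }

        // Mirrors the idea of stage.using(extension).
        <R> R using(StageExtension<T, R> extension) {
            return extension.extend(this);
        }
    }

    // Hypothetical extension contract: T is the item type, R the custom API.
    interface StageExtension<T, R> {
        R extend(Stage<T> stage);
    }

    // Custom API exposed by the extension; generic in the item type.
    static final class LabelStage<T> {
        private final Stage<T> stage;

        LabelStage(Stage<T> stage) {
            this.stage = stage;
        }

        // Returns to the "standard" stage API with a changed item type.
        Stage<String> label(String prefix) {
            return new Stage<>(stage.items.stream()
                    .map(item -> prefix + item)
                    .collect(Collectors.toList()));
        }
    }

    // Generic entry point: the compiler infers T from the stage it is used on.
    static <T> StageExtension<T, LabelStage<T>> labelling() {
        return LabelStage::new;
    }

    static List<String> demo() {
        Stage<Integer> numbers = new Stage<>(List.of(1, 2, 3));
        return numbers.using(labelling()).label("#").items;
    }
}
```

In this sketch the same labelling() entry point works for a stage of any item type, because nothing in the extension depends on the items being String.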
Step 2. Define API for the extension
Next, define the fluent API that the extension makes available. It is recommended to use interfaces to clearly separate the API from the implementation.
Usually the extension ultimately returns to the original StreamStage or BatchStage API,
possibly with a different item type, e.g. because of a mapping performed by the extension.
public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> { // (1)

    interface PythonStage<S extends GeneralStage<String>> { // (2)
        S map(PythonServiceConfig cfg); // (3)
    }
}
Initially, the example Python extension supports only a single map method (3).
To handle both StreamStage and BatchStage in a type-safe way, PythonStage takes a generic parameter S (2)
so that the map method returns the correct stage type (stream or batch) for further chaining.
StageExtension needs to know the type that serves as the stage API of the extension, so that type is specified in (1).
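The effect of the generic S parameter can be seen in isolation in the following sketch. All types here (GeneralStage, StreamStage, BatchStage, CustomStage, wrap) are simplified, hypothetical stand-ins defined only for this example, not the real Hazelcast interfaces.

```java
class StageTypeSketch {

    // Hypothetical, heavily simplified stand-ins for the stage hierarchy.
    interface GeneralStage<T> {
        String kind();
    }

    static final class StreamStage<T> implements GeneralStage<T> {
        public String kind() {
            return "stream";
        }

        // A method that exists only on the stream variant.
        String window() {
            return "windowed " + kind();
        }
    }

    static final class BatchStage<T> implements GeneralStage<T> {
        public String kind() {
            return "batch";
        }

        // A method that exists only on the batch variant.
        String sortAll() {
            return "sorted " + kind();
        }
    }

    // The custom stage is parameterized by the concrete stage type S,
    // so map() hands back exactly the type it was created from.
    interface CustomStage<S extends GeneralStage<String>> {
        S map();
    }

    static <S extends GeneralStage<String>> CustomStage<S> wrap(S stage) {
        return () -> stage;
    }

    static String demo() {
        // No casts needed: the compiler knows the precise stage type.
        StreamStage<String> stream = wrap(new StreamStage<String>()).map();
        BatchStage<String> batch = wrap(new BatchStage<String>()).map();
        return stream.window() + ", " + batch.sortAll();
    }
}
```

Without the S parameter, map() could only return GeneralStage, and callers would lose access to stream-only or batch-only methods when chaining.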
Step 3. Define entry point for the extension
A static, parameterless method (1) is the recommended way to provide the entry point of the extension for the stage.using() invocation,
because it can be statically imported and produces a readable, fluent syntax.
Note that the returned object provides implementations for different stage types (2); the Java compiler chooses the appropriate variant automatically. If there are generic types (e.g. the stream item type), the compiler infers them, so you do not need to specify them explicitly.
public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> {

    static PythonExtension python() { // (1)
        return new PythonExtensionImpl(); // (2)
    }
}

final class PythonExtensionImpl implements PythonExtension {
}
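The way the compiler picks the right variant can be shown with a small standalone sketch. The StreamExt, BatchExt, StreamStage, BatchStage and Shout types below are hypothetical and exist only in this example; the point is that one object implements the extension contract for both stage types, and each stage's using() method accepts only its own contract.

```java
class EntryPointSketch {

    // Hypothetical per-stage extension contracts.
    interface StreamExt<R> {
        R extend(StreamStage stage);
    }

    interface BatchExt<R> {
        R extend(BatchStage stage);
    }

    // Hypothetical stages; each accepts only its own kind of extension.
    static final class StreamStage {
        <R> R using(StreamExt<R> extension) {
            return extension.extend(this);
        }
    }

    static final class BatchStage {
        <R> R using(BatchExt<R> extension) {
            return extension.extend(this);
        }
    }

    // One extension type covering both stage kinds, with a static entry point.
    interface Shout extends StreamExt<String>, BatchExt<String> {
        static Shout shout() {
            return new ShoutImpl();
        }
    }

    static final class ShoutImpl implements Shout {
        @Override
        public String extend(StreamStage stage) {
            return "stream stage extended";
        }

        @Override
        public String extend(BatchStage stage) {
            return "batch stage extended";
        }
    }

    static String demo() {
        // The same entry point is used for both; the stage type selects the overload.
        return new StreamStage().using(Shout.shout())
                + "; " + new BatchStage().using(Shout.shout());
    }
}
```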
Step 4. Implement the extension
There are two pieces left to be implemented: the extension class (PythonExtensionImpl) and the custom stage (PythonStage).
final class PythonExtensionImpl implements PythonExtension {
    @Override
    public PythonStage<StreamStage<String>> extend(StreamStage<String> streamStage) { // (1)
        return new GeneralPythonStage<>(streamStage);
    }

    @Override
    public PythonStage<BatchStage<String>> extend(BatchStage<String> batchStage) { // (2)
        return new GeneralPythonStage<>(batchStage);
    }
}

static class GeneralPythonStage<S extends GeneralStage<String>> implements PythonStage<S> {
    private final S stage;

    GeneralPythonStage(S stage) {
        this.stage = stage; // (3)
    }

    @Override
    @SuppressWarnings("unchecked") // the cast to S below is unchecked
    public S map(PythonServiceConfig cfg) {
        return (S) stage.mapUsingServiceAsyncBatched( // (4)
                        PythonService.factory(cfg), // (5)
                        PythonTransforms.DEFAULT_MAX_BATCH_SIZE,
                        PythonService::sendRequest)
                .setName("mapUsingPython"); // (6)
    }
}
The extension class is essentially an implementation of the visitor pattern (1) (2) that creates the appropriate custom stage implementation.
It receives a reference to the current stage in the pipeline, which can be used to implement the extension's logic on top of existing infrastructure,
for example mapUsingService.
The custom stage stores the stage (3) so that it can be used to implement the map method.
The map method invokes the standard mapUsingServiceAsyncBatched method (4), wiring in the extension-specific logic (service factory, mapping method) (5).
It also provides a default stage name (6).
The created stage is returned, so you can use standard Stage methods such as setName or setLocalParallelism to customize it.
The result must be cast to S because in this context mapUsingServiceAsyncBatched returns GeneralStage (4).
In the case of the Python extension, the method always returns the same stage type that the extension was applied to, because mapUsingServiceAsyncBatched does not change a StreamStage into a BatchStage; the item type also remains String.
In general, however, especially if the item type changes, you may need to handle StreamStage and BatchStage explicitly.
You can see how this can be implemented, for example, in mapUsingIMap:
- GeneralStage provides base method definitions, Javadoc and some shared default implementations. Methods return GeneralStage
- StreamStage and BatchStage override the methods to return the correct stage type, invoke the base method and cast the result
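The override-and-cast arrangement described in the list above can be sketched with plain Java covariant return types. The interfaces below are hypothetical miniatures named after the real ones for readability; mapItems, items and create are invented for this example and do not exist in Hazelcast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class CovariantReturnSketch {

    // Hypothetical base interface: shared logic lives here and returns GeneralStage.
    interface GeneralStage<T> {
        List<T> items();

        // Factory hook so the shared logic can build a stage of the same kind.
        <R> GeneralStage<R> create(List<R> items);

        default <R> GeneralStage<R> mapItems(Function<T, R> fn) {
            List<R> out = new ArrayList<>();
            for (T item : items()) {
                out.add(fn.apply(item));
            }
            return create(out);
        }
    }

    // The specific stage narrows the return type, calls the base method and casts.
    interface StreamStage<T> extends GeneralStage<T> {
        @Override
        default <R> StreamStage<R> mapItems(Function<T, R> fn) {
            return (StreamStage<R>) GeneralStage.super.mapItems(fn);
        }
    }

    static final class SimpleStreamStage<T> implements StreamStage<T> {
        private final List<T> items;

        SimpleStreamStage(List<T> items) {
            this.items = items;
        }

        @Override
        public List<T> items() {
            return items;
        }

        @Override
        public <R> GeneralStage<R> create(List<R> newItems) {
            return new SimpleStreamStage<>(newItems);
        }
    }

    static List<Integer> demo() {
        StreamStage<String> words = new SimpleStreamStage<>(List.of("a", "bb", "ccc"));
        // The result is statically a StreamStage even though the item type changed.
        StreamStage<Integer> lengths = words.mapItems(String::length);
        return lengths.items();
    }
}
```

The cast in StreamStage.mapItems is safe in this sketch only because create() in the stream implementation always produces a stream stage; a custom extension that changes the item type has to maintain the same guarantee for each stage kind it supports.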
Step 5. Test the extension
Extensions can be tested by using them in a test pipeline.
import static com.hazelcast.jet.python.PythonExtension.python;

public void streamStage_mapUsingPython_extension() {
    // Given
    PythonServiceConfig cfg = new PythonServiceConfig()
            .setBaseDir(baseDir.toString())
            .setHandlerModule("echo")
            .setHandlerFunction("handle");
    List<String> items = IntStream.range(0, ITEM_COUNT).mapToObj(Integer::toString).collect(toList());
    Pipeline p = Pipeline.create();
    var stage = p.readFrom(TestSources.items(items)).addTimestamps(x -> 0, 0);

    // When
    var mapped = stage.using(python())
            .map(cfg) // (1)
            .setName("python-echo") // (2)
            .setLocalParallelism(2);

    // Then
    mapped.writeTo(AssertionSinks.assertAnyOrder(
            "Python didn't map the items correctly", items.stream().map(i -> "echo-" + i).collect(toList())
    ));
    instance().getJet().newJob(p).join();
}
The extension invocation (1) is very simple: stage.using(python()).map(cfg).
map returns one of the standard Stage interfaces, so you can customize it in the same way as with standard transforms (2), by giving it a more specific name (setName) or configuring parallelism (setLocalParallelism).
Step 6. Add more capabilities
Once the basic structure works, you can add more methods and capabilities. Because you control the return type of every method, you can implement any fluent API as long as it ultimately returns one of the standard stages.
The Python extension, for example, has a fluent builder API that prepares PythonServiceConfig on the fly:
StreamStage<String> sourceStage;
var mapped = sourceStage.using(python())
        .baseDir("/tmp")
        .handlerModule("echo")
        .handlerFunction("handle")
        .maxBatchSize(1)
        .map()
        .setName("python-echo")
        .setLocalParallelism(2);
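One possible way to structure such a builder-style custom stage is sketched below. It is not the actual PythonExtension implementation: ConfigurableStage, its default values and the string it produces are assumptions made up for this example, and a plain List stands in for the pipeline stage.

```java
import java.util.ArrayList;
import java.util.List;

class FluentBuilderSketch {

    // Hypothetical custom stage that collects settings before the final map().
    static final class ConfigurableStage {
        private final List<String> input;
        private String baseDir = ".";             // arbitrary default for this sketch
        private String handlerModule = "handler"; // arbitrary default for this sketch
        private String handlerFunction = "handle";
        private int maxBatchSize = 16;            // arbitrary default for this sketch

        ConfigurableStage(List<String> input) {
            this.input = input;
        }

        // Each setter returns the custom stage itself to keep the chain going.
        ConfigurableStage baseDir(String value) {
            this.baseDir = value;
            return this;
        }

        ConfigurableStage handlerModule(String value) {
            this.handlerModule = value;
            return this;
        }

        ConfigurableStage handlerFunction(String value) {
            this.handlerFunction = value;
            return this;
        }

        ConfigurableStage maxBatchSize(int value) {
            this.maxBatchSize = value;
            return this;
        }

        String describe() {
            return "baseDir=" + baseDir + ", maxBatchSize=" + maxBatchSize;
        }

        // Terminal step: leaves the builder and returns the standard result type.
        List<String> map() {
            List<String> out = new ArrayList<>();
            for (String item : input) {
                out.add(handlerModule + "." + handlerFunction + "(" + item + ")");
            }
            return out;
        }
    }

    static ConfigurableStage configured() {
        return new ConfigurableStage(List.of("1", "2"))
                .baseDir("/tmp")
                .handlerModule("echo")
                .handlerFunction("handle")
                .maxBatchSize(1);
    }
}
```

The design choice illustrated here is that only the terminal map() method leaves the custom API, so the configuration methods can be chained in any order before it.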
Summary
In this tutorial, you learned how to implement and use a custom stage extension in the Jet Pipeline API.
Next steps
- Review the PythonExtension production-ready API and implementation in the Hazelcast repository
- Check other available extensions
- Implement your own extension to simplify your pipeline, wrap a complex service or reuse some logic