This is a prerelease version.

Create a Custom Stage Extension

You can implement a custom, reusable stage extension for the Pipeline API. This tutorial is based on the PythonExtension available in Hazelcast and walks you through how it is built, step by step.

Before you begin

PythonExtension already exists, but to recreate it step by step as described in this tutorial, you need:

  1. A Java project with dependencies on the hazelcast and hazelcast-jet-python modules

  2. A Python version compatible with Jet (see the mapUsingPython requirements)

  3. A Python script with a function to invoke

You can reuse the setup from the Apply a Custom Transform with Python tutorial.

Step 1. Define supported stage types

The first step is to decide which stages the extension applies to. You do this by implementing the StageExtension interface nested in the appropriate stage class. StreamStage and BatchStage are the most common choices; it is often relatively easy to implement an extension for both kinds of stage, which makes it more flexible. In some cases you may also want to support StreamStageWithKey and BatchStageWithKey.

public interface PythonExtension extends
        StreamStage.StageExtension<String, /*...*/>,
        BatchStage.StageExtension<String, /*...*/> {
}

The Python extension supports both stream and batch stages. The item type is fixed to String because the Python transform works only with String items; in more general cases, the item type should be a generic type parameter.

Step 2. Define API for the extension

Next, define the fluent API that the extension makes available. It is recommended to use interfaces to clearly separate the API from the implementation.

Usually the extension ultimately returns to the original StreamStage or BatchStage API, possibly with a changed item type, for example because of a mapping performed by the extension.

public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> {     // (1)

    interface PythonStage<S extends GeneralStage<String>> {                                  // (2)
        S map(PythonServiceConfig cfg);                                                      // (3)
    }
}

Initially, the example Python extension supports only a single map method (3). To handle both StreamStage and BatchStage in a type-safe way, we use the generic parameter S (2), so that map returns the correct stage type (stream or batch) for further chaining.

StageExtension needs to know which type serves as the extension's stage API, so that type is passed as its second type argument (1).
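To see why the S parameter matters, here is a small self-contained sketch. It uses hypothetical, simplified stand-ins for GeneralStage, StreamStage and BatchStage (not the Hazelcast API), and a hypothetical UpperStage<S> interface in the role of PythonStage<S>. Because S is bound per stage kind, map() returns a StreamStage<String> for a stream and a BatchStage<String> for a batch, with no cast at the call site.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-ins for Jet's stage types (NOT the Hazelcast API).
interface GeneralStage<T> {
    List<T> items();
}

final class StreamStage<T> implements GeneralStage<T> {
    private final List<T> items;
    StreamStage(List<T> items) { this.items = List.copyOf(items); }
    public List<T> items() { return items; }
}

final class BatchStage<T> implements GeneralStage<T> {
    private final List<T> items;
    BatchStage(List<T> items) { this.items = List.copyOf(items); }
    public List<T> items() { return items; }
}

// Custom stage API in the role of PythonStage<S>: S is the stage type map() returns to.
interface UpperStage<S extends GeneralStage<String>> {
    S map();
}

// Binding S to StreamStage<String> makes map() return a StreamStage, so chaining stays typed.
final class StreamUpperStage implements UpperStage<StreamStage<String>> {
    private final StreamStage<String> stage;
    StreamUpperStage(StreamStage<String> stage) { this.stage = stage; }

    @Override
    public StreamStage<String> map() { return new StreamStage<>(upper(stage.items())); }

    // Upper-cases every item; shared by both stage kinds.
    static List<String> upper(List<String> in) {
        return in.stream().map(s -> s.toUpperCase(Locale.ROOT)).collect(Collectors.toList());
    }
}

// Same API bound to BatchStage<String>: map() now returns a BatchStage.
final class BatchUpperStage implements UpperStage<BatchStage<String>> {
    private final BatchStage<String> stage;
    BatchUpperStage(BatchStage<String> stage) { this.stage = stage; }

    @Override
    public BatchStage<String> map() { return new BatchStage<>(StreamUpperStage.upper(stage.items())); }
}
```

For clarity, this sketch gives each stage kind its own implementation class; the PythonExtension implementation in Step 4 shares one generic class instead.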

Step 3. Define entry point for the extension

A static, parameterless method (1) is the recommended way to provide the entry point to the extension for the stage.using() invocation, because it can be statically imported and produces a readable, fluent syntax.

Note that the returned object (2) implements the extension for several stage types; the Java compiler chooses the appropriate variant automatically. If there are generic types (for example, the stream item type), the compiler infers them, so you don't need to specify them explicitly.

public interface PythonExtension extends
        StreamStage.StageExtension<String, PythonExtension.PythonStage<StreamStage<String>>>,
        BatchStage.StageExtension<String, PythonExtension.PythonStage<BatchStage<String>>> {

    static PythonExtension python() {       // (1)
        return new PythonExtensionImpl();   // (2)
    }
}

final class PythonExtensionImpl implements PythonExtension {
}
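The following sketch illustrates the entry-point pattern with hypothetical, simplified stage types (not the Hazelcast API); the LabelExtension name and its behavior are made up for illustration. To keep it short, the extension returns a standard stage directly rather than an intermediate API like PythonStage, and it is generic in the item type T so you can see the compiler infer T from the stage that using() is called on.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stage types (NOT the Hazelcast API), just enough to show dispatch.
final class StreamStage<T> {
    final List<T> items;
    StreamStage(List<T> items) { this.items = List.copyOf(items); }

    // The stage passes itself to the extension, which returns whatever API it defines.
    <R> R using(StageExtension<T, R> extension) { return extension.extend(this); }

    interface StageExtension<T, R> { R extend(StreamStage<T> stage); }
}

final class BatchStage<T> {
    final List<T> items;
    BatchStage(List<T> items) { this.items = List.copyOf(items); }

    <R> R using(StageExtension<T, R> extension) { return extension.extend(this); }

    interface StageExtension<T, R> { R extend(BatchStage<T> stage); }
}

// One extension type covering both stage kinds; generic in the item type T.
interface LabelExtension<T> extends
        StreamStage.StageExtension<T, StreamStage<String>>,
        BatchStage.StageExtension<T, BatchStage<String>> {

    // Static, parameterless entry point, intended for static import: stage.using(label()).
    static <T> LabelExtension<T> label() {
        return new LabelExtensionImpl<>();
    }
}

final class LabelExtensionImpl<T> implements LabelExtension<T> {
    // Called when the extension is applied to a stream stage.
    @Override
    public StreamStage<String> extend(StreamStage<T> stage) {
        return new StreamStage<>(prefixAll("stream:", stage.items));
    }

    // Called when the extension is applied to a batch stage.
    @Override
    public BatchStage<String> extend(BatchStage<T> stage) {
        return new BatchStage<>(prefixAll("batch:", stage.items));
    }

    private static <T> List<String> prefixAll(String prefix, List<T> items) {
        return items.stream().map(i -> prefix + i).collect(Collectors.toList());
    }
}
```

With label() statically imported, calling using(label()) on a StreamStage<Integer> yields a StreamStage<String>, while the same call on a BatchStage yields a BatchStage<String>; no type arguments are written out in either case.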

Step 4. Implement the extension

There are two pieces left to be implemented: the extension class (PythonExtensionImpl) and the custom stage (PythonStage).

final class PythonExtensionImpl implements PythonExtension {

    @Override
    public PythonStage<StreamStage<String>> extend(StreamStage<String> streamStage) {  // (1)
        return new GeneralPythonStage<>(streamStage);
    }

    @Override
    public PythonStage<BatchStage<String>> extend(BatchStage<String> batchStage) {     // (2)
        return new GeneralPythonStage<>(batchStage);
    }

    // Custom stage shared by stream and batch stages
    static final class GeneralPythonStage<S extends GeneralStage<String>> implements PythonStage<S> {
        private final S stage;

        GeneralPythonStage(S stage) {
            this.stage = stage;                                  // (3)
        }

        @Override
        @SuppressWarnings("unchecked")  // the cast to S is explained below
        public S map(PythonServiceConfig cfg) {
            return (S) stage.mapUsingServiceAsyncBatched(       // (4)
                        PythonService.factory(cfg),             // (5)
                        PythonTransforms.DEFAULT_MAX_BATCH_SIZE,
                        PythonService::sendRequest)
                    .setName("mapUsingPython");                 // (6)
        }
    }
}

The extension class is simply an implementation of the visitor pattern (1) (2) that creates the appropriate custom stage implementation. It receives a reference to the current stage in the pipeline, which can be used to implement the extension's logic on top of existing infrastructure, for example mapUsingService.

The custom stage remembers the stage (3) so that it can use it to implement the map method. The map method invokes the standard mapUsingServiceAsyncBatched method (4), wiring in the extension-specific logic (the service factory and the mapping function) (5). It also sets a default stage name (6). The created stage is returned, so you can use standard stage methods such as setName or setLocalParallelism to customize the newly created stage.
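Here is a minimal model of the pieces described above, again with hypothetical, simplified types rather than the Hazelcast API: mapItems stands in for an existing transform such as mapUsingServiceAsyncBatched, and UpperExtension (a made-up name) upper-cases String items. It shows the visitor-style extend() overloads, the custom stage remembering the original stage, delegation to an existing operation, and a default name that the caller can still override with setName.

```java
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified stage model (NOT the Hazelcast API).
abstract class GeneralStage<T> {
    final List<T> items;
    String name = "unnamed";

    GeneralStage(List<T> items) { this.items = List.copyOf(items); }

    // Existing infrastructure the extension builds on; implementations keep the stage kind.
    abstract <R> GeneralStage<R> mapItems(Function<? super T, ? extends R> fn);

    GeneralStage<T> setName(String name) { this.name = name; return this; }
}

interface StreamExtension<T, R> { R extend(StreamStage<T> stage); }
interface BatchExtension<T, R> { R extend(BatchStage<T> stage); }

final class StreamStage<T> extends GeneralStage<T> {
    StreamStage(List<T> items) { super(items); }
    @Override
    <R> StreamStage<R> mapItems(Function<? super T, ? extends R> fn) {
        return new StreamStage<>(items.stream().<R>map(fn).collect(Collectors.toList()));
    }
    <R> R using(StreamExtension<T, R> ext) { return ext.extend(this); }
}

final class BatchStage<T> extends GeneralStage<T> {
    BatchStage(List<T> items) { super(items); }
    @Override
    <R> BatchStage<R> mapItems(Function<? super T, ? extends R> fn) {
        return new BatchStage<>(items.stream().<R>map(fn).collect(Collectors.toList()));
    }
    <R> R using(BatchExtension<T, R> ext) { return ext.extend(this); }
}

interface UpperStage<S extends GeneralStage<String>> { S map(); }

// Visitor-style extension: one extend() per supported stage kind.
final class UpperExtension implements
        StreamExtension<String, UpperStage<StreamStage<String>>>,
        BatchExtension<String, UpperStage<BatchStage<String>>> {

    @Override
    public UpperStage<StreamStage<String>> extend(StreamStage<String> stage) {
        return new GeneralUpperStage<>(stage);
    }

    @Override
    public UpperStage<BatchStage<String>> extend(BatchStage<String> stage) {
        return new GeneralUpperStage<>(stage);
    }

    static final class GeneralUpperStage<S extends GeneralStage<String>> implements UpperStage<S> {
        private final S stage;   // remembered so map() can build on it

        GeneralUpperStage(S stage) { this.stage = stage; }

        @Override
        @SuppressWarnings("unchecked")
        public S map() {
            // Delegate to the existing mapItems operation and give the result a default name.
            // The cast is safe here: mapItems keeps the stage kind and String stays String.
            return (S) stage.mapItems(s -> s.toUpperCase(Locale.ROOT)).setName("mapToUpper");
        }
    }
}
```

Because map() hands back the real stage object, a later setName call simply replaces the default name, mirroring how setName and setLocalParallelism are used on the stage returned by the Python extension.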

The result must be cast to S because, in this context, mapUsingServiceAsyncBatched returns GeneralStage (4). For the Python extension, map always returns the same stage type as before the extension was applied: mapUsingServiceAsyncBatched does not turn a StreamStage into a BatchStage, and the item type remains String. In general, however, especially if the item type changes, you may need to handle StreamStage and BatchStage explicitly. For an example of how to do this, see mapUsingIMap:

  1. GeneralStage provides the base method definitions, the javadoc and some shared default implementations. These methods return GeneralStage.

  2. StreamStage and BatchStage override the methods to return the correct stage type: each override invokes the base method and casts the result.
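A compact model of that pattern, using hypothetical types rather than Hazelcast's real ones: the shared default mapItems lives in GeneralStage and returns GeneralStage, while StreamStage and BatchStage override it with a narrower return type by calling GeneralStage.super.mapItems and casting. Since the item type changes here (String to Integer), the narrowed overrides are what keep the result usable as a StreamStage or BatchStage.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified model (NOT the Hazelcast API) of the override-and-cast pattern.
interface GeneralStage<T> {
    List<T> items();

    // Hypothetical factory hook: creates a stage of the same kind holding the given items.
    <R> GeneralStage<R> withItems(List<R> newItems);

    // Shared default implementation, written once; returns the general stage type.
    default <R> GeneralStage<R> mapItems(Function<? super T, ? extends R> fn) {
        List<R> out = items().stream().<R>map(fn).collect(Collectors.toList());
        return withItems(out);
    }
}

interface StreamStage<T> extends GeneralStage<T> {
    // Narrow the return type: invoke the base method and cast the result.
    @Override
    default <R> StreamStage<R> mapItems(Function<? super T, ? extends R> fn) {
        return (StreamStage<R>) GeneralStage.super.mapItems(fn);
    }
}

interface BatchStage<T> extends GeneralStage<T> {
    @Override
    default <R> BatchStage<R> mapItems(Function<? super T, ? extends R> fn) {
        return (BatchStage<R>) GeneralStage.super.mapItems(fn);
    }
}

final class StreamStageImpl<T> implements StreamStage<T> {
    private final List<T> items;
    StreamStageImpl(List<T> items) { this.items = List.copyOf(items); }
    @Override public List<T> items() { return items; }
    @Override public <R> StreamStage<R> withItems(List<R> newItems) { return new StreamStageImpl<>(newItems); }
}

final class BatchStageImpl<T> implements BatchStage<T> {
    private final List<T> items;
    BatchStageImpl(List<T> items) { this.items = List.copyOf(items); }
    @Override public List<T> items() { return items; }
    @Override public <R> BatchStage<R> withItems(List<R> newItems) { return new BatchStageImpl<>(newItems); }
}
```

The casts are safe in this model because each concrete stage's withItems creates a stage of its own kind, so the object returned by the shared default is always a StreamStage or BatchStage respectively.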

Step 5. Test the extension

Extensions can be tested by using them in a test pipeline.

import static com.hazelcast.jet.python.PythonExtension.python;

public void streamStage_mapUsingPython_extension() {
    // baseDir, ITEM_COUNT and instance() are assumed to be provided by the enclosing test class
    // Given
    PythonServiceConfig cfg = new PythonServiceConfig()
            .setBaseDir(baseDir.toString())
            .setHandlerModule("echo")
            .setHandlerFunction("handle");
    List<String> items = IntStream.range(0, ITEM_COUNT).mapToObj(Integer::toString).collect(toList());
    Pipeline p = Pipeline.create();
    var stage = p.readFrom(TestSources.items(items)).addTimestamps(x -> 0, 0);

    // When
    var mapped = stage.using(python())
                      .map(cfg)                    // (1)
                      .setName("python-echo")      // (2)
                      .setLocalParallelism(2);

    // Then
    mapped.writeTo(AssertionSinks.assertAnyOrder(
            "Python didn't map the items correctly", items.stream().map(i -> "echo-" + i).collect(toList())
    ));
    instance().getJet().newJob(p).join();
}

The extension invocation (1) is simple: stage.using(python()).map(cfg).

map returns one of the standard stage interfaces, so you can customize it just as you would a standard transform (2): give it a more specific name (setName) or configure its parallelism (setLocalParallelism).

Step 6. Add more capabilities

Once the basic structure works, you can add more methods and capabilities. Because you control the return type, you can implement any fluent API, as long as it ultimately returns one of the standard stages.

For example, the Python extension has a fluent builder API that prepares the PythonServiceConfig on the fly:

StreamStage<String> sourceStage;
var mapped = sourceStage.using(python())
        .baseDir("/tmp")
        .handlerModule("echo")
        .handlerFunction("handle")
        .maxBatchSize(1)
        .map().setName("python-echo").setLocalParallelism(2);
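The builder style can be sketched in plain Java as follows. All names here (AffixStage, AffixConfig, affix(), and the prefix/suffix settings) are hypothetical stand-ins, not the PythonExtension API: setters record settings and return the builder, and the terminal map() assembles an immutable config and returns to a standard stage. Rejecting a missing prefix in map() is a design choice of this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for StreamStage (NOT the Hazelcast API).
final class StreamStage<T> {
    final List<T> items;
    StreamStage(List<T> items) { this.items = List.copyOf(items); }

    <R> R using(Extension<T, R> extension) { return extension.extend(this); }

    interface Extension<T, R> { R extend(StreamStage<T> stage); }
}

// Hypothetical immutable config, playing the role PythonServiceConfig plays for Python.
final class AffixConfig {
    final String prefix;
    final String suffix;
    AffixConfig(String prefix, String suffix) { this.prefix = prefix; this.suffix = suffix; }
}

// Builder-style custom stage: setters return the builder, map() returns to a standard stage.
final class AffixStage {
    private final StreamStage<String> stage;
    private String prefix;          // required; checked in map()
    private String suffix = "";     // optional; defaults to empty

    AffixStage(StreamStage<String> stage) { this.stage = stage; }

    // Hypothetical entry point in the role of python(): stage.using(affix())
    static StreamStage.Extension<String, AffixStage> affix() { return AffixStage::new; }

    AffixStage prefix(String prefix) { this.prefix = prefix; return this; }

    AffixStage suffix(String suffix) { this.suffix = suffix; return this; }

    StreamStage<String> map() {
        if (prefix == null) {
            throw new IllegalStateException("prefix must be set before map()");
        }
        AffixConfig cfg = new AffixConfig(prefix, suffix);   // config assembled on the fly
        return new StreamStage<>(stage.items.stream()
                .map(item -> cfg.prefix + item + cfg.suffix)
                .collect(Collectors.toList()));
    }
}
```

Building the config only in map() keeps the builder mutable while the user is chaining setters, yet hands an immutable configuration to the actual transform.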

Summary

In this tutorial, you learned how to implement and use a custom stage extension in the Jet Pipeline API.

Next steps

  1. Review the PythonExtension production-ready API and implementation in the Hazelcast repository

  2. Check other available extensions

  3. Implement your own extension to simplify your pipeline, wrap a complex service or reuse some logic