Serializing Pipelines

To send an object's state over a network or store it in a file, you first have to serialize it into raw bytes. Similarly, to fetch an object's state over the wire or read it from persistent storage, you first have to deserialize it from raw bytes. Because Hazelcast is a distributed system by nature, serialization is an integral part of it. Understanding when serialization is involved, how it supports pipelines, and how the supported strategies differ is crucial to using Hazelcast efficiently.

A typical pipeline involves lambda expressions. Since the whole pipeline definition must be serialized to be sent to the cluster, the lambda expressions must be serializable as well. The Java standard provides an essential building block: if the static type of the lambda is a subtype of Serializable you will automatically get a lambda instance that can serialize itself.

None of the functional interfaces in the JDK extend Serializable, so we had to mirror the entire java.util.function package in our own com.hazelcast.function, with all the interfaces subtyped and made Serializable. Each subtype has the name of the original with Ex appended. For example, FunctionEx is just like Function but also extends Serializable. We use these types everywhere in the Jet API.
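The following JDK-only sketch illustrates this building block without any Hazelcast dependency. `SerFunction` is a hypothetical stand-in for `FunctionEx` (it is not the Hazelcast type), and `LambdaSerializationDemo` is an illustrative name. It suggests that the same lambda body can be serialized only when its target type extends `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-in for com.hazelcast.function.FunctionEx.
interface SerFunction<T, R> extends Function<T, R>, Serializable {
}

class LambdaSerializationDemo {

    // Serializes with standard Java serialization, then deserializes again.
    // Throws NotSerializableException if the object cannot be serialized.
    static Object roundTrip(Object o) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // The target type extends Serializable, so the lambda survives a round trip.
    @SuppressWarnings("unchecked")
    static int lengthViaSerializedLambda(String input) {
        SerFunction<String, Integer> fn = str -> str.length();
        try {
            return ((SerFunction<String, Integer>) roundTrip(fn)).apply(input);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // The same body typed as a plain Function is not serializable.
    static boolean plainFunctionSerializes() {
        Function<String, Integer> fn = str -> str.length();
        try {
            roundTrip(fn);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The only difference between the two helper methods is the static type of the lambda, which is what decides whether the compiler generates a serializable lambda.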

As always with this kind of magic, auto-serializability of lambdas has its flip side: it is easy to overlook what’s going on.

If the lambda references a variable in the outer scope, the variable is captured and must also be serializable. If it references an instance variable of the enclosing class, it implicitly captures this so the entire class will be serialized. For example, this will fail because JetJob1 does not implement Serializable:

class JetJob1 {
    private String instanceVar;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.list("input"))
          // Refers to `instanceVar`, capturing `this`, but `JetJob1` is not
          // `Serializable` so this call will fail.
          .filter(item -> item.equals(instanceVar));
        return p;
    }
}

Just implementing Serializable for JetJob1 would be a viable workaround here. However, consider something just a bit different:

class JetJob2 implements Serializable {
    private String instanceVar;
    // A non-serializable field.
    private OutputStream fileOut;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.list("input"))
         // Refers to `instanceVar`, capturing `this`. `JetJob2` is declared
         // as `Serializable`, but has a non-serializable field and this fails.
         .filter(item -> item.equals(instanceVar));
        return p;
    }
}

Even though we never refer to fileOut, we are still capturing the entire JetJob2 instance. We might mark fileOut as transient, but the sane approach is to avoid referring to instance variables of the surrounding class. We can simply achieve this by assigning to a local variable, then referring to that variable inside the lambda:

class JetJob3 implements Serializable {
    private String instanceVar;
    // A non-serializable field.
    private OutputStream fileOut;

    Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // Declare a local variable that loads the value of the instance field.
        String findMe = instanceVar;
        p.readFrom(Sources.list("input"))
         // By referring to the local variable `findMe` we avoid
         // capturing `this` and the job runs fine.
         .filter(item -> item.equals(findMe));
        return p;
    }
}
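The difference between capturing `this` and capturing a local variable can be reproduced with plain Java serialization, outside of any pipeline. The sketch below is an illustration only: `SerPredicate` is a hypothetical stand-in for `PredicateEx`, and `CaptureDemo` plays the role of the job class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Predicate;

// Hypothetical stand-in for com.hazelcast.function.PredicateEx.
interface SerPredicate<T> extends Predicate<T>, Serializable {
}

// Deliberately not Serializable, like JetJob1.
class CaptureDemo {
    private final String instanceVar;

    CaptureDemo(String instanceVar) {
        this.instanceVar = instanceVar;
    }

    // Reads the field inside the lambda, so the lambda captures `this`.
    SerPredicate<String> capturingThis() {
        return item -> item.equals(instanceVar);
    }

    // Copies the field to a local variable first, so only the String is captured.
    SerPredicate<String> capturingLocal() {
        String findMe = instanceVar;
        return item -> item.equals(findMe);
    }

    // Returns true if standard Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With these definitions, `CaptureDemo.canSerialize(demo.capturingThis())` is expected to return false because the enclosing instance is dragged along, while `CaptureDemo.canSerialize(demo.capturingLocal())` is expected to return true.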

Another common pitfall is capturing an instance of DateTimeFormatter or a similar non-serializable class:

DateTimeFormatter formatter = DateTimeFormatter
        .ofPattern("HH:mm:ss.SSS")
        .withZone(ZoneId.systemDefault());
Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
// Captures the non-serializable formatter, so this fails.
src.map((Long tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));

Sometimes we can get away with using one of the preconfigured formatters available in the JDK:

// Accesses the static final field `ISO_LOCAL_TIME`. Static fields are
// not subject to lambda capture, they are dereferenced when the code
// runs on the target machine.
src.map((Long tstamp) -> DateTimeFormatter.ISO_LOCAL_TIME
        .format(Instant.ofEpochMilli(tstamp).atZone(ZoneId.systemDefault())));

This refers to a static final field in the JDK, so the instance is available on any JVM. If no suitable predefined formatter exists, you can create a static final field in your own class, or you can use mapUsingService(). With mapUsingService(), you provide a serializable factory that Hazelcast asks to create an object on the target member. The object it returns does not have to be serializable. Here’s an example of that:

Pipeline p = Pipeline.create();
BatchStage<Long> src = p.readFrom(Sources.list("input"));
ServiceFactory<?, DateTimeFormatter> serviceFactory = nonSharedService(
        pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                              .withZone(ZoneId.systemDefault()));
src.mapUsingService(serviceFactory,
        (formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));
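The other alternative mentioned above, a static final field in your own class, can be sketched with the JDK alone. Everything here is illustrative: `SerMapFn` is a hypothetical stand-in for `FunctionEx`, `Formatters` is an assumed class name, and the sketch pins the zone to UTC (instead of the system default used above) to keep the result deterministic. In a real job, the class holding the static field would also have to be available to the members.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

// Hypothetical stand-in for com.hazelcast.function.FunctionEx.
interface SerMapFn<T, R> extends Function<T, R>, Serializable {
}

class Formatters {
    // Static fields are not captured by the lambda. Each JVM initializes its
    // own copy when it loads this class.
    static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter
            .ofPattern("HH:mm:ss.SSS")
            .withZone(ZoneOffset.UTC);

    // The lambda captures nothing, so it serializes even though
    // DateTimeFormatter itself is not Serializable.
    static SerMapFn<Long, String> formatFn() {
        return tstamp -> TIME_FORMAT.format(Instant.ofEpochMilli(tstamp));
    }

    // Returns true if standard Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A function such as `Formatters.formatFn()` could then be passed wherever a serializable mapping function is expected, at the cost of sharing one formatter configuration per JVM.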

Compact Serialization

Compact serialization provides an efficient, schema-based serialization mechanism for your data objects. While Compact serialization has the highest priority in Hazelcast’s serialization service, there are important caveats to understand when using it with Jet pipelines.

Java Serializable Takes Precedence for Lambdas and Functions

Pipeline definitions, including lambda expressions and function objects, are serialized using standard Java serialization (java.io.Serializable). This is because the pipeline definition itself must be sent to cluster members before the Hazelcast serialization service is involved.

When an object implements Serializable, Java serialization is used directly and does not delegate to Hazelcast’s serialization service. This means:

  • All fields of a Serializable object must also be Serializable.

  • Any Hazelcast serializers registered for field types are not used during Java serialization.

  • This also holds for Compact serialization: during Java serialization, a field is not Compact-serialized even if its class has a registered CompactSerializer.

Entry Processors and Captured Variables

This behavior is particularly relevant when using Sinks.mapWithEntryProcessor(). This sink accepts toEntryProcessorFn, a function that creates an EntryProcessor. Because toEntryProcessorFn is part of the pipeline definition, it is serialized with Java serialization. Only variables captured by toEntryProcessorFn must be Java-serializable.

Other IMap sinks that accept lambdas follow the same captured variable rule, but Sinks.mapWithEntryProcessor() adds one extra level of indirection because you pass a factory for EntryProcessor instances.

The following examples omit unrelated method parameters for brevity:

// No captured state
Sinks.mapWithEntryProcessor(MergeEntryProcessor::new);

// No captured state: OrderStatus is created inside the lambda
Sinks.mapWithEntryProcessor(() -> new MergeEntryProcessor(new OrderStatus("SHIPPED")));

// Captures mep, so MergeEntryProcessor must be Serializable
MergeEntryProcessor mep = new MergeEntryProcessor();
Sinks.mapWithEntryProcessor(() -> mep);

// Captures status, so OrderStatus must be Serializable
OrderStatus status = new OrderStatus("SHIPPED");
Sinks.mapWithEntryProcessor(() -> new MergeEntryProcessor(status));

This capture behavior is standard Java serialization behavior and is not Jet-specific. The same rule applies when you use an EntryProcessor directly with the IMap API.
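Because the capture rule is plain Java behavior, it can be checked without a cluster. The following sketch assumes simplified stand-ins: `SerSupplier` is a hypothetical serializable factory type, and `OrderStatus` and `MergeProcessor` are minimal non-serializable classes invented for the illustration (they are not the real `EntryProcessor` types).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical serializable factory type, standing in for toEntryProcessorFn.
interface SerSupplier<T> extends Supplier<T>, Serializable {
}

// Illustrative classes; neither implements Serializable.
class OrderStatus {
    final String value;

    OrderStatus(String value) {
        this.value = value;
    }
}

class MergeProcessor {
    final OrderStatus status;

    MergeProcessor(OrderStatus status) {
        this.status = status;
    }
}

class FactoryCaptureDemo {

    // No captured state: both objects are created when the factory runs.
    static SerSupplier<MergeProcessor> createsInside() {
        return () -> new MergeProcessor(new OrderStatus("SHIPPED"));
    }

    // Captures `status`, so serializing the factory requires OrderStatus
    // to be Serializable, which it is not here.
    static SerSupplier<MergeProcessor> capturesStatus() {
        OrderStatus status = new OrderStatus("SHIPPED");
        return () -> new MergeProcessor(status);
    }

    // Returns true if standard Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the first factory serializes even though neither class is `Serializable`, because nothing is captured. The second fails at serialization time because the captured `status` instance becomes part of the lambda's serialized form.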

After toEntryProcessorFn runs, the created EntryProcessor instance is serialized for IMap execution. At this stage, requirements depend on the EntryProcessor serialization strategy:

  • If MergeEntryProcessor is serialized with Java serialization (for example by relying on Serializable), all its fields must be Java-serializable.

  • If MergeEntryProcessor is serialized with Compact serialization, its fields can use Compact serializers.

Target members must also be able to deserialize MergeEntryProcessor. Make this class available wherever an IMap deserializes it (for example, on the member classpath or in a User Code Namespace), not only in the Jet job resources.

The same applies to lambdas that capture Compact-serializable variables:

// OrderStatus has a CompactSerializer, but capturing an instance in a lambda
// requires the class to also implement Serializable
OrderStatus status = new OrderStatus("SHIPPED");
pipeline.readFrom(source)
    .filter(order -> order.getStatus().equals(status));

Using Compact Serialization with Pipelines

Compact and Java serialization can both be involved in the same pipeline, but at different points.

  • For pipeline definition objects (for example lambdas and captured variables), Java serialization rules apply.

  • For runtime data movement and storage, Compact serialization rules apply.

  • If a class is used in both contexts, it may need both a Java serialization compatible form and a Compact serializer.

Compact Serializers and Jet Jobs

When using Compact serialization with Jet, keep the following limits and recommendations in mind:

  • Unlike StreamSerializer, CompactSerializer is currently not supported for single-job registration with JobConfig.registerSerializer().

  • Explicit CompactSerializer implementations must be registered in member configuration.

  • Avoid Zero Config Compact Serialization for classes attached to jobs, because repeated job classloading can lead to classloader leaks and ClassCastException.

  • Prefer keeping Compact-serializable DTOs and related classes on the member classpath.

Serialization of Data Types

The objects you store in Hazelcast data structures must be serializable.

Another case that requires serializable objects is sending computation results between members, for example, when grouping by key. To catch serialization issues early on, we recommend using a 2-member local cluster for development and testing.

Currently, Hazelcast supports five interfaces to serialize custom types: Serializable, Externalizable, Portable, StreamSerializer, and CompactSerializer.

Deprecation Notice for Portable Serialization

Portable Serialization has been deprecated. We recommend you use Compact Serialization as Portable Serialization will be removed as of version 7.0.

The following table provides a comparison between them to help you in deciding which interface to use in your applications.

| Serialization interface | Advantages | Drawbacks |
|---|---|---|
| Serializable | Easy to start with, does not require implementation or registration | CPU intensive and space inefficient |
| Externalizable | Does not require registration, faster and more space efficient than Serializable | CPU intensive, space inefficient and requires implementation |
| Portable | Faster and more space efficient than Serializable. Supports versioning and partial deserialization | Requires implementation and registration |
| StreamSerializer | Fastest and lightest | Requires implementation and registration |
| CompactSerializer | Faster and more space efficient than Serializable. Supports versioning and partial deserialization. Does not require implementation or registration | Not as fast or lightweight as StreamSerializer |
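To make "requires implementation" concrete for Externalizable, here is a minimal JDK-only sketch. The class name `ExternalizablePerson` and its fields are assumptions for illustration. The class writes and reads its own fields in a fixed order and needs a public no-argument constructor for deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

class ExternalizablePerson implements Externalizable {
    String firstName;
    String lastName;
    int age;
    float height;

    // Externalizable requires a public no-arg constructor for deserialization.
    public ExternalizablePerson() {
    }

    ExternalizablePerson(String firstName, String lastName, int age, float height) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.height = height;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(firstName);
        out.writeUTF(lastName);
        out.writeInt(age);
        out.writeFloat(height);
    }

    // Fields must be read in the same order in which they were written.
    @Override
    public void readExternal(ObjectInput in) throws IOException {
        firstName = in.readUTF();
        lastName = in.readUTF();
        age = in.readInt();
        height = in.readFloat();
    }

    // Serializes and deserializes a person with standard Java object streams.
    static ExternalizablePerson roundTrip(ExternalizablePerson person) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(person);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ExternalizablePerson) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Compared with plain `Serializable`, the hand-written methods avoid reflective field access. The stream still carries a class descriptor, which is consistent with the table describing this option as less space efficient than the Hazelcast-specific serializers.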

Below you can find rough performance numbers you can expect when employing each of those strategies. A straightforward benchmark that serializes and then deserializes unique, pooled instances of this simple object:

class Person {
    private String firstName;
    private String lastName;
    private int age;
    private float height;
}

yields the following throughputs:

# Processor: Intel(R) Core(TM) i9-12900H CPU @ 2.50GHz
# VM version: JDK 17.0.6, OpenJDK 64-Bit Server VM, 17.0.6+10-LTS

Benchmark                                   Mode  Cnt   Score   Error   Units
SerializationBenchmark.serializable        thrpt   10   2.080 ± 0.056  ops/us
SerializationBenchmark.externalizable      thrpt   10   3.171 ± 0.056  ops/us
SerializationBenchmark.portable            thrpt   10   3.330 ± 0.051  ops/us
SerializationBenchmark.compact_zeroConfig  thrpt   10   5.337 ± 0.108  ops/us
SerializationBenchmark.compact_registered  thrpt   10   8.108 ± 0.160  ops/us
SerializationBenchmark.stream              thrpt   10  18.035 ± 0.322  ops/us

compact_zeroConfig represents serialization using Zero Config Compact Serialization, while compact_registered represents serialization using an explicitly implemented and registered CompactSerializer.

Here are the sizes of the serialized form for the same data by each serializer:

Strategy                                        Number of Bytes  Overhead %
java.io.Serializable                                        154       327.8
java.io.Externalizable                                       93       158.3
com.hazelcast.nio.serialization.Portable                    114       216.7
com.hazelcast.nio.serialization.compact.CompactSerializer    50        38.9
com.hazelcast.nio.serialization.StreamSerializer             36         0.0

You can see that using plain Serializable can easily become a bottleneck in your application, as even with this simple data type it’s significantly slower than some of the other serialization options, not to mention very wasteful with memory.
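The Overhead % figures in the size table are consistent with measuring each size relative to the 36 bytes of the StreamSerializer form, for example (154 - 36) / 36 ≈ 327.8 %. The JDK-only sketch below is not the benchmark used above. It compares standard Java serialization with writing only the field values by hand, which is similar in spirit to a custom serializer. `SizedPerson` and `SerializedSizeDemo` are hypothetical names, and the absolute sizes will differ from the table because no Hazelcast framing is involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical Serializable variant of the Person class.
class SizedPerson implements Serializable {
    final String firstName;
    final String lastName;
    final int age;
    final float height;

    SizedPerson(String firstName, String lastName, int age, float height) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.height = height;
    }
}

class SerializedSizeDemo {

    // Size with standard Java serialization: stream header, class
    // descriptor, and field values.
    static int javaSerializedSize(SizedPerson person) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(person);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Size when only the field values are written, in a fixed order.
    static int handWrittenSize(SizedPerson person) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(person.firstName);
            out.writeUTF(person.lastName);
            out.writeInt(person.age);
            out.writeFloat(person.height);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Overhead of `bytes` relative to `baselineBytes`, in percent.
    static double overheadPercent(int bytes, int baselineBytes) {
        return 100.0 * (bytes - baselineBytes) / baselineBytes;
    }
}
```

Calling `overheadPercent(javaSerializedSize(p), handWrittenSize(p))` for a sample person gives a rough feel for how much of a Java-serialized object is metadata rather than data.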

Write a Custom Serializer

For the best performance and simplest implementation we recommend using the Hazelcast StreamSerializer mechanism. Here is a sample implementation for a Person class:

class PersonSerializer implements StreamSerializer<Person> {

    private static final int TYPE_ID = 1;

    @Override
    public int getTypeId() {
        return TYPE_ID;
    }

    @Override
    public void write(ObjectDataOutput out, Person person) throws IOException {
        out.writeUTF(person.firstName);
        out.writeUTF(person.lastName);
        out.writeInt(person.age);
        out.writeFloat(person.height);
    }

    @Override
    public Person read(ObjectDataInput in) throws IOException {
        return new Person(in.readUTF(), in.readUTF(), in.readInt(), in.readFloat());
    }
}

The type ID you use must be unique across all the serializers you register for a job, and additionally it must not clash with any global serializers you registered with the Hazelcast cluster.

Register a Serializer for a Single Job

You can register a serializer in a job’s configuration object:

new JobConfig()
    .registerSerializer(Person.class, PersonSerializer.class)

Such a serializer is scoped: Its type ID doesn’t clash with the same type ID in another job. However, if you also use the serializer hook to register a global serializer on the Hazelcast cluster, a job-local ID would clash with it. The job-local serializer takes precedence, but it is best to avoid such clashes due to the potential for surprising behavior and hard-to-diagnose bugs.

The Jet engine uses the job-local serializer to serialize the objects as they travel through the pipeline (over distributed DAG edges) and get saved to snapshots.

Job-level serializers can also be used with sources and sinks that use in-memory data structures. You can read from/write to a local Observable, IList, IMap or ICache.

Register a Serializer with the Hazelcast Cluster

You can register a serializer with the Hazelcast cluster, before starting it. For that you need a SerializerHook:

class PersonSerializerHook implements SerializerHook<Person> {

    @Override
    public Class<Person> getSerializationType() {
        return Person.class;
    }

    @Override
    public Serializer createSerializer() {
        return new PersonSerializer();
    }

    @Override
    public boolean isOverwritable() {
        return true;
    }
}

Hazelcast uses the Java service discovery mechanism to find your serializer hook. You should create a JAR with the serializer hook and its dependent classes, and the JAR should have a file META-INF/services/com.hazelcast.SerializerHook with the fully-qualified name of the serializer hook class:

com.hazelcast.samples.jet.PersonSerializerHook

Alternatively, you can add the following configuration to hazelcast.yaml:

hazelcast:
  serialization:
    serializers:
      serializer:
        "type-class": "com.hazelcast.samples.jet.Person"
        "class-name": "com.hazelcast.samples.jet.PersonSerializer"

Put the JAR containing the serializer hook and related classes in the $HZ_HOME/lib directory. Make sure that each registered serializer has a unique type ID.

The advantage of a cluster-level serializer is that it is supported in all Hazelcast features.

3rd-Party Serialization Support

Google Protocol Buffers

Since the classes generated by Google Protocol Buffers (Protobuf) already implement java.io.Serializable, Hazelcast automatically supports them without a custom serializer. However, for best performance we encourage registering a Protobuf-specific serializer. There is a Jet extension module that simplifies this for Protobuf version 3.

If you want to use it locally within a job, add the extension as a dependency to your job’s project:

Gradle:

implementation "com.hazelcast.jet:hazelcast-jet-protobuf:5.5.0"

Maven:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-protobuf</artifactId>
    <version>5.5.0</version>
</dependency>

Implement the adapter by extending the provided class (where Person is of any Protobuf GeneratedMessageV3 type):

class PersonSerializer extends ProtobufSerializer<Person> {

    private static final int TYPE_ID = 1;

    PersonSerializer() {
        super(Person.class, TYPE_ID);
    }
}

Then register it with the job:

new JobConfig()
    .registerSerializer(Person.class, PersonSerializer.class)

Also make sure that the Protobuf extension JAR is either on the cluster’s classpath or inlined into your job JAR by creating a fat JAR.

You can also install the serializer in the Hazelcast cluster by implementing and registering a serialization hook, as explained above.