Creating a Custom Streaming Source

If Hazelcast doesn't provide a built-in source that fits your needs, and no suitable Kafka Connect source connector is available, you can build your own streaming source using the Jet API.

Here we will focus on stream sources, that is, sources that read unbounded input data.

Defining a Source

Let's write a source capable of reading lines of text arriving over a network socket.

import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

class Sources {
  static StreamSource<String> buildNetworkSource() {
    return SourceBuilder
    .stream("network-source", ctx -> {
      int port = 11000;
      ServerSocket serverSocket = new ServerSocket(port);
      ctx.logger().info(String.format("Waiting for connection on port %d ...", port));
      Socket socket = serverSocket.accept();
      BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream()));
      ctx.logger().info(String.format("Data source connected on port %d.", port));
      return new NetworkContext(reader, serverSocket);
    })
    .<String>fillBufferFn((context, buf) -> {
      BufferedReader reader = context.getReader();
      // Emit at most 128 items per call and never block: if no more
      // data is ready, return and let Hazelcast call us again.
      for (int i = 0; i < 128; i++) {
        if (!reader.ready()) {
          return;
        }
        String line = reader.readLine();
        if (line == null) {
          // the other side closed the connection: complete the source
          buf.close();
          return;
        }
        buf.add(line);
      }
    })
    .destroyFn(context -> context.close())
    .build();
  }

  private static class NetworkContext {
    private final BufferedReader reader;
    private final ServerSocket serverSocket;

    NetworkContext(BufferedReader reader, ServerSocket serverSocket) {
      this.reader = reader;
      this.serverSocket = serverSocket;
    }

    BufferedReader getReader() {
      return reader;
    }

    void close() {
      try {
        reader.close();
        serverSocket.close();
      } catch (IOException e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }
}

Using it in a pipeline works just the same as with built-in sources:

Pipeline p = Pipeline.create();
p.readFrom(Sources.buildNetworkSource())
 .withoutTimestamps()
 .peek()
 .writeTo(Sinks.noop());

When you test it, the output should look like this (assuming the lines we receive over the network always start with a numeric timestamp, epoch time in milliseconds, followed by a comma and some additional text):

... Output to ordinal 0: 1583310272413,some_more_text_1
... Output to ordinal 0: 1583310272613,some_more_text_2
... Output to ordinal 0: 1583310272813,some_more_text_3
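To produce input for a quick local test, you can push a few lines into the socket yourself. Here's a minimal sketch of a throwaway test client (a hypothetical helper, assuming the source is listening on port 11000 and uses the timestamped line format described above):

import java.io.PrintWriter;
import java.net.Socket;

public class TestClient {
  public static void main(String[] args) throws Exception {
    try (Socket socket = new Socket("localhost", 11000);
         PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
      for (int i = 1; i <= 3; i++) {
        // epoch-millisecond timestamp, a comma, then some text
        out.println(System.currentTimeMillis() + ",some_more_text_" + i);
        Thread.sleep(200);
      }
    }
  }
}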

Adding Timestamps

You might have noticed the withoutTimestamps() line in the previous pipeline definition. It is needed because Hazelcast has to know what kind of event timestamps a stream source will provide (if any). Here we declared the source as having no timestamps, which unfortunately means we aren't allowed to use Windowed Aggregation in our pipeline.

There are multiple ways to fix this: we could, for example, add the timestamps in the pipeline, right after the source, as sketched below. The most convenient option, however, is to provide the timestamps directly in the source.
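Here is what that in-pipeline alternative could look like; a sketch that uses withTimestamps() to extract the epoch-millisecond prefix from each line (same line format as assumed above):

Pipeline p = Pipeline.create();
p.readFrom(Sources.buildNetworkSource())
 .withTimestamps(
     line -> Long.parseLong(line.substring(0, line.indexOf(','))), 0)
 .peek()
 .writeTo(Sinks.noop());

In what follows, though, we'll take the source-side approach.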

If we know that the lines of text we receive over the network follow the same timestamped format as before, we can modify our source like this:

SourceBuilder
  .timestampedStream("network-source", ctx -> {
    int port = 11000;
    ServerSocket serverSocket = new ServerSocket(port);
    ctx.logger().info(String.format("Waiting for connection on port %d ...", port));
    Socket socket = serverSocket.accept();
    BufferedReader reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream()));
    ctx.logger().info(String.format("Data source connected on port %d.", port));
    return new NetworkContext(reader, serverSocket);
  })
  .<String>fillBufferFn((context, buf) -> {
    BufferedReader reader = context.getReader();
    for (int i = 0; i < 128; i++) {
      if (!reader.ready()) {
        return;
      }
      String line = reader.readLine();
      if (line == null) {
        buf.close();
        return;
      }
      // pass each item together with its event timestamp, parsed
      // from the numeric prefix of the line
      buf.add(line, Long.parseLong(line.substring(0, line.indexOf(','))));
    }
  })
  .destroyFn(context -> context.close())
  .build();

Using it in a pipeline definition also changes a bit:

Pipeline p = Pipeline.create();
p.readFrom(Sources.buildNetworkSource())
 .withNativeTimestamps(0)
 .peek()
 .writeTo(Sinks.noop());
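With native timestamps in place, windowed aggregation becomes available. As an illustration, here's a sketch that counts the lines received in one-second tumbling windows (WindowDefinition, AggregateOperations and Sinks are the standard Jet pipeline classes):

Pipeline p = Pipeline.create();
p.readFrom(Sources.buildNetworkSource())
 .withNativeTimestamps(0)
 .window(WindowDefinition.tumbling(1_000))
 .aggregate(AggregateOperations.counting())
 .writeTo(Sinks.logger());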

Increasing Parallelism

In the examples shown so far, the source was non-distributed: Hazelcast creates just a single processor instance in the whole cluster to serve all of the data. This is the easiest and most obvious way to write a source connector.

If you want to create a distributed source, the challenge is coordinating all the parallel instances to appear as a single, unified source.

In our somewhat contrived example we can simply make each instance listen on its own port. We achieve this by modifying the createFn to make use of the unique, global processor index available from the Processor.Context object handed to it:

SourceBuilder
  .stream("network-source", ctx -> {
    // derive a unique port for this parallel instance from its
    // globally unique processor index
    int port = 11000 + ctx.globalProcessorIndex();
    ServerSocket serverSocket = new ServerSocket(port);
    ctx.logger().info(String.format("Waiting for connection on port %d ...", port));
    Socket socket = serverSocket.accept();
    BufferedReader reader = new BufferedReader(
      new InputStreamReader(socket.getInputStream()));
    ctx.logger().info(String.format("Data source connected on port %d.", port));
    return new NetworkContext(reader, serverSocket);
  })
  .<String>fillBufferFn((context, buf) -> {
    BufferedReader reader = context.getReader();
    for (int i = 0; i < 128; i++) {
      if (!reader.ready()) {
        return;
      }
      String line = reader.readLine();
      if (line == null) {
        buf.close();
        return;
      }
      buf.add(line);
    }
  })
  .destroyFn(context -> context.close())
  .distributed(2)
  .build();

Notice the extra call we have added to specify the local parallelism of the source (the distributed() method). It means that each Hazelcast cluster member will now create two such source processors; on a two-member cluster, for example, there will be four in total, listening on ports 11000 through 11003.

Adding Fault Tolerance

If you want your source to behave correctly within a streaming job that has a processing guarantee configured (at-least-once or exactly-once), you must help Hazelcast save the operational state of your context object to snapshot storage.

There are two functions you must supply:

  • createSnapshotFn returns a serializable object that has all the data you’ll need to restore the operational state

  • restoreSnapshotFn applies the previously saved snapshot to the current context object

While a job is running, Hazelcast calls createSnapshotFn at regular intervals to save the current state.
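The snapshot interval, like the processing guarantee itself, is configured on the job rather than in the source. A minimal sketch of submitting such a job (assuming a running HazelcastInstance named hz and an already-built Pipeline p):

JobConfig config = new JobConfig()
    .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
    .setSnapshotIntervalMillis(1_000); // createSnapshotFn called roughly every second
hz.getJet().newJob(p, config);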

When Hazelcast resumes a job, it will:

  • create your context object the usual way, by calling createFn

  • retrieve the latest snapshot object from its storage

  • pass the context and snapshot objects to restoreSnapshotFn

  • start calling fillBufferFn, which must resume by emitting the same item it was about to emit when createSnapshotFn was called.

You'll find that restoreSnapshotFn, somewhat unexpectedly, accepts not one snapshot object but a list of them. If you're building a simple, non-distributed source, this list will have just one element. However, the same logic must work for distributed sources as well, and a distributed source runs on many parallel processors at the same time, each of which produces its own snapshot object. After a restart the number of parallel processors may differ from before (because you added a Hazelcast cluster member, for example), so there is no one-to-one mapping between the processors before and after the restart. This is why Hazelcast passes all the snapshot objects to all the processors, and your logic must work out which part of their data to use; a distributed sketch follows the example below.

Here's a brief example of a fault-tolerant streaming source that generates a sequence of integers:

StreamSource<Integer> faultTolerantSource = SourceBuilder
  .stream("fault-tolerant-source", processorContext -> new int[1])
  .<Integer>fillBufferFn((numToEmit, buffer) ->
      buffer.add(numToEmit[0]++))
  .createSnapshotFn(numToEmit -> numToEmit[0])
  .restoreSnapshotFn(
      (numToEmit, saved) -> numToEmit[0] = saved.get(0))
  .build();

The snapshot function returns the current number to emit; the restore function sets the number from the snapshot into the current state. This source is non-distributed, so we can safely call saved.get(0).
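For the distributed case, here's a hypothetical sketch that extends the same idea: each processor emits only the numbers in its own residue class (n % totalParallelism == globalProcessorIndex), and on restore every processor receives all the saved counters and resumes from the smallest of them, rounded up into its own class. This simple merge never skips a number but may re-emit a few when the parallelism changes, so treat it as at-least-once across restarts with a different cluster size; an exactly-once merge would need more bookkeeping. All names here are illustrative, not part of the Jet API.

import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;

public class DistributedCounterSource {

  // Per-processor state: which residue class is ours, and what to emit next.
  static class CounterState {
    final int index;   // this processor's globally unique index
    final int total;   // total parallelism of the source
    long next;         // next number this processor will emit

    CounterState(int index, int total) {
      this.index = index;
      this.total = total;
      this.next = index; // start inside our own residue class
    }
  }

  static StreamSource<Long> build() {
    return SourceBuilder
      .stream("distributed-counter", ctx ->
          new CounterState(ctx.globalProcessorIndex(), ctx.totalParallelism()))
      .<Long>fillBufferFn((state, buf) -> {
        buf.add(state.next);
        state.next += state.total; // stay inside our residue class
      })
      .createSnapshotFn(state -> state.next)
      .restoreSnapshotFn((state, saved) -> {
        // 'saved' holds one counter per processor of the previous
        // execution. Resume from the smallest of them, rounded up into
        // our own residue class: nothing gets skipped, but a few
        // numbers may be emitted again.
        long min = saved.stream().mapToLong(Long::longValue).min().orElse(0);
        state.next = min + Math.floorMod(state.index - min, (long) state.total);
      })
      .distributed(2)
      .build();
  }
}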