Create a Custom Sink

In the Custom Sources and Sinks section of the Sources and Sinks programming guide, we saw some basic examples of user-defined sources and sinks. This tutorial works through further examples that cover some of the trickier aspects of writing your own sinks.

Step 1. Define a Sink

Let’s write a sink that functions like a file logger. You set it up with a filename, and it writes one line to that file for each item it receives. Each line consists of a timestamp followed by the toString() form of the item that produced it. Here’s a sample:

1583309377078,SimpleEvent(timestamp=10:09:37.000, sequence=2900)
1583309377177,SimpleEvent(timestamp=10:09:37.100, sequence=2901)
1583309377277,SimpleEvent(timestamp=10:09:37.200, sequence=2902)
1583309377376,SimpleEvent(timestamp=10:09:37.300, sequence=2903)
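
The line format shown above can be sketched in plain Java, independent of Hazelcast. The class and method names below (LogLineFormat, formatLine) are hypothetical and exist only for illustration; the sketch assumes the timestamp is the wall-clock time in epoch milliseconds, as in the sample.

```java
// Hypothetical helper for illustration only; it is not part of the Hazelcast API.
final class LogLineFormat {

    // Joins an epoch-millis timestamp and the item's toString() form with a comma.
    static String formatLine(long timestampMillis, Object item) {
        return timestampMillis + "," + item;
    }

    public static void main(String[] args) {
        // Prints one line in the same shape as the sample output.
        System.out.println(formatLine(System.currentTimeMillis(), "example item"));
    }
}
```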

Such a sink could be defined like this:

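import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import java.io.PrintWriter;

class Sinks {
  static Sink<Object> buildLogSink() {
    return SinkBuilder.sinkBuilder(
        // One output file per processor, named after its global index
        "log-sink", pctx -> new PrintWriter("data." + pctx.globalProcessorIndex() + ".csv"))
      .receiveFn((writer, item) -> {
        // Write "<epoch millis>,<item>" and flush after every item
        writer.println(String.format("%d,%s", System.currentTimeMillis(), item.toString()));
        writer.flush();
      })
      // Close the file when the processor shuts down
      .destroyFn(writer -> writer.close())
      .build();
  }
}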

You use it in a pipeline just as you would a built-in sink:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(10))
 .withoutTimestamps()
 .writeTo(Sinks.buildLogSink());

Step 2. Add Batching

Our sink uses a PrintWriter, which buffers its output internally. Relying on that buffer instead of flushing after every item would make the sink more efficient. Hazelcast lets us make buffering a first-class concern and deal with it explicitly: the builder takes an optional flushFn, which Hazelcast will call at regular intervals.

To apply this to our example, we move the flush() call out of receiveFn and into a flushFn:

SinkBuilder.sinkBuilder(
  "log-sink", pctx -> new PrintWriter("data." + pctx.globalProcessorIndex() + ".csv"))
  .receiveFn((writer, item) -> {
      writer.println(String.format("%d,%s", System.currentTimeMillis(), item.toString()));
  })
  .flushFn(writer -> writer.flush())
  .destroyFn(writer -> writer.close())
  .build();
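
To see why the explicit flush matters, here is a minimal stand-alone sketch that uses only the JDK. It assumes a PrintWriter created without automatic line flushing, as in our sink, and a line much shorter than the writer's internal buffer. The class name FlushDemo and the temporary file are hypothetical.

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-alone demo; it does not use any Hazelcast API.
final class FlushDemo {

    // Writes one short line and returns the file size before and after flush().
    static long[] sizesAroundFlush() {
        try {
            Path file = Files.createTempFile("flush-demo", ".csv");
            try (PrintWriter writer = new PrintWriter(file.toFile())) {
                writer.println("1583309377078,example item");
                long before = Files.size(file); // the line is still in the writer's buffer
                writer.flush();
                long after = Files.size(file);  // the line has been handed over to the file
                return new long[] {before, after};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = sizesAroundFlush();
        System.out.println("before flush: " + sizes[0] + " bytes, after flush: " + sizes[1] + " bytes");
    }
}
```

Until flush() is called, a reader of the file sees none of the buffered lines, which is why the sink needs either a flush in receiveFn or a flushFn.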

Step 3. Increase Parallelism

Hazelcast builds the sink to be distributed by default: each member of the Hazelcast cluster has a processor running it. You can configure how many parallel processors there are on each member (the local parallelism) by calling SinkBuilder.preferredLocalParallelism(). By default there will be one processor per member.

The overall job output consists of the contents of all the files written by all processor instances put together.
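
If you need a single view of the output, one option is to combine the lines of all the files and order them by their leading timestamp. The sketch below does this in plain Java on lines that have already been read into memory. The names OutputMerger, timestampOf and merge are hypothetical, and ordering by timestamp is an assumption made for this illustration, not something Hazelcast does for you.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical post-processing helper; it is not part of the Hazelcast API.
final class OutputMerger {

    // Parses the leading epoch-millis timestamp of a line shaped like "<millis>,<item>".
    static long timestampOf(String line) {
        return Long.parseLong(line.substring(0, line.indexOf(',')));
    }

    // Combines the lines of several per-processor files into one list ordered by timestamp.
    static List<String> merge(List<List<String>> linesPerFile) {
        List<String> all = new ArrayList<>();
        linesPerFile.forEach(all::addAll);
        all.sort(Comparator.comparingLong(OutputMerger::timestampOf));
        return all;
    }

    public static void main(String[] args) {
        // Made-up sample lines standing in for the contents of two output files.
        List<String> fileA = List.of("1000,first", "3000,third");
        List<String> fileB = List.of("2000,second");
        merge(List.of(fileA, fileB)).forEach(System.out::println);
    }
}
```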

Let’s increase the local parallelism from the default value of 1 to 2:

SinkBuilder.sinkBuilder(
    "log-sink", pctx -> new PrintWriter("data." + pctx.globalProcessorIndex() + ".csv"))
    .receiveFn((writer, item) -> {
        writer.println(String.format("%d,%s", System.currentTimeMillis(), item.toString()));
    })
    .flushFn(writer -> writer.flush())
    .destroyFn(writer -> writer.close())
    .preferredLocalParallelism(2)
    .build();

The visible change is that there are now two output files, data.0.csv and data.1.csv, each containing roughly half of the output data.

We could now add a second member to the Hazelcast cluster. At that point we would have two members, both with a local parallelism of 2, and there would be 4 output files. You would notice, however, that all the data ends up in the files written by the processors of a single Hazelcast member.
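
The number and names of the output files follow from the total parallelism. The following plain-Java sketch lists them, assuming that global processor indices run from 0 up to the total number of processors minus one. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of the Hazelcast API.
final class OutputFiles {

    // Lists the file names our sink would create, one per processor in the cluster.
    // Assumption: global processor indices cover 0 .. (members * localParallelism - 1).
    static List<String> expectedFileNames(int memberCount, int localParallelism) {
        int totalParallelism = memberCount * localParallelism;
        List<String> names = new ArrayList<>();
        for (int globalIndex = 0; globalIndex < totalParallelism; globalIndex++) {
            names.add("data." + globalIndex + ".csv");
        }
        return names;
    }

    public static void main(String[] args) {
        // Two members with a local parallelism of 2 each
        System.out.println(expectedFileNames(2, 2));
    }
}
```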

The other member doesn’t get any data, for two reasons. First, our pipeline doesn’t contain any operation that would generate distributed edges (ones that carry data from one member to another). Second, the test source we have used creates only one instance globally, regardless of the number of members in the cluster. The member hosting that test source instance therefore processes all the data. Real sources don’t usually have this limitation.

Step 4. Make the Sink Fault Tolerant

Sinks built via SinkBuilder don’t participate in the fault tolerance protocol, so they can’t preserve any internal state if a job fails and gets restarted. In a job with snapshotting enabled, however, your sink will still receive every item at least once. The guarantee the sink provides then depends on how it stores the data.

If you ensure that all previously received items are persistently stored once the flushFn has been called, your sink provides an at-least-once guarantee. If you don’t (like our first example without the flushFn), your sink can also miss items. If the system you’re storing the data into is idempotent (that is, writing the same thing multiple times has exactly the same effect as writing it once, which is obviously not the case with our example), then your sink provides an exactly-once guarantee.
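
To make the notion of idempotence concrete, here is a minimal plain-Java sketch that contrasts an append-only target (like our log file) with a keyed store in which a write for the same key overwrites the previous one. It assumes each item carries a unique sequence number that can serve as the key. The class, the method names and the "event-" payload are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone demo; it does not use any Hazelcast API.
final class IdempotencyDemo {

    // Append-only target: an item that is delivered twice is stored twice.
    static List<String> writeAppendOnly(List<Long> sequences) {
        List<String> log = new ArrayList<>();
        for (long seq : sequences) {
            log.add("event-" + seq);
        }
        return log;
    }

    // Keyed target: writing the same sequence number again overwrites the same entry.
    static Map<Long, String> writeKeyed(List<Long> sequences) {
        Map<Long, String> store = new TreeMap<>();
        for (long seq : sequences) {
            store.put(seq, "event-" + seq);
        }
        return store;
    }

    public static void main(String[] args) {
        // Item 2 is delivered twice, as can happen after a job restart.
        List<Long> replayed = List.of(1L, 2L, 2L, 3L);
        System.out.println("append-only entries: " + writeAppendOnly(replayed).size());
        System.out.println("keyed entries: " + writeKeyed(replayed).size());
    }
}
```

With the keyed store, a replayed item leaves the stored data unchanged, which is the property an at-least-once sink needs in order to behave as exactly-once.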