Custom Connectors

If Hazelcast doesn’t natively support the data source or sink you need, you can build a connector for it yourself using `SourceBuilder` and `SinkBuilder`.

SourceBuilder

To make a custom source connector you need two basic ingredients:

  • a context object that holds all the resources and state you need to keep track of

  • a stateless function, `fillBufferFn`, taking two parameters: the context object and a buffer object provided by the Jet API

Hazelcast calls `fillBufferFn` repeatedly, whenever it needs more data items. Ideally, the function fills the buffer with the items it can acquire without blocking. About a hundred items per call is enough to make the per-call overhead within Hazelcast negligible. The function is allowed to block, but if it takes longer than a second to complete, it can hurt the overall performance of the processing pipeline.
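The batching guideline above can be sketched roughly as follows. This is a standard-library-only model, not Hazelcast code: the `Buffer` interface is a hypothetical stand-in for the buffer object Hazelcast passes to `fillBufferFn` (it mirrors only the `add` and `close` calls used on this page), `RecordingBuffer` and `batchSizes` are illustrative helpers that imitate the repeated calls, and the limit of 100 is taken from the guideline rather than from any API constant.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class BatchedFill {
    /** Hypothetical stand-in for the buffer handed to fillBufferFn. */
    interface Buffer<T> {
        void add(T item);
        void close();
    }

    /** Test double that records what the fill function did. */
    static final class RecordingBuffer implements Buffer<String> {
        final List<String> items = new ArrayList<>();
        boolean closed;
        @Override public void add(String item) { items.add(item); }
        @Override public void close() { closed = true; }
    }

    // Assumption: taken from the "a hundred items at a time" guideline.
    static final int MAX_ITEMS_PER_CALL = 100;

    /** Body of a fill function: adds at most 100 lines, closes at end of input. */
    static void fillBuffer(BufferedReader in, Buffer<String> buf) throws IOException {
        for (int i = 0; i < MAX_ITEMS_PER_CALL; i++) {
            String line = in.readLine();
            if (line == null) {
                buf.close();   // no more data: signal completion
                return;
            }
            buf.add(line);
        }
        // Limit reached: return and wait to be called again.
    }

    /** Imitates the repeated calls and reports how many items each call added. */
    static List<Integer> batchSizes(BufferedReader in) {
        RecordingBuffer buf = new RecordingBuffer();
        List<Integer> sizes = new ArrayList<>();
        try {
            while (!buf.closed) {
                int before = buf.items.size();
                fillBuffer(in, buf);
                sizes.add(buf.items.size() - before);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(new StringReader("x\n".repeat(250)));
        System.out.println(batchSizes(in)); // prints [100, 100, 50]
    }
}
```

Capping the loop keeps each call short, so the caller regains control regularly. The function keeps no state of its own, because the read position lives in the reader, which plays the role of the context object.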

In the following example we build a simple batch source that emits the lines of a file:

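import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import java.io.BufferedReader;
import java.io.FileReader;

BatchSource<String> fileSource = SourceBuilder
    // The context object is a BufferedReader over the input file.
    .batch("file-source", x -> new BufferedReader(new FileReader("input.txt")))
    .<String>fillBufferFn((in, buf) -> {
        // Emit one line per call; null from readLine() means end of file.
        String line = in.readLine();
        if (line != null) {
            buf.add(line);
        } else {
            // Closing the buffer signals that the batch source is exhausted.
            buf.close();
        }
    })
    .destroyFn(BufferedReader::close)
    .build();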

For a more involved example (one that reads data in batches for efficiency, deals with unbounded data, emits timestamps, and is distributed and fault tolerant), see the Custom Batch Sources and Custom Stream Sources tutorials.

SinkBuilder

To make your custom sink connector you need two basic ingredients:

  • a context object that holds all the resources and state you need to keep track of

  • a stateless function, `receiveFn`, taking two parameters: the context object and a data item sent to the sink

In the following example we build a simple sink which writes the toString() form of objects to a file:

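import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SinkBuilder;
import java.io.FileWriter;
import java.io.PrintWriter;

Sink<Object> sink = SinkBuilder.sinkBuilder(
        "file-sink", x -> new PrintWriter(new FileWriter("output.txt")))
    .receiveFn((out, item) -> out.println(item.toString()))
    .destroyFn(PrintWriter::close)
    .build();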
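To illustrate the split between the context object and the stateless function, here is a minimal standard-library-only sketch. It is not Hazelcast code: `SinkContext`, `RECEIVE_FN` and `writeAll` are hypothetical names introduced for illustration, and a plain `BiConsumer` stands in for the function type that `receiveFn` accepts. The point it tries to show is that anything that changes between calls (here the writer and a running item count) lives in the context, while the function only acts on the arguments it is given.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.BiConsumer;

class StatelessReceive {
    /** Hypothetical context object: owns the writer and a running item count. */
    static final class SinkContext {
        final PrintWriter out;
        long written;

        SinkContext(PrintWriter out) {
            this.out = out;
        }

        /** Plays the role of the destroy step: flushes and closes the writer. */
        void close() {
            out.close();
        }
    }

    /** Stateless receive function: everything it modifies lives in the context. */
    static final BiConsumer<SinkContext, Object> RECEIVE_FN = (ctx, item) -> {
        ctx.out.println(item.toString());
        ctx.written++;
    };

    /** Imitates a sink lifecycle: create the context, receive items, destroy. */
    static long writeAll(StringWriter target, Object... items) {
        SinkContext ctx = new SinkContext(new PrintWriter(target));
        for (Object item : items) {
            RECEIVE_FN.accept(ctx, item);
        }
        ctx.close();
        return ctx.written;
    }

    public static void main(String[] args) {
        StringWriter target = new StringWriter();
        long count = writeAll(target, "a", 42);
        System.out.print(target);
        System.out.println(count + " items written");
    }
}
```

Because the function captures nothing, the same function object could serve any number of contexts, each with its own writer and count.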

For a more involved example, covering issues like batching, distribution and fault tolerance, see the Custom Sinks tutorial.