Custom Connectors
If Hazelcast doesn’t natively support the data source/sink you need, you can build a connector for it yourself by using the SourceBuilder and SinkBuilder.
SourceBuilder
To make a custom source connector you need two basic ingredients:
-
a context object that holds all the resources and state you need to keep track of
-
a stateless function, `fillBufferFn`, taking two parameters: the state object and a buffer object provided by the Jet API
Hazelcast repeatedly calls fillBufferFn
whenever it needs more data items.
Optimally, the function will fill the buffer with the items it can
acquire without blocking. A hundred items at a time is enough to
eliminate any per-call overheads within Hazelcast. The function is allowed to
block as well, but taking longer than a second to complete can have
negative effects on the overall performance of the processing pipeline.
In the following examples we build a simple batch source that emits the lines of a file:
BatchSource<String> fileSource = SourceBuilder
.batch("file-source", x -> new BufferedReader(new FileReader("input.txt")))
.<String>fillBufferFn((in, buf) -> {
String line = in.readLine();
if (line != null) {
buf.add(line);
} else {
buf.close();
}
})
.destroyFn(BufferedReader::close)
.build();
For a more involved example (which reads data in batches for efficiency, deals with unbounded data, emits timestamps, is distributed and fault tolerant see the Custom Batch Sources and Custom Stream Sources tutorials).
SinkBuilder
To make your custom sink connector you need two basic ingredients:
-
a context object that holds all the resources and state you need to keep track of
-
a stateless function, `receiveFn`, taking two parameters: the state object and a data item sent to the sink
In the following example we build a simple sink which writes the
toString()
form of objects to a file:
Sink<Object> sink = sinkBuilder(
"file-sink", x -> new PrintWriter(new FileWriter("output.txt")))
.receiveFn((out, item) -> out.println(item.toString()))
.destroyFn(PrintWriter::close)
.build();
For a more involved example, covering issues like batching, distributiveness and fault tolerance, see the Custom Sinks tutorial).