Adding Batching to a Custom Source
To improve the throughput of your custom sources, you can add batching to reads.
Defining a Source
Here is how you create a source that reads lines of text from a file:
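import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import java.io.BufferedReader;
import java.io.FileReader;

public class Sources {
    public static BatchSource<String> buildLineSource() {
        return SourceBuilder
                // The BufferedReader is the context object passed to the functions below
                .batch("line-source", ctx ->
                        new BufferedReader(new FileReader("lines.txt")))
                .<String>fillBufferFn((in, buf) -> {
                    String line = in.readLine();
                    if (line != null) {
                        buf.add(line);
                    } else {
                        // End of file: mark the source as exhausted
                        buf.close();
                    }
                })
                // Close the reader when the source is disposed
                .destroyFn(in -> in.close())
                .build();
    }
}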
The source is now ready to be used in a pipeline, just like a built-in source:
Pipeline p = Pipeline.create();
p.readFrom(Sources.buildLineSource())
 .writeTo(Sinks.logger());
Adding Batching
While this simple source functions correctly, it is not optimal because
it reads just one line per call and then returns. Each invocation of
fillBufferFn carries some overhead, so you can amortize that cost by
adding more than one item to the buffer per call:
SourceBuilder
        .batch("line-source",
                ctx -> new BufferedReader(new FileReader("lines.txt")))
        .<String>fillBufferFn((in, buf) -> {
            // Add up to 128 lines per invocation
            for (int i = 0; i < 128; i++) {
                String line = in.readLine();
                if (line == null) {
                    // End of file: mark the source as exhausted
                    buf.close();
                    return;
                }
                buf.add(line);
            }
        })
        // Close the reader when the source is disposed
        .destroyFn(in -> in.close())
        .build();
This code adds up to 128 lines per call. The right batch size depends
on your source: as a rule of thumb, aim for about 1 millisecond spent
in a single invocation of fillBufferFn.
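One way to apply this rule of thumb is to measure the average cost of reading one line and derive a batch size from it. The sketch below is plain Java and does not use the Jet API: the BatchSizing class, its method names, and the 1,024-item cap are hypothetical choices made for illustration, and a List<String> stands in for the source buffer.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical helper for illustration; not part of the Jet API.
final class BatchSizing {
    // Assumed upper bound so that a very cheap read cannot produce a huge batch.
    static final int MAX_BATCH = 1024;

    // Returns how many items fit into targetNanos, clamped to [1, MAX_BATCH].
    static int batchSizeFor(long nanosPerItem, long targetNanos) {
        if (nanosPerItem <= 0) {
            return MAX_BATCH; // too fast to measure: fall back to the cap
        }
        long size = targetNanos / nanosPerItem;
        return (int) Math.max(1, Math.min(MAX_BATCH, size));
    }

    // Reads up to sampleLines lines and returns the average cost per line
    // in nanoseconds, or 0 if nothing could be read.
    static long measureNanosPerLine(BufferedReader in, int sampleLines) {
        try {
            long start = System.nanoTime();
            int read = 0;
            while (read < sampleLines && in.readLine() != null) {
                read++;
            }
            long elapsed = System.nanoTime() - start;
            return read == 0 ? 0 : elapsed / read;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same loop as the batched fillBufferFn, with a configurable batch size.
    // Returns true at end of input, where the real source would call buf.close().
    static boolean fillBatch(BufferedReader in, List<String> buf, int batchSize) {
        try {
            for (int i = 0; i < batchSize; i++) {
                String line = in.readLine();
                if (line == null) {
                    return true;
                }
                buf.add(line);
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, if reading one line takes about 10 microseconds, batchSizeFor(10_000, 1_000_000) yields 100 lines per call. A measurement taken on a warm JVM with representative data is likely to be more reliable than a single cold sample.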