Observable Connector
The observable connector provides a way of receiving query results on the same client that submitted a job. This connector is useful when you are using the Jet API for ad-hoc queries and you want to use your client as a sink.
Installing the Connector
This connector is included in the full and slim distributions of Hazelcast.
Observable as a Sink
For example, imagine the following pipeline:
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();
Observable<SimpleEvent> observable = jet.newObservable();
observable.addObserver(e -> System.out.println("Printed from client: " + e));
Pipeline pipeline = p.create();
p.readFrom(TestSources.itemStream(5))
.withIngestionTimestamps()
.writeTo(Sinks.observable(observable));
try {
jet.newJob(pipeline).join();
} finally {
observable.destroy();
}
When you run this pipeline, you’ll see the following output:
Printed from client: SimpleEvent(timestamp=12:36:53.400, sequence=28)
Printed from client: SimpleEvent(timestamp=12:36:53.600, sequence=29)
Printed from client: SimpleEvent(timestamp=12:36:53.800, sequence=30)
Printed from client: SimpleEvent(timestamp=12:36:54.000, sequence=31)
Printed from client: SimpleEvent(timestamp=12:36:54.200, sequence=32)
Printed from client: SimpleEvent(timestamp=12:36:54.400, sequence=33)
Printed from client: SimpleEvent(timestamp=12:36:54.600, sequence=34)
Printed from client: SimpleEvent(timestamp=12:36:54.800, sequence=35)
Printed from client: SimpleEvent(timestamp=12:36:55.000, sequence=36)
You can see that the printed output is actually on the client, and not on the server. The Jet engine internally uses Hazelcast’s ringbuffer to create a temporary buffer to write the results into and these are then fetched by the client.
Setting a Maximum Capacity
The ringbuffer may lose events if they are being produced at a higher-rate than the clients can consume them. Hazelcast will log a warning in such cases.
To configure the ringbuffer’s capacity, use the setCapacity()
method on the Observable
.
Getting Notified of Events
Use the onError()
or onComplete()
methods to
get notified of job completion and errors.
Cleaning Up Memory
When you are finished with an Observable
object, use the destroy()
method to remove it from memory. Otherwise, the cluster will not be able to recover the memory that the object was stored in.
Using Futures
Observable
also support a conversion to a future to collect the
results.
For example, to collect the job results into a list, you can use the following pattern:
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();
Observable<String> observable = jet.newObservable();
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("a", "b", "c", "d"))
.writeTo(Sinks.observable(observable));
Future<List<String>> future = observable.toFuture(
s -> s.collect(Collectors.toList())
);
jet.newJob(p);
try {
List<String> results = future.get();
for (String result : results) {
System.out.println(result);
}
} finally {
observable.destroy();
}