Observable Connector

The observable connector provides a way of receiving query results on the same client that submitted a job. This connector is useful when you are using the Jet API for ad-hoc queries and you want to use your client as a sink.

Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.

Observable as a Sink

For example, imagine the following pipeline:

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();
Observable<SimpleEvent> observable = jet.newObservable();
observable.addObserver(e -> System.out.println("Printed from client: " + e));

Pipeline pipeline = p.create();
p.readFrom(TestSources.itemStream(5))
 .withIngestionTimestamps()
 .writeTo(Sinks.observable(observable));
try {
  jet.newJob(pipeline).join();
} finally {
  observable.destroy();
}

When you run this pipeline, you’ll see the following output:

Printed from client: SimpleEvent(timestamp=12:36:53.400, sequence=28)
Printed from client: SimpleEvent(timestamp=12:36:53.600, sequence=29)
Printed from client: SimpleEvent(timestamp=12:36:53.800, sequence=30)
Printed from client: SimpleEvent(timestamp=12:36:54.000, sequence=31)
Printed from client: SimpleEvent(timestamp=12:36:54.200, sequence=32)
Printed from client: SimpleEvent(timestamp=12:36:54.400, sequence=33)
Printed from client: SimpleEvent(timestamp=12:36:54.600, sequence=34)
Printed from client: SimpleEvent(timestamp=12:36:54.800, sequence=35)
Printed from client: SimpleEvent(timestamp=12:36:55.000, sequence=36)

You can see that the printed output is actually on the client, and not on the server. The Jet engine internally uses Hazelcast’s ringbuffer to create a temporary buffer to write the results into and these are then fetched by the client.

Setting a Maximum Capacity

The ringbuffer may lose events if they are being produced at a higher-rate than the clients can consume them. Hazelcast will log a warning in such cases.

To configure the ringbuffer’s capacity, use the setCapacity() method on the Observable.

Getting Notified of Events

Use the onError() or onComplete() methods to get notified of job completion and errors.

Cleaning Up Memory

When you are finished with an Observable object, use the destroy() method to remove it from memory. Otherwise, the cluster will not be able to recover the memory that the object was stored in.

Listing Observables

To list all observables, use the JetService.getObservables() method.

Using Futures

Observable also support a conversion to a future to collect the results.

For example, to collect the job results into a list, you can use the following pattern:

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
JetService jet = hz.getJet();
Observable<String> observable = jet.newObservable();

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("a", "b", "c", "d"))
 .writeTo(Sinks.observable(observable));

Future<List<String>> future = observable.toFuture(
    s -> s.collect(Collectors.toList())
);
jet.newJob(p);

try {
  List<String> results = future.get();
  for (String result : results) {
    System.out.println(result);
  }
} finally {
  observable.destroy();
}