Mapping Data in a Pipeline Using a a gRPC Service
When processing data in a pipeline, you can use an external service to execute a function on input data, and pass the output to the next stage of pipeline. A popular way of exposing external services is to use the gRPC protocol.
gRPC is an open-source universal RPC framework available for many platforms and languages.
The hazelcast-jet-grpc
module makes it easy to perform calls to a gRPC
service. Hazelcast supports two gRPC service methods:
-
unary RPC
-
bidirectional streaming RPC
Dependencies
Add this dependency to your Java project:
compile 'com.hazelcast.jet:hazelcast-jet-grpc:5.2.5'
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet-grpc</artifactId>
<version>5.2.5</version>
</dependency>
The Hazelcast cluster must also have this module on the classpath.
Make sure it is present in the lib
directory, if not add it there and
restart the cluster.
Unary RPC
The classical request-response RPC pattern is what gRPC calls "unary RPC". You send a single request message and get a single response message back, just like a plain function call.
Let’s use this protobuf definition as an example:
service ProductService {
rpc ProductInfo (ProductInfoRequest)
returns (ProductInfoReply) {}
}
message ProductInfoRequest {
int32 id = 1;
}
message ProductInfoReply {
string productName = 1;
}
To call this service, use GrpcServices.unaryService()
:
ServiceFactory<?, ? extends GrpcService<ProductInfoRequest, ProductInfoReply>>
productService = unaryService(
() -> ManagedChannelBuilder.forAddress("localhost", PORT) .usePlaintext(),
channel -> ProductServiceGrpc.newStub(channel)::productInfo
);
The first parameter is a factory function that returns a channel
builder. Modify the builder settings as required, as an example we
could have enabled TLS via
io.grpc.ManagedChannelBuilder.useTransportSecurity()
.
The second parameter is a function which, given a gRPC network channel, creates a client-side stub and returns a reference to its method that invokes the service. The stub code is auto-generated by the protobuf compiler.
Returning a method reference isn’t a requirement, though, you can also modify the stub, the input item or anything else you need. The functional type you must comply with is as follows (wildcards omitted for clarity):
FunctionEx<ManagedChannel, BiConsumerEx<T, StreamObserver<R>>> callStubFn
Here, BiConsumerEx<T, StreamObserver<R>>
corresponds to the signature
of the generated gRPC method: it is a function that takes an input item
and the gRPC result observer, and ensures that the observer eventually
receives the gRPC invocation result.
Now you can use the service factory in any of the`mapUsingService*Async` methods:
StreamStage<Trade> trades = ...
trades.mapUsingServiceAsync(productService,
(service, trade) -> {
ProductInfoRequest request = ProductInfoRequest.newBuilder()
.setId(trade.productId()).build();
return service.call(request).thenApply(productReply ->
tuple2(trade, productReply.getProductName()));
})
Bidirectional Streaming RPC
Bidirectional streaming RPC is an extension of the classic RPC paradigm to streaming. You make a single RPC call, within which you can send any number of messages to the server, and it can send any number of messages to you. Since we are using gRPC to transform pipeline items, this general protocol is constrained: the response stream must match one-for-one with the request stream.
This constraint makes the streaming communication very similar to unary RPC, but it eliminates some of the overheads. Sending messages within a single established call is cheaper than creating a new call from scratch. Our benchmarks show 1.5x to 3x improvement, depending on various factors.
This is an example of a bidirectional streaming RPC definition:
service BrokerService {
rpc BrokerInfo (stream BrokerInfoRequest)
returns (stream BrokerInfoReply) {}
}
message BrokerInfoRequest {
int32 id = 1;
}
message BrokerInfoReply {
string brokerName = 1;
}
Note the stream
keyword appearing both in the request and the response.
We can create the following service factory using
GrpcServices.bidirectionalStreamingService()
method:
ServiceFactory<?, ? extends GrpcService<BrokerInfoRequest, BrokerInfoReply>>
brokerService = bidirectionalStreamingService(
() -> ManagedChannelBuilder.forAddress("localhost", PORT).usePlaintext(),
channel -> BrokerServiceGrpc.newStub(channel)::brokerInfo
);
As with the unary service, the first parameter is a supplier returning a channel builder.
The full type of the second parameter is as follows (with wildcards omitted):
FunctionEx<ManagedChannel, FunctionEx<StreamObserver<R>, StreamObserver<T>>> callStubFn
The service method is now a function with the signature
StreamObserver<R> -> StreamObserver<T>
Hazelcast provides its observer of the output (same as in the unary RPC), and this function returns an object where Hazelcast will push the input items.
Now the service factory can be used in any of the mapUsingService*
methods, preferably the mapUsingServiceAsync
.
StreamStage<Tuple2<Trade, String>> tradeAndProducts = ...
tradeAndProducts.mapUsingServiceAsync(brokerService,
(service, t) -> {
BrokerInfoRequest request = BrokerInfoRequest
.newBuilder().setId(t.f0().brokerId()).build();
return service
.call(request)
.thenApply(brokerReply ->
tuple3(t.f0(), t.f1(), brokerReply.getBrokerName()));
})
Improving Throughput with Batching
If your gRPC service’s throughput capacity is very high, and the gRPC
link is the bottleneck, you can significantly improve the throughput by
applying batching. For example, you can use a protobuf definition like
this one (note the repeated
keyword):
service Greeter {
rpc SayHelloListBidirectional (stream HelloRequestList)
returns (stream HelloReplyList) {}
}
message HelloRequestList {
repeated string name = 1;
}
message HelloReplyList {
repeated string message = 1;
}
Create the service in a way similar to previous example:
ServiceFactory<?, ? extends GrpcService<HelloRequestList, HelloReplyList>> bidiService =
bidirectionalStreamingService(
() -> ManagedChannelBuilder.forAddress(host, port).usePlaintext(),
channel -> GreeterGrpc.newStub(channel)::sayHelloListBidirectional
);
In the pipeline, use the specialized mapUsingServiceAsyncBatched
transform:
StreamStage<String> stage = ...
stage.mapUsingServiceAsyncBatched(bidiService,
1024,
(service, itemList) -> {
CompletableFuture<HelloReplyList> future =
service.call(HelloRequestList.newBuilder().addAllName(itemList).build());
return future.thenApply(HelloReplyList::getMessageList);
})
})
If your batch takes more than ~0.8 seconds (including the network overhead), you should increase the value of the following properties so that the clean shutdown succeeds:
jet.grpc.destroy.timeout.seconds
jet.grpc.shutdown.timeout.seconds
The GrpcProperties JavaDoc provides more details about these properties.
See the grpc example module for a complete code example.