JMS Connector
JMS (Java Message Service) is a standard API for communicating with various message brokers using the queue or publish-subscribe patterns.
There are several brokers that implement the JMS standard, including:
- Apache ActiveMQ and ActiveMQ Artemis
- Amazon SQS
- IBM MQ
- RabbitMQ
- Solace
Hazelcast can use these brokers as both a source and a sink through the JMS API.
Installing the Connector
This connector is included in the full and slim distributions of Hazelcast.
To use a JMS broker, such as ActiveMQ, you need to add the client libraries and all their transitive dependencies to the classpath of your members. If the broker provides a fat client JAR, you may find it easier to use that instead.
Connection Handling
The JMS source and sink open one connection to the JMS server for each member and each vertex. Then each parallel worker of the source creates a session and a message consumer/producer using that connection.
IO failures are generally handled by the JMS client and do not cause the connector to fail. Most clients offer a configuration parameter to enable auto-reconnection. Refer to your client’s documentation for details.
JMS as a Source
The following pipeline consumes messages from an ActiveMQ queue and logs them:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsQueue("queueName",
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616")))
 .withoutTimestamps()
 .writeTo(Sinks.logger());
For a topic you can choose whether the consumer is durable or shared.
You need to use the consumerFn to create the desired consumer using a JMS Session object.
If you create a shared consumer, you need to let Hazelcast know by calling sharedConsumer(true) on the builder. If you don’t do this, only one cluster member will actually connect to the JMS broker and will receive all the messages. We always assume a shared consumer for queues.
If you create a non-durable consumer, the fault-tolerance features won’t work, because the JMS broker won’t track which messages were delivered to the client and which were not.
The following example creates a non-durable, non-shared topic source:
Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsTopic("topic",
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616")))
 .withoutTimestamps()
 .writeTo(Sinks.logger());
Here is a more complex example that uses a shared, durable consumer:
Pipeline p = Pipeline.create();
p.readFrom(Sources
        .jmsTopicBuilder(() ->
                new ActiveMQConnectionFactory("tcp://localhost:61616"))
        .sharedConsumer(true)
        .consumerFn(session -> {
            Topic topic = session.createTopic("topic");
            return session.createSharedDurableConsumer(topic, "consumer-name");
        })
        .build())
 .withoutTimestamps()
 .writeTo(Sinks.logger());
Fault Tolerance
The source connector is fault-tolerant with the exactly-once guarantee (except for the non-durable topic consumer). Fault tolerance is achieved by acknowledging the consumed messages only after they have been fully processed by the downstream stages. Messages are acknowledged once per snapshot, so you need to enable a processing guarantee in the JobConfig.
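As a minimal sketch of enabling the guarantee, the class below builds a JobConfig with exactly-once processing and a snapshot interval. The class name, job name, and the 5-second interval are illustrative assumptions, not recommended values, and the code assumes the Hazelcast library is on the classpath.

```java
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;

/** Hypothetical helper; only the JobConfig setters come from the Hazelcast API. */
final class JmsJobConfigSketch {

    /** Builds a job configuration with snapshots and the exactly-once guarantee enabled. */
    static JobConfig exactlyOnce() {
        return new JobConfig()
                .setName("jms-ingest")                 // assumed job name
                .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
                .setSnapshotIntervalMillis(5_000);     // assumed interval; messages are acknowledged once per snapshot
    }
}
```

The resulting configuration is passed together with the pipeline when the job is submitted. A shorter snapshot interval means messages are acknowledged more often, at the cost of more frequent snapshots.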
In the exactly-once mode the processor saves the IDs of the messages processed since the last snapshot into the snapshotted state. Therefore this mode will not work if your messages don’t have the JMS Message ID set (it is an optional feature of JMS). In this case you need to set messageIdFn on the builder to extract the message ID from the payload.
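To illustrate why message IDs matter, here is a simplified, library-free model of the idea: IDs seen since the last snapshot are stored with the snapshot, and after a restart they are used to drop messages that the broker redelivers. This is a sketch under assumptions, not Hazelcast’s actual processor; the class and method names are hypothetical, and pruning of old IDs after acknowledgement is omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of an exactly-once source; not Hazelcast's actual processor. */
final class SnapshotDedupSketch {
    // IDs of messages emitted and not yet acknowledged; this set is part of the snapshot.
    private final Set<String> seenIds;
    // Message bodies passed downstream by this instance.
    private final List<String> emitted = new ArrayList<>();

    SnapshotDedupSketch(Set<String> restoredIds) {
        this.seenIds = new HashSet<>(restoredIds);
    }

    /** Emits the message unless its ID is already known, that is, it is a redelivery. */
    void onMessage(String messageId, String body) {
        if (seenIds.add(messageId)) {
            emitted.add(body);
        }
    }

    /** Returns a copy of the IDs to store in the snapshot. */
    Set<String> saveSnapshot() {
        return new HashSet<>(seenIds);
    }

    List<String> emitted() {
        return emitted;
    }

    /** Simulates a crash after the snapshot but before the messages were acknowledged. */
    static List<String> emittedAfterRestart() {
        SnapshotDedupSketch first = new SnapshotDedupSketch(new HashSet<>());
        first.onMessage("m1", "a");
        first.onMessage("m2", "b");
        Set<String> snapshot = first.saveSnapshot();
        // The job fails here, so the broker never receives the acknowledgement.

        SnapshotDedupSketch restarted = new SnapshotDedupSketch(snapshot);
        restarted.onMessage("m1", "a"); // redelivered by the broker, dropped
        restarted.onMessage("m2", "b"); // redelivered by the broker, dropped
        restarted.onMessage("m3", "c"); // new message, emitted
        return restarted.emitted();
    }
}
```

Without a usable ID, the restarted instance could not tell the redelivered messages from new ones, which is why the ID has to come either from the JMS Message ID or from the payload.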
If you don’t have a message ID to use, you must reduce the source
guarantee to at-least-once:
p.readFrom(Sources.jmsTopicBuilder(...)
        .maxGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
        ...)
In the at-least-once mode messages are acknowledged in the same way as in the exactly-once mode, but message IDs are not saved to the snapshot.
If you have no processing guarantee enabled, the processor will consume the messages in the DUPS_OK_ACKNOWLEDGE mode.
JMS as a Sink
The JMS sink uses the supplied function to create a Message object for each input item. The following code snippets show writing to a JMS queue and a JMS topic using the ActiveMQ JMS client.
// Write to a JMS queue
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("inputList"))
 .writeTo(Sinks.jmsQueue("queue",
         () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
 );

// Write to a JMS topic
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("inputList"))
 .writeTo(Sinks.jmsTopic("topic",
         () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
 );
Fault Tolerance
The JMS sink supports the exactly-once guarantee. It uses two-phase XA transactions, and messages are committed consistently with the last state snapshot. This greatly increases latency, which is then determined by the snapshot interval, because messages are visible to consumers only after the commit. For this to work, the connection factory you provide must implement javax.jms.XAConnectionFactory; otherwise the job will not start.
If you want to avoid the higher latency or the overhead introduced by XA transactions, if your JMS implementation doesn’t support XA transactions, or if you just don’t need the guarantee, you can reduce it just for the sink:
stage.writeTo(Sinks
        .jmsQueueBuilder(() -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
        // the builder needs a destination; "queue" is a placeholder name
        .destinationName("queue")
        // decrease the guarantee for the sink
        .exactlyOnce(false)
        .build());
In the at-least-once mode or if no guarantee is enabled, the transaction is committed after each batch of messages: transactions are used for performance as this is JMS' way to send messages in batches. Batches are created from readily available messages so they incur minimal extra latency.
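The batching behaviour described above can be modelled without a broker. The following sketch drains whatever messages are available at that moment into one batch and counts one commit per batch, so it never waits for a batch to fill. It is an illustration under assumptions, not the sink’s actual code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified model of committing one transaction per batch of readily available messages. */
final class BatchCommitSketch {

    /**
     * Each inner list is a burst of messages that arrives before the next send attempt.
     * Returns the size of every committed batch, in commit order.
     */
    static List<Integer> commitAvailableBatches(List<List<String>> bursts) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        List<Integer> committedBatchSizes = new ArrayList<>();
        for (List<String> burst : bursts) {
            inbox.addAll(burst);
            List<String> batch = new ArrayList<>();
            inbox.drainTo(batch); // takes only what is already there; never blocks
            if (!batch.isEmpty()) {
                // In a real sink, the messages would be sent here and the session committed once.
                committedBatchSizes.add(batch.size());
            }
        }
        return committedBatchSizes;
    }
}
```

Because a batch is whatever has already arrived, a lone message is committed immediately instead of waiting for more messages to accumulate.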
XA transactions are implemented incorrectly in some brokers. Specifically, a prepared transaction is sometimes rolled back when the client disconnects. The issue is tricky because the integration works during normal operation and the problem manifests only if the job crashes at a specific moment. Hazelcast will not even detect it; some messages will simply be missing from the sink. To test your broker, we provide a tool; see XA tests for more information. This applies only to the JMS sink, because the source doesn’t use XA transactions.