JMS Message Queue Connector

JMS (Java Message Service) is a standard API for communicating with various message brokers using the queue or publish-subscribe patterns.

There are several brokers that implement the JMS standard, including:

  • Apache ActiveMQ and ActiveMQ Artemis

  • Amazon SQS

  • IBM MQ

  • RabbitMQ

  • Solace

Hazelcast can use any of these brokers as both a source and a sink through the JMS API.

Installing the Connector

This connector is included in the full and slim distributions of Hazelcast.

To use a JMS broker, such as ActiveMQ, you need to add the client libraries and all their transitive dependencies to the classpath of your members. If the broker provides a fat client JAR, you may find it easier to use that instead.

Connection Handling

The JMS source and sink open one connection to the JMS server for each member and each vertex. Then each parallel worker of the source creates a session and a message consumer/producer using that connection.

IO failures are generally handled by the JMS client and do not cause the connector to fail. Most clients offer a configuration parameter to enable auto-reconnection; refer to your client’s documentation for details.
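For example, the ActiveMQ client supports automatic reconnection through its failover transport in the broker URL. A minimal sketch; the broker address is an assumption:

ConnectionFactory factory = new ActiveMQConnectionFactory(
        // the failover transport makes the client reconnect automatically
        "failover:(tcp://localhost:61616)");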

JMS as a Source

The following simple pipeline consumes messages from a given ActiveMQ queue and logs them:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsQueue("queueName",
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616")))
 .withoutTimestamps()
 .writeTo(Sinks.logger());

For a topic you can choose whether the consumer is durable or shared. You need to use the consumerFn to create the desired consumer using a JMS Session object.

If you create a shared consumer, you need to let Hazelcast know by calling sharedConsumer(true) on the builder. If you don’t do this, only one cluster member will actually connect to the JMS broker and will receive all the messages. We always assume a shared consumer for queues.

If you create a non-durable consumer, the fault-tolerance features won’t work, since the JMS broker won’t track which messages were delivered to the client and which were not.

Below is a simple example that creates a non-durable, non-shared topic source:

Pipeline p = Pipeline.create();
p.readFrom(Sources.jmsTopic("topic",
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616")))
 .withoutTimestamps()
 .writeTo(Sinks.logger());

Here is a more complex example that uses a shared, durable consumer:

Pipeline p = Pipeline.create();
p.readFrom(Sources
        .jmsTopicBuilder(() ->
                new ActiveMQConnectionFactory("tcp://localhost:61616"))
        .sharedConsumer(true)
        .consumerFn(session -> {
            Topic topic = session.createTopic("topic");
            return session.createSharedDurableConsumer(topic, "consumer-name");
        })
        .build())
 .withoutTimestamps()
 .writeTo(Sinks.logger());

Fault Tolerance

The source connector is fault-tolerant with the exactly-once guarantee (except for the non-durable topic consumer). Fault tolerance is achieved by acknowledging the consumed messages only after they have been fully processed by the downstream stages. Acknowledging is done once per snapshot, so you need to enable a processing guarantee in the JobConfig.
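For example, a processing guarantee is enabled on the JobConfig used to submit the job. A minimal sketch, assuming a HazelcastInstance variable named hz and an illustrative snapshot interval:

JobConfig config = new JobConfig()
        // acknowledging happens once per snapshot, so the guarantee and the
        // snapshot interval control how often consumed messages are acknowledged
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        .setSnapshotIntervalMillis(10_000);
hz.getJet().newJob(p, config);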

In the exactly-once mode the processor saves the IDs of the messages processed since the last snapshot into the snapshotted state. Therefore this mode won’t work if your messages don’t have the JMS message ID set (it is an optional feature of JMS). In that case you need to set messageIdFn on the builder to extract the message ID from the payload. If you don’t have a message ID to use, you must reduce the source guarantee to at-least-once:

p.readFrom(Sources.jmsTopicBuilder(...)
  .maxGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
  ...)
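If your payload does carry a usable identifier, you can supply it through messageIdFn instead. A sketch, assuming a hypothetical orderId string property carried on each message:

p.readFrom(Sources.jmsQueueBuilder(
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
    .destinationName("queueName")
    // hypothetical: a business key carried as a JMS string property
    .messageIdFn(message -> message.getStringProperty("orderId"))
    .build())
 .withoutTimestamps()
 .writeTo(Sinks.logger());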

In the at-least-once mode messages are acknowledged in the same way as in the exactly-once mode, but message IDs are not saved to the snapshot.

If no processing guarantee is enabled, the processor consumes the messages in the DUPS_OK_ACKNOWLEDGE mode.

JMS as a Sink

The JMS sink uses the supplied function to create a Message object for each input item. The following code snippets show writing to a JMS queue and a JMS topic using the ActiveMQ JMS client.

Pipeline p = Pipeline.create();
p.readFrom(Sources.list("inputList"))
 .writeTo(Sinks.jmsQueue("queue",
         () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
 );

Pipeline p = Pipeline.create();
p.readFrom(Sources.list("inputList"))
 .writeTo(Sinks.jmsTopic("topic",
        () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
 );
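If you need full control over the created Message object, the sink builder lets you supply the message-creating function yourself. A sketch, assuming string items and that the builder exposes messageFn and destinationName hooks:

Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list("inputList"))
 .writeTo(Sinks.<String>jmsQueueBuilder(
         () -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
     .destinationName("queue")
     // messageFn receives the JMS Session and the item and returns a Message
     .messageFn((session, item) -> session.createTextMessage(item))
     .build());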

Fault Tolerance

The JMS sink supports the exactly-once guarantee. It uses two-phase XA transactions: messages are committed consistently with the last state snapshot. This greatly increases the latency, which is determined by the snapshot interval, because messages become visible to consumers only after the commit. For this to work, the connection factory you provide must implement javax.jms.XAConnectionFactory; otherwise the job will not start.
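For example, with the ActiveMQ client you can supply its XA-capable factory. A sketch, assuming the same broker address as above:

Pipeline p = Pipeline.create();
p.readFrom(Sources.list("inputList"))
 .writeTo(Sinks.jmsQueue("queue",
         // ActiveMQXAConnectionFactory implements javax.jms.XAConnectionFactory,
         // which the exactly-once sink requires
         () -> new ActiveMQXAConnectionFactory("tcp://localhost:61616"))
 );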

If you want to avoid the higher latency and the overhead introduced by XA transactions, if your JMS implementation doesn’t support them, or if you simply don’t need the guarantee, you can reduce it just for the sink:

stage.writeTo(Sinks
         .jmsQueueBuilder(() -> new ActiveMQConnectionFactory("tcp://localhost:61616"))
         // decrease the guarantee for the sink
         .exactlyOnce(false)
         .build());

In the at-least-once mode, or if no guarantee is enabled, the transaction is committed after each batch of messages. Transactions are used here for performance, since this is the JMS way to send messages in batches. Batches are created from readily available messages, so they incur minimal extra latency.

XA transactions are implemented incorrectly in some brokers. Specifically, a prepared transaction is sometimes rolled back when the client disconnects. The issue is tricky because the integration works during normal operation and the problem only manifests if the job crashes at a specific moment. Hazelcast will not even detect it; some messages will simply be missing from the sink. To test your broker we provide a tool; see XA tests for more information. This only applies to the JMS sink; the source doesn’t use XA transactions.