Testing Sources and Sinks for XA Transaction Support

For a pipeline to provide end-to-end exactly-once processing, its sources and sinks must support it too. Although many sources and sinks ensure atomicity (all distributed participants either commit the transaction or roll it back), some lose a prepared transaction when the client disconnects. You can test whether a source or a sink survives a client disconnection, using either JDBC or Java Message Service (JMS).

Hazelcast starts transactions for the source, sink, and any stage between these two, prepares them and commits them only after a snapshot of the job is taken on all members.
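The ordering described above (prepare first, commit only once the snapshot exists) can be illustrated with a small in-memory model. This is a sketch, not Hazelcast code: `Participant`, `State`, and `runEpoch` are hypothetical names, and rolling back when the snapshot fails is a simplifying assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotTwoPhaseSketch {

    enum State { ACTIVE, PREPARED, COMMITTED, ROLLED_BACK }

    /** Hypothetical stand-in for one transactional participant (source, sink, or stage). */
    static final class Participant {
        final String name;
        State state = State.ACTIVE;

        Participant(String name) {
            this.name = name;
        }

        void prepare() {
            state = State.PREPARED;
        }

        void commit() {
            // A commit is only legal for a transaction that was prepared earlier.
            if (state != State.PREPARED) {
                throw new IllegalStateException(name + " is not prepared");
            }
            state = State.COMMITTED;
        }

        void rollback() {
            state = State.ROLLED_BACK;
        }
    }

    /** Phase 1 prepares every participant; phase 2 commits only if the snapshot succeeded. */
    static boolean runEpoch(List<Participant> participants, boolean snapshotSucceeded) {
        for (Participant p : participants) {
            p.prepare();
        }
        if (!snapshotSucceeded) {
            // Assumption for this sketch: without a snapshot, nothing is committed.
            for (Participant p : participants) {
                p.rollback();
            }
            return false;
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        List<Participant> participants = new ArrayList<>();
        participants.add(new Participant("source"));
        participants.add(new Participant("sink"));
        boolean committed = runEpoch(participants, true);
        System.out.println(committed + " " + participants.get(1).state);
    }
}
```

The window between `prepare()` and `commit()` is where a client disconnection matters: the external system has to keep the prepared transaction until the commit arrives.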

Running Tests

To make sure that a prepared transaction can be committed after a client reconnects, you can use one of the following tests:

  • JDBC Test

  • JMS Test

    1. Clone the contrib repository:

      git clone git@github.com:hazelcast/hazelcast-jet-contrib.git
    2. Go to the xa-tests module:

      cd hazelcast-jet-contrib/xa-tests
    3. Run a test.

      • JDBC

      This example tests compatibility with PostgreSQL.

      1. Add the database connector dependency to build.gradle:

          compile group: 'org.postgresql', name: 'postgresql', version: '42.2.9'
      2. Edit JdbcXaTest.java.

        Create a PGXADataSource as the XA connection factory and configure it with the URL of the database, username, password and database name:

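        private static XADataSource getXADataSource() {
            PGXADataSource factory = new PGXADataSource();
            // Configure the connection here, for example with setUrl(...),
            // setUser(...), setPassword(...) and setDatabaseName(...).
            return factory;
        }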
      3. Run the class. You should see "Success!" in the output.

      • JMS

      This example tests compatibility with a JMS broker, using ActiveMQ.

      1. Add the dependency to build.gradle:

          compile group: 'org.apache.activemq', name: 'activemq-all', version: '5.15.11'
      2. Edit JmsXaTest.java.

        Create an ActiveMQXAConnectionFactory with the broker URL as the XA connection factory.

        private static XAConnectionFactory getXAConnectionFactory() {
            ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory("broker:(tcp://localhost:61616)");
            return factory;
        }
      3. Run the class. You should see "Success!" in the output.
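
For orientation, the kind of check these tests are meant to perform (prepare a transaction, disconnect, reconnect, commit) might look roughly like the following for JDBC. This is a sketch built only on the standard `javax.sql` and `javax.transaction.xa` interfaces, not the code from the xa-tests module: the class name, `SimpleXid`, `prepareDisconnectCommit`, and the identifier strings are hypothetical.

```java
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Statement;

class XaDisconnectSketch {

    /** Minimal Xid with fixed identifiers, so the same id can be used again after reconnecting. */
    static final class SimpleXid implements Xid {
        private final byte[] globalId;
        private final byte[] branchId;

        SimpleXid(String globalId, String branchId) {
            this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
            this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public int getFormatId() {
            return 1;
        }

        @Override
        public byte[] getGlobalTransactionId() {
            return globalId.clone();
        }

        @Override
        public byte[] getBranchQualifier() {
            return branchId.clone();
        }
    }

    /**
     * Prepares a transaction, closes the connection, then commits from a new connection.
     * An XAException on commit suggests the prepared transaction did not survive.
     */
    static void prepareDisconnectCommit(XADataSource dataSource, String updateSql)
            throws SQLException, XAException {
        Xid xid = new SimpleXid("xa-test-global", "xa-test-branch");

        XAConnection first = dataSource.getXAConnection();
        try {
            XAResource resource = first.getXAResource();
            resource.start(xid, XAResource.TMNOFLAGS);
            try (Statement statement = first.getConnection().createStatement()) {
                statement.executeUpdate(updateSql);
            }
            resource.end(xid, XAResource.TMSUCCESS);
            resource.prepare(xid); // first phase
        } finally {
            first.close(); // simulate the client disconnecting after prepare
        }

        XAConnection second = dataSource.getXAConnection();
        try {
            // Second phase from a fresh connection; onePhase = false because we prepared.
            second.getXAResource().commit(xid, false);
        } finally {
            second.close();
        }
    }
}
```

Calling `prepareDisconnectCommit(getXADataSource(), ...)` with a statement that modifies an existing table would exercise the same path. A statement that changes nothing may be reported as read-only at prepare time, in which case there is nothing left to commit.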