Testing Sources and Sinks for XA Transaction Support
For end-to-end exactly-once processing in pipelines, sources and sinks need to support it too. Although many sources and sinks ensure atomicity (all distributed participants either commit the transaction or roll it back), some cannot commit a prepared transaction after the client disconnects and reconnects. You can test whether a source or a sink survives a client disconnection, using either JDBC or Java Message Service (JMS).
Hazelcast starts transactions for the source, the sink, and any stage between the two, prepares them, and commits them only after a snapshot of the job is taken on all members.
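The failure mode being tested can be pictured with a toy model. The following is a minimal sketch, not Hazelcast's implementation and not a real XA resource: the class XaSurvivalSketch, its Resource type, and the keepsPreparedOnDisconnect flag are all hypothetical names invented for illustration. It shows a client that prepares a transaction, disconnects, reconnects, and then attempts the commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a transactional resource; every name here is hypothetical. */
final class XaSurvivalSketch {

    /** Simulated database or broker that tracks prepared transactions. */
    static final class Resource {
        private final boolean keepsPreparedOnDisconnect;
        // prepared transaction id -> id of the session that prepared it
        private final Map<String, Integer> prepared = new HashMap<>();
        private int nextSession = 0;

        Resource(boolean keepsPreparedOnDisconnect) {
            this.keepsPreparedOnDisconnect = keepsPreparedOnDisconnect;
        }

        /** Opens a new client session and returns its id. */
        int connect() {
            return nextSession++;
        }

        /** Records the transaction as prepared by the given session. */
        void prepare(int session, String txId) {
            prepared.put(txId, session);
        }

        /** A resource that does not keep prepared work discards it here. */
        void disconnect(int session) {
            if (!keepsPreparedOnDisconnect) {
                prepared.values().removeIf(owner -> owner == session);
            }
        }

        /** Succeeds only if the transaction is still in the prepared state. */
        boolean commit(String txId) {
            return prepared.remove(txId) != null;
        }
    }

    /** Prepare, lose the client, reconnect, then try to commit. */
    static boolean commitAfterReconnect(boolean keepsPreparedOnDisconnect) {
        Resource resource = new Resource(keepsPreparedOnDisconnect);
        int first = resource.connect();
        resource.prepare(first, "tx-1");
        resource.disconnect(first); // client is lost before the commit
        resource.connect();         // client reconnects with a new session
        return resource.commit("tx-1");
    }

    public static void main(String[] args) {
        System.out.println("keeps prepared transactions: " + commitAfterReconnect(true));
        System.out.println("drops prepared transactions: " + commitAfterReconnect(false));
    }
}
```

In this model, only the resource that keeps prepared transactions across the disconnection can complete the commit, which is the property the tests in this section check against a real database or broker.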
Running Tests
To make sure that a prepared transaction can be committed after a client reconnects, you can use one of the following tests:
- Clone the contrib repo:
    git clone git@github.com:hazelcast/hazelcast-jet-contrib.git
- Go to the xa-tests module:
    cd hazelcast-jet-contrib/xa-tests
- Run a test.
  This example tests compatibility for PostgreSQL.
  - Add the database connector dependency to build.gradle:
      compile group: 'org.postgresql', name: 'postgresql', version: '42.2.9'
  - Edit JdbcXaTest.java. Create a PGXADataSource as the XA connection factory and configure it with the URL of the database, username, password and database name:
      ...
      private static XADataSource getXADataSource() {
          // Replace the URL, credentials and database name with your own.
          PGXADataSource factory = new PGXADataSource();
          factory.setUrl("jdbc:postgresql://localhost:32773/test-database");
          factory.setUser("the-user");
          factory.setPassword("the-pass");
          factory.setDatabaseName("test-database");
          return factory;
      }
      ...
  - Run the class. You should see "Success!" in the output.
  This example tests compatibility for a JMS broker.
  - Add the dependency to build.gradle:
      compile group: 'org.apache.activemq', name:'activemq-all', version:'5.15.11'
  - Edit JmsXaTest.java. Create an ActiveMQXAConnectionFactory with the broker URL as the XA connection factory:
      ...
      private static XAConnectionFactory getXAConnectionFactory() {
          // Replace the broker URL with your own.
          ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory("broker:(tcp://localhost:61616)");
          return factory;
      }
      ...
  - Run the class. You should see "Success!" in the output.
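For the JDBC case, the kind of check such a test performs can be expressed with the standard javax.sql and javax.transaction.xa interfaces. The following is a sketch under stated assumptions, not the contents of JdbcXaTest.java: the class PreparedCommitCheck, the SimpleXid helper, and the updateSql parameter are hypothetical, and the statement passed in is assumed to modify data so that the prepare step does not report a read-only transaction.

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Hypothetical helper; not the code shipped in the xa-tests module. */
final class PreparedCommitCheck {

    /** Minimal Xid implementation; the format id 1 is an arbitrary choice. */
    static final class SimpleXid implements Xid {
        private final byte[] globalId;
        private final byte[] branchId;

        SimpleXid(String globalId, String branchId) {
            this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
            this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
        }

        @Override public int getFormatId() { return 1; }
        @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
        @Override public byte[] getBranchQualifier() { return branchId.clone(); }
    }

    /**
     * Prepares a transaction on one XA connection, closes that connection,
     * and commits the same Xid from a fresh connection. An exception from
     * the final commit suggests the prepared transaction did not survive.
     */
    static void commitAfterReconnect(XADataSource dataSource, Xid xid, String updateSql)
            throws SQLException, XAException {
        XAConnection first = dataSource.getXAConnection();
        try {
            XAResource resource = first.getXAResource();
            resource.start(xid, XAResource.TMNOFLAGS);
            Connection connection = first.getConnection();
            try (Statement statement = connection.createStatement()) {
                statement.executeUpdate(updateSql);
            }
            resource.end(xid, XAResource.TMSUCCESS);
            if (resource.prepare(xid) != XAResource.XA_OK) {
                throw new IllegalStateException("transaction was not prepared for commit");
            }
        } finally {
            first.close(); // stands in for the client disconnecting
        }

        XAConnection second = dataSource.getXAConnection();
        try {
            // Second phase of the two-phase commit, issued after reconnecting.
            second.getXAResource().commit(xid, false);
        } finally {
            second.close();
        }
    }
}
```

Under these assumptions, passing the data source returned by getXADataSource() together with an update against a table of your own would exercise the same prepare, disconnect, and commit sequence against your database.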