Testing Jobs

Hazelcast project uses JUnit testing framework to test itself in various scenarios. Over the years there has been some repetition within the test implementations and it led us to come up with our own set of convenience methods for testing.

Hazelcast provides test support classes to verify correctness of your pipelines. Those support classes and the test sources are published to the Maven central repository for each version with the tests classifier.

To start using them, please add following dependencies to your project:

  • Gradle

  • Maven

compile 'com.hazelcast:hazelcast:5.3.6:tests'
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast</artifactId>
  <version>5.3.6</version>
  <classifier>tests</classifier>
</dependency>

After adding the dependencies to your project, the test classes should be available to your project.

Creating Hazelcast members with Mock Network

Test utilities contains factory methods to create Hazelcast instances with the mock network stack. Those Hazelcast members communicate with each other using intra-process communication methods. This means we can create multiple lightweight Hazelcast members in our tests without using any networking resources.

To create Hazelcast members with mock network(along with a lot of convenience methods), you need to extend com.hazelcast.jet.core.JetTestSupport class.

Here is a simple test harness which creates a two-member Hazelcast cluster with mock network:

public class ClusteringTest extends JetTestSupport {

  @Test
  public void given_2nodeCluster_when...._then...() {
  // given
  HazelcastInstance instance1 = createHazelcastInstance();
  HazelcastInstance instance2 = createHazelcastInstance();
  // Alternatively
  // HazelcastInstance[] instances = createHazelcastInstances(2);

  // when
  ...
  ...
  // then
  ...
  ...
  }
...
}

If needed, a Config object can be passed to the factory methods like below:

public class ClusteringTest extends JetTestSupport {

  @Test
  public void given_2nodeClusterWith16CooperativeThreads_when...._then...() {
    // given
    Config config = new Config();
    config.getJetConfig().setCooperativeThreadCount(16);

    HazelcastInstance[] instances = createHazelcastInstances(config, 2);
    // when
    ...
    ...
    // then
    ...
    ...
  }
...
}

Similar to the Hazelcast members, you can create Hazelcast clients with the same factories. There is no need to provide any network configuration for clients to discover nodes since they are using the same factory. The discovery will work out of the box.

public class ClusteringTest extends JetTestSupport {

  @Test
  public void given_2nodeClusterAndClient_when...._then...() {
    // given
    HazelcastInstance[] instances = createHazelcastInstances(2);
    HazelcastInstance client = createHazelcastClient();
    // when
    ...
    ...
    // then
    ...
    ...
  }
...
}

When the above test run it should create 2 Hazelcast members and a Hazelcast client connected to them. When run, it can be verified that they form the cluster from the logs and client connected to them.

10:45:03.927 [ INFO] [c.h.i.c.ClusterService]
....
Members {size:2, ver:2} [
 Member [127.0.0.1]:5701 - 93328d97-0975-4dfa-bf56-4d46e8a469a1 this
 Member [127.0.0.1]:5702 - 920d1b0c-0898-4b6e-9009-8f29889d6a77
]

10:45:03.933 [ INFO] [c.h.i.c.ClusterService]

Members {size:2, ver:2} [
 Member [127.0.0.1]:5701 - 93328d97-0975-4dfa-bf56-4d46e8a469a1
 Member [127.0.0.1]:5702 - 920d1b0c-0898-4b6e-9009-8f29889d6a77 this
]
....
10:45:04.890 [ INFO] [c.h.c.i.s.ClientClusterService]

Members [2] {
 Member [127.0.0.1]:5701 - 93328d97-0975-4dfa-bf56-4d46e8a469a1
 Member [127.0.0.1]:5702 - 920d1b0c-0898-4b6e-9009-8f29889d6a77
}

First two blocks are the member list printed from each member’s point of view and the last one is the cluster from the client’s point of view.

So far, we’ve seen how to create any number of Hazelcast clusters and clients using factory methods provided within the com.hazelcast.jet.core.JetTestSupport class to create the test environments. Let’s explore other utilities to write meaningful test.

Integration Tests

For integration testing, there might be a need to create real instances without the mock network. For those cases you can create real instances with Hazelcast.newHazelcastInstance() method.

Using Random Cluster Names

If multiple tests are running in parallel there is a chance that the clusters in each test can discover others, interfere the test execution and most of the time causing both of them to fail.

To avoid such scenarios, you need to isolate the clusters in each test execution by giving them unique cluster names. This way, they won’t try to connect each other since the nodes will only try to connect to other members with the same cluster name property.

Random cluster name for each test execution can be generated like below:

public class ClusteringTest extends JetTestSupport {

  @Test
  public void given_2nodeClusterAndClient_when..._then...() {
    // given
    String clusterName = randomName();
    Config config = new Config();
    config.setClusterName(clusterName);
    HazelcastInstance[] instances = createHazelcastInstances(config, 2);

    ClientConfig clientConfig = new ClientConfig();
    clientConfig.setClusterName(clusterName);
    HazelcastInstance client = createHazelcastClient(clientConfig);
    // when
    ...
    ...
    // then
    ...
    ...
  }
...
}

In the example above randomName() utility method has been used to generate a random string from com.hazelcast.jet.core.JetTestSupport class.

Cleaning up the Resources

Mock instances created from the factory of com.hazelcast.jet.core.JetTestSupport are cleaned-up automatically after the test execution has been finished.

If the test contains real instances, then they either needs to be tracked individually and shut down when the test finished or you can write a teardown method like below to shut down all instances created.

@After
public void after() {
    Hazelcast.shutdownAll();
}

Either way you have to shut down Hazelcast members after the test has been finished to reclaim resources and not to leave a room for interference with the next test execution due to distributed nature of the product.

Test Sources and Sinks

Hazelcast comes with batch and streaming test sources along with a assertion sinks where you can write tests to assert the output of a pipeline without having to write boilerplate code.

Test sources allow you to generate events for your pipeline.

Batch Source

These sources create a fixed amount of data. These sources are non-distributed.

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 4))
 .writeTo(Sinks.logger());

This will yield an output like below:

12:33:01.780 [ INFO] [c.h.j.i.c.W.loggerSink#0] 1
12:33:01.780 [ INFO] [c.h.j.i.c.W.loggerSink#0] 2
12:33:01.780 [ INFO] [c.h.j.i.c.W.loggerSink#0] 3
12:33:01.780 [ INFO] [c.h.j.i.c.W.loggerSink#0] 4

Streaming Source

Streaming sources create an infinite stream of data. The generated events have timestamps and like the batch source, this source is also non-distributed.

int itemsPerSecond = 10;
pipeline.readFrom(TestSources.itemStream(itemsPerSecond))
  .withNativeTimestamps(0)
  .writeTo(Sinks.logger());

The source above will emit data as follows:

12:33:36.774 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:36.700, sequence=0)
12:33:36.877 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:36.800, sequence=1)
12:33:36.976 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:36.900, sequence=2)
12:33:37.074 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:37.000, sequence=3)
12:33:37.175 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:37.100, sequence=4)
12:33:37.274 [ INFO] [c.h.j.i.c.W.loggerSink#0] SimpleEvent(timestamp=12:33:37.200, sequence=5)

Assertions

Hazelcast contains several sinks to support asserting directly in the pipeline. Furthermore, there’s additional convenience to have the assertions done inline with the sink without having to terminate the pipeline, using the apply() operator.

Batch Assertions

Batch assertions collect all incoming items, and perform assertions on the collected list after all the items are received. If the assertion passes, then no exception is thrown. If the assertion fails, then the job will fail with an AssertionError.

Ordered Assertion

This asserts that items have been received in a certain order and no other items have been received. Only applicable to batch jobs.

pipeline.readFrom(TestSources.items(1, 2, 3, 4))
  .apply(Assertions.assertOrdered("unexpected values", Arrays.asList(1, 2, 3, 4)))
  .writeTo(Sinks.logger())

Unordered Assertion

Asserts that items have been received in any order and no other items have been received. Only applicable to batch stages.

pipeline.readFrom(TestSources.items(4, 3, 2, 1))
  .apply(Assertions.assertAnyOrder("unexpected values", Arrays.asList(1, 2, 3, 4)))
  .writeTo(Sinks.logger())

Contains Assertions

Assert that the given items have been received in any order; receiving other, unrelated items does not affect this assertion. Only applicable to batch stages.

pipeline.readFrom(TestSources.items(4, 3, 2, 1))
  .apply(Assertions.assertContains(Arrays.asList(1, 3)))
  .writeTo(Sinks.logger())

Collected Assertion

This is a more flexible assertion which is only responsible for collecting the received items, and passes the asserting responsibility to the user. It is a building block for the other assertions. Only applicable to batch stages.

pipeline.readFrom(TestSources.items(1, 2, 3, 4))
        .apply(Assertions.assertCollected(items -> assertTrue("expected minimum of 4 items", items.size() >= 4)))
        .writeTo(Sinks.logger())

Streaming Assertions

For streaming assertions, it’s not possible to assert after all items have been received, as the stream never terminates. Instead, we periodically assert and throw if the assertion is not valid after a given period of time. However even if the assertion passes, we don’t want to the job to continue running forever. Instead a special exception AssertionCompletedException is thrown to signal the assertion has passed successfully.

Collected Eventually Assertion

This assertion collects incoming items and runs the given assertion function repeatedly on the received item set. If the assertion passes at any point, the job will be completed with an AssertionCompletedException. If the assertion fails after the given timeout period, the job will fail with an AssertionError.

pipeline.readFrom(TestSources.itemStream(10))
        .withoutTimestamps()
        .apply(assertCollectedEventually(5, c -> assertTrue("did not receive at least 20 items", c.size() > 20)))

The pipeline above with fail with an AssertionError if 20 items are not received after 5 seconds. The job will complete with an AssertionCompletedException as soon as 20 items or more are received.

Assertion Sink Builder

Both the batch and streaming assertions use an assertion sink builder for building the assertions. Although a lower-level API, this is also public and can be used to build other, more complex assertions if desired:

/**
 * Returns a builder object that offers a step-by-step fluent API to build
 * an assertion {@link Sink} for the Jet API. An assertion sink is
 * typically used for testing of pipelines where you can want to run
 * an assertion either on each item as they arrive, or when all items have been
 * received.
 * <p>
 * These are the callback functions you can provide to implement the sink's
 * behavior:
 * <ol><li>
 *     {@code createFn} creates the state which can be used to hold incoming
 *     items.
 * </li><li>
 *     {@code receiveFn} gets notified of each item the sink receives
 *     and can either assert the item directly or add it to the state
 *     object.
 * </li><li>
 *     {@code timerFn} is run periodically even when there are no items
 *     received. This can be used to assert that certain assertions have
 *     been reached within a specific period in streaming pipelines.
 * </li><li>
 *     {@code completeFn} is run after all the items have been received.
 *     This typically only applies only for batch jobs, in a streaming
 *     job this method may never be called.
 * </li></ol>
 * The returned sink will have a global parallelism of 1: all items will be
 * sent to the same instance of the sink.
 *
 * It doesn't participate in the fault-tolerance protocol,
 * which means you can't remember across a job restart which items you
 * already received. The sink will still receive each item at least once,
 * thus complying with the <em>at-least-once</em> processing guarantee. If
 * the sink is idempotent (suppresses duplicate items), it will also be
 * compatible with the <em>exactly-once</em> guarantee.
 *
 * @param <A> type of the state object
 *
 * @since 3.2
 */
@Nonnull
public static <A> AssertionSinkBuilder<A, Void> assertionSink(
        @Nonnull String name,
        @Nonnull SupplierEx<? extends A> createFn
) {
  ..
}

In addition to the assertions mentioned above, com.hazelcast.jet.core.JetTestSupport contains a lot of assertion methods which can be used to verify whether the job, member, or cluster is in a desired state.

Class Runners

There are multiple JUnit test class runners shipped with the tests package which gives various abilities.

The common features are:

  • Ability to print a thread dump in case of a test failure, configured via hazelcast.test.threadDumpOnFailure property

  • Supports repetitive test execution

  • Uses mock networking, unless configured to use real networking via hazelcast.test.use.network property

  • Disabled phone-home feature, configured via hazelcast.phone.home.enabled property

  • Have shorter(1 sec) wait time before joining than default(5 secs). This leads to faster cluster formation and test execution, configured via hazelcast.wait.seconds.before.join property.

  • Uses loopback address, configured via hazelcast.local.localAddress property

  • Uses IPv4 stack, configured via java.net.preferIPv4Stack property

  • Prints out test execution duration after they finish execution

Let’s have a look at them in detail:

Serial Class Runner

com.hazelcast.test.HazelcastSerialClassRunner is a JUnit test class runner which runs the tests in series. Nothing fancy, it just executes the tests with the features listed above.

Parallel Class Runner

com.hazelcast.test.HazelcastParallelClassRunner is a JUnit test class runner which runs the tests in parallel with multiple threads. If the test methods within the test class does not share any resources this yields a faster execution compared to it’s serial counterpart.

Repetitive Test Execution

While dealing with intermittently failing tests, it is helpful to run the test multiple times in series to increase chances to make it fail. In those cases com.hazelcast.test.annotation.Repeat annotation can be used to run the test repeatedly. @Repeat annotation can be used on both the class and method level. On the class level it repeats the whole class execution specified items. On the method level it only repeats particular test method.

Following is an example test which repeats the test method execution 5 times:

@RunWith(HazelcastSerialClassRunner.class)
public class RepetitiveTest extends JetTestSupport {

    @Repeat(5)
    @Test
    public void test() {
        System.out.println("Test method to be implemented!");
    }
}

When run, it logs like the following:

Started Running Test: test
---> Repeating test [RepetitiveTest:test], run count [1]
Test method to be implemented!
---> Repeating test [RepetitiveTest:test], run count [2]
Test method to be implemented!
---> Repeating test [RepetitiveTest:test], run count [3]
Test method to be implemented!
---> Repeating test [RepetitiveTest:test], run count [4]
Test method to be implemented!
---> Repeating test [RepetitiveTest:test], run count [5]
Test method to be implemented!
Finished Running Test: test in 0.009 seconds.

Note: @Repeat annotation only works with Hazelcast Class runners.

Waiting for a Job to be in a Desired State

For some use cases, you need to make sure the job is submitted and running on the cluster before generating any events on the controlled source to observe results. To achieve this, the following assertion could be used to validate that the job is in the desired state.

public class DesiredStateTest extends JetTestSupport {

    @Test
    public void given_singeNodeJet_when_jobIsRunning__then...() {
        // given
        HazelcastInstance hz = createHazelcastInstance();

        // when
        Pipeline p = buildPipeline();
        Job job = hz.getJet().newJob(p);
        assertJobStatusEventually(job, JobStatus.RUNNING);

        // then
        ...
        ...
    }
...
}

In the example above assertJobStatusEventually(Job, JobStatus) utility method has been used to validate the job is in the desired state from the com.hazelcast.jet.core.JetTestSupport class.