Hazelcast provides a set of testing utilities for validating batch and streaming pipelines executed by the Jet engine. These utilities let you simulate input data and assert on the output of distributed dataflows.
For dependencies and setup, refer to: Testing setup.
Test pipelines
JetTestSupport (JUnit 4)
Extend JetTestSupport to test pipelines with in-process Hazelcast members. This utility class provides lifecycle management and assertion methods designed for streaming and batch dataflows.
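import com.hazelcast.collection.IList;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

public class MyPipelineTest extends JetTestSupport {
    @Test
    public void testSimplePipeline() {
        Config config = new Config();
        config.setJetConfig(new JetConfig().setEnabled(true)); // Jet is disabled by default
        HazelcastInstance instance = createHazelcastInstance(config);
        JetService jet = instance.getJet();
        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items(1, 2, 3))
         .writeTo(Sinks.list("out"));
        jet.newJob(p).join(); // bounded job: join() returns once it completes
        IList<Integer> result = instance.getList("out");
        assertEquals(3, result.size());
    }
}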
JetTestSupport extends HazelcastTestSupport, so it inherits all of its Hazelcast cluster testing capabilities. It works directly only with JUnit 4, but you can reuse its methods in other test frameworks by calling them directly instead of inheriting from the class.
JUnit 5
Use TestHazelcastFactory configured with Jet:
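import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.collection.IList;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

class MyJupiterPipelineTest {
    @Test
    void testSimplePipeline() {
        TestHazelcastFactory factory = new TestHazelcastFactory();
        try {
            Config config = new Config();
            config.setJetConfig(new JetConfig().setEnabled(true));
            HazelcastInstance instance = factory.newHazelcastInstance(config);
            JetService jet = instance.getJet();
            Pipeline p = Pipeline.create();
            p.readFrom(TestSources.items(1, 2, 3))
             .writeTo(Sinks.list("out"));
            jet.newJob(p).join();
            IList<Integer> result = instance.getList("out");
            assertEquals(3, result.size());
        } finally {
            factory.shutdownAll(); // runs even when the assertion fails
        }
    }
}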
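Jupiter tests written this way do not inherit polling helpers such as assertTrueEventually. If you would rather not depend on HazelcastTestSupport in a JUnit 5 test, a small polling helper can fill the gap. The sketch below assumes a hypothetical Eventually class (not part of Hazelcast) that re-runs an assertion until it passes or the timeout elapses, then rethrows the last failure.

```java
import java.time.Duration;

// Hypothetical helper, not a Hazelcast API: retries an assertion until it passes.
final class Eventually {
    private Eventually() {}

    static void assertEventually(Runnable assertion, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() - deadline >= 0) {
                    throw e; // out of time: surface the last failure as-is
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", ie);
            }
        }
    }
}

// Hypothetical usage inside a Jupiter test:
// Eventually.assertEventually(
//         () -> assertEquals(3, instance.getList("out").size()),
//         Duration.ofSeconds(10), Duration.ofMillis(100));
```

Rethrowing the last AssertionError keeps the message from your own assertion, which is usually more informative than a generic timeout message.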
Common behavior
To use Jet assertions or control streaming jobs:
- Use assertCollected(…) or assertCollectedEventually(…).
- Use assertJobStatusEventually(…) to wait for a job to reach the desired state.
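// import static com.hazelcast.jet.pipeline.test.Assertions.assertAnyOrder;
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3))
 .apply(assertAnyOrder("unexpected", List.of(1, 2, 3)))
 .writeTo(Sinks.logger());
// Inside a JetTestSupport subclass; join() throws if the assertion fails
Job job = createHazelcastInstance().getJet().newJob(p);
job.join();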
You can also enrich items with data from a Hazelcast IMap by using mapUsingIMap():
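// Customer is assumed to be a serializable type with a name() accessor,
// for example: record Customer(String id, String name) implements Serializable {}
IMap<String, Customer> customers = instance.getMap("customers");
customers.put("c1", new Customer("c1", "Alice"));
customers.put("c2", new Customer("c2", "Bob"));
// Build and run the pipeline; passing the IMap itself keeps the value type (no cast needed)
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("c1", "c2"))
 .mapUsingIMap(customers,
         id -> id,                           // lookup key: the item itself
         (id, customer) -> customer.name())  // combine item and looked-up value
 .writeTo(Sinks.list("enriched"));
jet.newJob(p).join();
IList<String> result = instance.getList("enriched");
assertEquals(2, result.size());
// The mapping stage may run in parallel, so assert on contents rather than list order
assertEquals(Set.of("Alice", "Bob"), new HashSet<>(result));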
Test sources and sinks
Hazelcast includes several utilities for injecting test data into a pipeline and asserting on its results. These include both batch-oriented and streaming-oriented sources, along with in-pipeline assertion sinks.
Batch sources
Batch sources emit a fixed number of items and are non-distributed. These are useful for testing deterministic, bounded pipelines.
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3, 4))
.writeTo(Sinks.logger());
Streaming sources
Streaming sources emit unbounded data streams and are timestamped. These are also non-distributed and suitable for testing long-running or real-time pipelines.
int itemsPerSecond = 10;
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(itemsPerSecond))
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
In-pipeline assertions
Hazelcast provides assertion sinks that you attach to a pipeline with the apply() method. These assertions run as part of the job and validate intermediate or final results.
Assertions fall into two categories: batch assertions and streaming assertions.
Batch assertions
Batch assertions are used with bounded pipelines. They collect all items that reach the assertion stage and evaluate the result once all input has been received.
Ordered assertion
Validates that items are received in the exact order specified.
pipeline.readFrom(TestSources.items(1, 2, 3, 4))
.apply(Assertions.assertOrdered("unexpected values", Arrays.asList(1, 2, 3, 4)))
.writeTo(Sinks.logger());
Unordered assertion
Validates that a set of items is received, regardless of order.
pipeline.readFrom(TestSources.items(4, 3, 2, 1))
.apply(Assertions.assertAnyOrder("unexpected values", Arrays.asList(1, 2, 3, 4)))
.writeTo(Sinks.logger());
Streaming assertions
Streaming assertions support pipelines that do not terminate. These assertions periodically check collected data and stop the job automatically when the condition is satisfied.
Collected eventually assertion
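Collects output and applies the assertion repeatedly; the first argument is the timeout in seconds. If the assertion passes within the timeout, the job is terminated with AssertionCompletedException, so job.join() throws even though the condition was met. If it does not pass in time, the job fails with AssertionError.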
pipeline.readFrom(TestSources.itemStream(10))
.withoutTimestamps()
.apply(Assertions.assertCollectedEventually(5, items ->
assertTrue("did not receive at least 20 items", items.size() > 20)));
This allows testing streaming pipelines without running them indefinitely.
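Because a successful streaming assertion ends the job with an exception, job.join() throws a CompletionException whether the assertion passed or failed, and the original exception may arrive wrapped in another exception. A minimal sketch, assuming you want to tell the two outcomes apart by inspecting the cause chain: the hypothetical JobOutcome.causedBy helper below (not a Hazelcast API) matches on the exception class name or on its message.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper, not a Hazelcast API.
final class JobOutcome {
    private JobOutcome() {}

    // True if any throwable in the cause chain has the given simple or fully qualified
    // class name, or mentions that name in its message (covers wrapped exceptions).
    static boolean causedBy(Throwable t, String className) {
        // Track visited throwables so a cyclic cause chain cannot loop forever
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            String name = cur.getClass().getName();
            String message = cur.getMessage();
            if (name.equals(className)
                    || name.endsWith("." + className)
                    || (message != null && message.contains(className))) {
                return true;
            }
        }
        return false;
    }
}

// Hypothetical usage in a test:
// try {
//     job.join();
//     fail("expected the streaming assertion to end the job");
// } catch (CompletionException e) {
//     assertTrue(JobOutcome.causedBy(e, "AssertionCompletedException"));
// }
```

Matching on the message is a heuristic: it could report a false positive if an unrelated error message happens to mention the class name, so keep the name you search for specific.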
Sinks
Use simple, in-memory sinks in tests so you can assert results without external dependencies. The following are the most useful when validating pipelines.
Sinks.list(…) (capture all items)
Writes items to an IList, preserving per-processor arrival order (no global ordering guarantee). Ideal for asserting on the full output of bounded pipelines.
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(1, 2, 3))
.writeTo(Sinks.list("out"));
Job job = instance.getJet().newJob(p);
job.join(); // bounded job completes
IList<Integer> out = instance.getList("out");
assertEquals(List.of(1, 2, 3), out);
Clear the list at the start of each test with instance.getList("out").clear() so that results from earlier tests do not affect later assertions.
Sinks.map(…) (keyed “last-write-wins”)
Writes to an IMap using key/value functions. Useful when you want the latest value per key, or to assert on deduplicated or enriched outputs.
// import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("c1", "c2", "c1"))
 .map(s -> tuple2(s, Instant.now().toEpochMilli()))
 .writeTo(Sinks.map("byId", Tuple2::f0, Tuple2::f1));
instance.getJet().newJob(p).join();
IMap<String, Long> byId = instance.getMap("byId");
assertEquals(2, byId.size()); // "c1" was written twice; only the last value per key is retained
assertTrue(byId.containsKey("c1"));
assertTrue(byId.containsKey("c2"));
Because IMap is partitioned and parallel, don’t rely on write order across the cluster. Assert on contents, not ordering.
Sinks.logger() (debug/trace)
Logs each item at INFO level on the member that runs the sink, so it appears in the test output. Handy during development or when combining with in-pipeline assertions for streaming jobs.
pipeline.readFrom(TestSources.items(1, 2, 3))
.writeTo(Sinks.logger()); // visible in test output
Sinks.noop() (throughput/baseline)
Discards items while exercising the full pipeline. Useful to isolate source/transform performance from sink overhead.
pipeline.readFrom(TestSources.items(1, 2, 3))
.writeTo(Sinks.noop());
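Sinks.observable("name") (push to the test)
Publishes items to a named Observable so the test can subscribe and assert incrementally, which is especially helpful for streaming jobs that don't terminate. Observers are notified on a Hazelcast thread rather than the test thread, so collect items into a thread-safe structure.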
Observable<Long> obs = instance.getJet().getObservable("obs");
List<Long> received = Collections.synchronizedList(new ArrayList<>());
obs.addObserver(received::add); // invoked on a Hazelcast thread, hence the synchronized list
pipeline.readFrom(TestSources.itemStream(5))
        .withoutTimestamps()
        .map(SimpleEvent::sequence) // itemStream emits SimpleEvent items
        .writeTo(Sinks.observable("obs"));
Job job = instance.getJet().newJob(pipeline);
// Await a condition, then cancel: the stream never ends on its own
assertTrueEventually(() -> assertTrue(received.size() > 20));
job.cancel();
obs.destroy(); // remove observers and release the observable's resources
Pair Sinks.observable with streaming assertions (see Streaming assertions) to stop the job automatically once your condition is satisfied.
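Polling a synchronized list works, but a test can also block until enough items have arrived. As a minimal sketch, the hypothetical LatchingCollector below (not a Hazelcast class) combines a synchronized list with a CountDownLatch; it assumes the observer can be supplied as a method reference, as the received::add example does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-safe collector for items pushed from another thread.
final class LatchingCollector<T> {
    private final List<T> items = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch latch;

    LatchingCollector(int expectedCount) {
        this.latch = new CountDownLatch(expectedCount);
    }

    // Pass as the observer, e.g. obs.addObserver(collector::accept)
    void accept(T item) {
        items.add(item);
        latch.countDown(); // extra countDown() calls after zero are no-ops
    }

    // Blocks until expectedCount items arrived; returns false on timeout
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    // Copy taken under the list's lock so iteration is safe
    List<T> snapshot() {
        synchronized (items) {
            return new ArrayList<>(items);
        }
    }
}

// Hypothetical usage with the observable example:
// LatchingCollector<Long> collector = new LatchingCollector<>(20);
// obs.addObserver(collector::accept);
// Job job = instance.getJet().newJob(pipeline);
// assertTrue(collector.await(30, TimeUnit.SECONDS));
// job.cancel();
```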
Fault tolerance & semantics (tests)
When a job runs with the at-least-once processing guarantee (ProcessingGuarantee.AT_LEAST_ONCE; the default is NONE), each record is written to the sink at least once, but in some failure/restart scenarios it may be written more than once.
For example, if you write to Sinks.list("out"), some items could appear twice in the list after a member failure and job restart. This is correct behavior under at-least-once semantics.
Because duplicates are possible, asserting strict sequence equality (e.g., [1,2,3]) may fail even though the pipeline is correct. Instead, design assertions that are idempotent, meaning they are not affected by duplicates. Examples:
- Assert the set of values ({1,2,3}), not the exact list with its ordering.
- Assert that at least N items were produced.
- Assert that a map contains the expected keys/values (overwriting the same key twice doesn't matter).
This avoids tests that fail intermittently due to harmless duplicates.
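The checks listed above can be packaged as reusable helpers. The TolerantAsserts class below is a hypothetical sketch in plain Java (not a Hazelcast API); it works on any Collection or Map, so you can pass an IList or IMap that you read after the job ends.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical helper: each check gives the same verdict whether an item
// was written once or several times.
final class TolerantAsserts {
    private TolerantAsserts() {}

    // Same distinct values, ignoring order and duplicates
    static <T> void assertSameDistinct(Collection<? extends T> expected, Collection<? extends T> actual) {
        Set<T> exp = new HashSet<>(expected);
        Set<T> act = new HashSet<>(actual);
        if (!exp.equals(act)) {
            throw new AssertionError("expected distinct values " + exp + " but got " + act);
        }
    }

    // At least n items were produced; duplicates can only raise the count
    static void assertAtLeast(int n, Collection<?> actual) {
        if (actual.size() < n) {
            throw new AssertionError("expected at least " + n + " items but got " + actual.size());
        }
    }

    // Every expected key maps to the expected value; rewriting a key with the same value is harmless
    static <K, V> void assertContainsEntries(Map<K, V> expected, Map<?, ?> actual) {
        for (Map.Entry<K, V> e : expected.entrySet()) {
            if (!actual.containsKey(e.getKey()) || !Objects.equals(e.getValue(), actual.get(e.getKey()))) {
                throw new AssertionError("missing or wrong entry " + e + " in " + actual);
            }
        }
    }
}

// Hypothetical usage after a job completes:
// TolerantAsserts.assertSameDistinct(List.of(1, 2, 3), instance.<Integer>getList("out"));
```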
If you want a test to stop automatically when a condition is met (e.g., “we’ve seen 100 items, so the job is successful”), don't write your own processor. Instead, use the assertion sink builder API (AssertionSinkBuilder.assertionSink(…)), described in the next section. It is designed for:
- Collecting items in memory during the job.
- Running your condition either continuously (for streaming) or at completion (for batch).
- Throwing the correct exception (AssertionCompletedException or AssertionError) to end the job cleanly.
Assertion sink builder
For advanced use cases, a lower-level API lets you build custom assertion sinks with the assertionSink(…) builder.
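// import com.hazelcast.jet.pipeline.test.AssertionSinkBuilder;
Sink<MyType> sink = AssertionSinkBuilder.assertionSink("my-assertion", ArrayList<MyType>::new)
        .<MyType>receiveFn(List::add)                         // collect each item
        .completeFn(items -> assertTrue(items.size() > 100))  // batch: check once all items arrived
        .build();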
Supported callbacks:
- createFn (the second argument of assertionSink(…)): Creates a container for collecting results.
- receiveFn: Invoked for each item.
- timerFn: Used in streaming jobs to periodically check the state.
- completeFn: Called when the batch job completes.
These sinks do not participate in fault tolerance: their collected state is not saved to snapshots. They can still receive duplicate items under at-least-once processing, so keep the assertion logic idempotent, for example by collecting into a Set.
Verify job state
In some cases, you need to wait until the job has started before verifying output. The assertJobStatusEventually method checks that a job reaches a specific state (e.g. RUNNING). This is particularly useful for streaming jobs that ingest data continuously, or when coordinating multiple test threads.
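import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.pipeline.Pipeline;
import org.junit.Test;

public class DesiredStateTest extends JetTestSupport {
    @Test
    public void testJobReachesRunningState() {
        HazelcastInstance hz = createHazelcastInstance();
        Pipeline p = buildPipeline(); // placeholder for your own pipeline factory
        Job job = hz.getJet().newJob(p);
        assertJobStatusEventually(job, JobStatus.RUNNING);
        // proceed with test logic
    }
}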