Subscribe to Changes to a Map

Hazelcast can use distributed maps as both a data source and a data sink. A map can act as a batch source, giving you all of its current data at once, or as a streaming source, giving you a continuous stream of change events.

Here we focus on using a map as a streaming source.
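
To make the contrast concrete, here is a minimal sketch of the two ways to read the same map in a pipeline (the variable names are ours; both sources live in com.hazelcast.jet.pipeline):

// Batch: reads the map’s current entries once, then the stage completes
BatchSource<Map.Entry<Long, Long>> snapshot = Sources.map("streamed-map");

// Streaming: emits an unbounded stream of change events
// (requires the map’s event journal, enabled in the next step)
StreamSource<Map.Entry<Long, Long>> changes = Sources.mapJournal("streamed-map",
    JournalInitialPosition.START_FROM_CURRENT);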

Step 1. Enable Event Journal in Configuration

To capture the change stream of an IMap, you must first enable its event journal. Add this to your member’s YAML configuration:

hazelcast:
  map:
    streamed-map:
      event-journal:
        enabled: true
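
If you configure your members programmatically instead, the equivalent setting looks like this sketch (the class name is ours):

package org.example;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;

public class MemberWithJournal {
    public static void main(String[] args) {
        Config config = new Config();
        // Enable the event journal for the map named streamed-map
        config.getMapConfig("streamed-map")
              .getEventJournalConfig()
              .setEnabled(true);
        // Start a member with this configuration
        Hazelcast.newHazelcastInstance(config);
    }
}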

Step 2. Write the Producer

First, let’s write the code that will keep updating our IMap. Put this into your project:

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.util.concurrent.ThreadLocalRandom;

public class Producer {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // Emit 100 random numbers per second, key each one by its value
        // modulo 5 (yielding keys 0-4), and write the entries to streamed-map
        pipeline.readFrom(TestSources.itemStream(100,
                (ts, seq) -> ThreadLocalRandom.current().nextLong(0, 1000)))
                .withoutTimestamps()
                .map(l -> Util.entry(l % 5, l))
                .writeTo(Sinks.map("streamed-map"));

        JobConfig cfg = new JobConfig().setName("producer");
        // bootstrappedInstance() attaches to the member the job is submitted through
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline, cfg);
    }
}
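
If you prefer to generate the updates without a Jet job, a plain loop over the IMap works just as well, since every put on the map is recorded in its event journal. A minimal sketch (the class name is ours):

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

import java.util.concurrent.ThreadLocalRandom;

public class PlainProducer {
    public static void main(String[] args) throws InterruptedException {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        IMap<Long, Long> map = hz.getMap("streamed-map");
        while (true) {
            long value = ThreadLocalRandom.current().nextLong(0, 1000);
            // set() updates the map without returning the old value
            map.set(value % 5, value);
            Thread.sleep(10); // roughly 100 updates per second
        }
    }
}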

Step 3. Write the Consumer

Now we’re ready to write the pipeline that consumes and processes the change events. For each map key, it reports how many times per second that key was updated.

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Consumer {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    // Read the map's event journal, starting from the latest event
    pipeline.readFrom(Sources.<Long, Long>mapJournal("streamed-map",
      JournalInitialPosition.START_FROM_CURRENT))
      .withIngestionTimestamps()
      // Count the updates to each key over one-second tumbling windows
      .window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(1)))
      .groupingKey(Map.Entry::getKey)
      .aggregate(AggregateOperations.counting())
      .map(r -> String.format("Key %d had %d updates", r.getKey(), r.getValue()))
      .writeTo(Sinks.logger());

    JobConfig cfg = new JobConfig().setName("consumer");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}
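
The two-argument mapJournal overload used above emits a Map.Entry for each put and update. If you want to filter or transform the events before they enter the pipeline, there is an overload that also takes a projection and a filtering predicate. A sketch, assuming imports for com.hazelcast.jet.Util and com.hazelcast.map.EventJournalMapEvent (note that the stage then emits plain Long values, so the rest of the pipeline would change accordingly):

pipeline.readFrom(Sources.<Long, Long, Long>mapJournal("streamed-map",
    JournalInitialPosition.START_FROM_CURRENT,
    EventJournalMapEvent::getNewValue, // projection: emit just the new value
    Util.mapPutEvents()))              // predicate: keep only put/update events
    .withIngestionTimestamps();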

Step 4. Submit for Execution

Assuming you already started a Hazelcast member, package the classes into a JAR and use hz-cli submit to first start the producing job:

<path_to_hazelcast>/bin/hz-cli submit --class org.example.Producer <path_to_your_jar>

Then start the consuming job:

<path_to_hazelcast>/bin/hz-cli submit --class org.example.Consumer <path_to_your_jar>

You should start seeing output like this in the Hazelcast member’s log:

...
2020-03-06 10:17:21,025 ... Key 2 had 24 updates
2020-03-06 10:17:21,025 ... Key 1 had 19 updates
2020-03-06 10:17:21,025 ... Key 0 had 17 updates
2020-03-06 10:17:21,025 ... Key 3 had 14 updates
2020-03-06 10:17:21,025 ... Key 4 had 26 updates
...
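
You can also verify that both jobs are running by listing them:

<path_to_hazelcast>/bin/hz-cli list-jobs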

Step 5. Clean Up

Cancel your jobs when you’re done:

<path_to_hazelcast>/bin/hz-cli cancel consumer
<path_to_hazelcast>/bin/hz-cli cancel producer