This is a prerelease version.

View latest

Acting Upon Changes to a Map

You can set up monitoring of a map to look for specific events, such as adding an entry, updating a specific entry, or deleting an entry. You can use these events to trigger execution of code, or even intercept the event and change it before it affects the data in memory.

Listeners

Hazelcast offers two methods for monitoring map activity. MapListener allows you to define actions that are triggered based on activity on any entry in the map. EventListener allows you to define actions based on activity taken on a specific map entry or set of map entries. You'll use predicates to define the entries to be monitored for activity.

Listening to an Entire Map

If you want to listen to changes to all map entries, see Listening for Map Events for an explanation and example of the MapListener method.

Listening for Specific Map Entries using Predicates

The easiest way to explain this is through an example. We are going to listen for changes made to the map entry for an employee with the surname "smith". We will use JSON serialization. First, let’s create the Employee class that we will use to create HazelcastJsonValue instances:

  • Java

  • C++

  • Node.js

  • Python

  • Go

/**
 * Helper that builds the JSON values stored in the example map.
 */
public class Employee {
    /**
     * Wraps the given surname in a one-field JSON document.
     *
     * @param surname text placed in the "surname" field; it is inserted as-is, without JSON escaping
     * @return a HazelcastJsonValue holding the resulting JSON text
     */
    public static HazelcastJsonValue asJson(String surname) {
        StringBuilder json = new StringBuilder("{ \"surname\": \"");
        json.append(surname);
        json.append("\" }");
        return new HazelcastJsonValue(json.toString());
    }
}
#include <hazelcast/client/hazelcast_client.h>

/// Helper that builds the JSON values stored in the example map.
struct Employee {
    /// Wraps the given surname in a one-field JSON document.
    /// The surname is inserted as-is, without JSON escaping.
    static hazelcast::client::hazelcast_json_value as_json(std::string surname){
        std::string json_text{"{ \"surname\": \""};
        json_text += surname;
        json_text += "\" }";
        return hazelcast::client::hazelcast_json_value(json_text);
    }
};
/** Helper that builds the JSON values stored in the example map. */
class Employee {
    /**
     * Wraps the given surname in a one-field JSON document.
     * @param surname value stored under the "surname" key
     * @returns a HazelcastJsonValue built from the serialized object
     */
    static asJson(surname) {
        const payload = { surname };
        const serialized = JSON.stringify(payload);
        return new HazelcastJsonValue(serialized);
    }
}
def employee(surname):
    """Return a HazelcastJsonValue wrapping a one-field document for ``surname``."""
    document = {"surname": surname}
    return HazelcastJsonValue(document)
// employee returns a one-field JSON document for the given surname.
// The surname is substituted verbatim (no JSON escaping is applied).
func employee(surname string) serialization.JSON {
	const template = `{
		"surname": "%s"
	}`
	return serialization.JSON(fmt.Sprintf(template, surname))
}

Then, let’s create a listener that tracks the ADDED, UPDATED and REMOVED entry events with the surname predicate.

  • Java

  • C++

  • Node.js

  • Python

  • Go

/**
 * Starts a Hazelcast member and registers an entry listener on the map named "map",
 * restricted by the SQL predicate "surname=smith".
 */
public class ListenerWithPredicate {

    public static void main(String[] args) {
        // Enable the "natural" event types for predicate-filtered listeners on this member.
        Config memberConfig = new Config();
        memberConfig.setProperty("hazelcast.map.entry.filtering.natural.event.types", "true");

        HazelcastInstance member = Hazelcast.newHazelcastInstance(memberConfig);
        IMap<String, HazelcastJsonValue> employees = member.getMap("map");

        // Listener, then the predicate that filters entries, then the include-value flag.
        MyEntryListener listener = new MyEntryListener();
        employees.addEntryListener(listener, Predicates.sql("surname=smith"), true);
        System.out.println("Entry Listener registered");
    }

    /**
     * Prints every ADDED, UPDATED and REMOVED event it receives to standard output.
     */
    static class MyEntryListener
            implements EntryAddedListener<String, HazelcastJsonValue>,
            EntryUpdatedListener<String, HazelcastJsonValue>,
            EntryRemovedListener<String, HazelcastJsonValue> {

        @Override
        public void entryAdded(EntryEvent<String, HazelcastJsonValue> addedEvent) {
            System.out.println("Entry Added:" + addedEvent);
        }

        @Override
        public void entryUpdated(EntryEvent<String, HazelcastJsonValue> updatedEvent) {
            System.out.println("Entry Updated:" + updatedEvent);
        }

        @Override
        public void entryRemoved(EntryEvent<String, HazelcastJsonValue> removedEvent) {
            System.out.println("Entry Removed:" + removedEvent);
        }
    }
}
int main() {
    hazelcast::client::client_config config;
    auto hz = hazelcast::new_client(std::move(config)).get();
    auto map = hz.get_map("map").get();
    hazelcast::client::query::sql_predicate sqlPredicate(hz,"surname=smith");
    map->add_entry_listener(
            hazelcast::client::entry_listener().on_added([](hazelcast::client::entry_event &&event) {
                std::cout << "Entry Added:" << event << std::endl;
            }).on_removed([](hazelcast::client::entry_event &&event) {
                std::cout << "Entry Removed:" << event << std::endl;
            }).on_updated([](hazelcast::client::entry_event &&event) {
                std::cout << "Entry Updated:" << event  << std::endl;
            }), sqlPredicate, true).get();
    std::cout << "Entry Listener registered" << std::endl;
}
const { Client, Predicates } = require('hazelcast-client');

/**
 * Connects a client and registers an entry listener on the map named 'map',
 * restricted by the SQL predicate 'surname=smith'.
 */
async function main() {
    const client = await Client.newHazelcastClient();
    const map = await client.getMap('map');

    // Handlers for the three event types this example tracks.
    const listener = {
        added: (entryEvent) => {
            console.log(`Entry added: ${entryEvent.key}, ${entryEvent.value}`);
        },
        removed: (entryEvent) => {
            console.log(`Entry removed: ${entryEvent.key}, ${entryEvent.value}`);
        },
        updated: (entryEvent) => {
            console.log(`Entry updated: ${entryEvent.key}, ${entryEvent.value}`);
        }
    };

    // Predicates.sql is a factory function, so it is called directly rather than with `new`.
    const surnameFilter = Predicates.sql('surname=smith');
    // Arguments: listener, predicate, key (undefined = no single-key restriction), includeValue.
    await map.addEntryListenerWithPredicate(listener, surnameFilter, undefined, true);
    console.log('Entry listener registered');
}

// Report any startup failure and exit with a non-zero status code.
main().catch(err => {
    console.error('Error occurred:', err);
    process.exit(1);
});
def entry_added(event):
    """Print the key and value carried by an entry-added event."""
    message = f"Entry added with key: {event.key}, value: {event.value}"
    print(message)

def entry_removed(event):
    """Print the key carried by an entry-removed event."""
    message = f"Entry removed with key: {event.key}"
    print(message)

def entry_updated(event):
    """Print the key, previous value and new value carried by an entry-updated event."""
    message = f"Entry updated with key: {event.key}, old value: {event.old_value}, new value: {event.value}"
    print(message)

# Connect a client with the default configuration.
client = hazelcast.HazelcastClient()

# Use the blocking map proxy so each call below completes before the next line runs.
map = client.get_map("map").blocking()

# Register the callbacks for entries matching the surname predicate.
# include_value=True is needed because the callbacks print event.value and
# event.old_value; without it those fields are not populated.
map.add_entry_listener(
    include_value=True,
    predicate=sql("surname = smith"),
    added_func=entry_added,
    removed_func=entry_removed,
    updated_func=entry_updated
)
print("Entry listener registered")
func main() {
	// error handling is omitted for brevity
	ctx := context.TODO()
	client, _ := hazelcast.StartNewClient(ctx)
	m, _ := client.GetMap(ctx, "map")
	subscriptionID, _ := m.AddListenerWithPredicate(ctx, hazelcast.MapListener{
		EntryAdded: func(event *hazelcast.EntryNotified) {
			fmt.Println("Entry Added:", event.Key)
		},
		EntryUpdated: func(event *hazelcast.EntryNotified) {
			fmt.Println("Entry Updated:", event.Key)
		},
		EntryRemoved: func(event *hazelcast.EntryNotified) {
			fmt.Println("Entry Removed:", event.Key)
		},
	}, predicate.Equal("surname", "smith"), true)
See How Distributed Query Works for more information about creating predicates.

When the listener is running, a change to any record with the surname smith will display output similar to the one below.

Entry Added:EntryEvent{entryEventType=ADDED, member=Member [192.168.1.227]:5701 - 36e6eaf5-e267-4858-a5ea-6adc3be2f6ff this, name='map', key=1, oldValue=null, value={ "surname": "smith" }, mergingValue=null}

In our example, we are sending an event log to the system output. You can replace this action with any code you want to run when the specified map event occurs.

The default backwards-compatible event publishing strategy only publishes UPDATED events when map entries are updated to a value that matches the predicate with which the listener was registered. This implies that when using the default event publishing strategy, your listener is not notified about an entry whose value is updated from one that matches the predicate to a new value that does not match the predicate.

When you configure Hazelcast members with the property hazelcast.map.entry.filtering.natural.event.types set to true, entry updates are handled by treating each value transition as an entry into, an update within, or an exit from the predicate value space. The following table compares how a listener is notified about an update to a map entry value under the default backwards-compatible Hazelcast behavior (when the property hazelcast.map.entry.filtering.natural.event.types is not set or is set to false) versus when it is set to true:

hazelcast.map.entry.filtering.natural.event.types = false (default)

hazelcast.map.entry.filtering.natural.event.types = true

When old value matches predicate, new value does not match predicate

No event is delivered to entry listener

REMOVED event is delivered to entry listener

When old value matches predicate, new value matches predicate

UPDATED event is delivered to entry listener

UPDATED event is delivered to entry listener

When old value does not match predicate, new value does not match predicate

No event is delivered to entry listener

No event is delivered to entry listener

When old value does not match predicate, new value matches predicate

UPDATED event is delivered to entry listener

ADDED event is delivered to entry listener

Interceptors

Unlike listeners, interceptors can change the action taken on a map before it is completed. With listeners, you take an action after a method has been completed and the in-memory map has been modified. Interceptor actions are synchronous, allowing you to alter the behavior of a method, change its values, or totally cancel it.

Interceptors are a server-side feature. Because these operations run within the Hazelcast cluster natively, the interfaces that perform these functions are only available in Java.

Map interceptors are chained, so adding the same interceptor to the same map more than once results in duplicated effects. This can easily happen when the interceptor is added to the map at member initialization, so that each member adds the same interceptor.

When you add the interceptor in this way, be sure to implement the hashCode() method to return the same value for every instance of the interceptor. It is not strictly necessary, but it is a good idea to also implement equals() as this ensures that the map interceptor can be removed reliably.

The map API has two methods for adding an interceptor to a map and removing it again: addInterceptor and removeInterceptor. See also the MapInterceptor interface to learn about the methods used to intercept the changes in a map.

Methods available within the MapInterceptor interface:

interceptGet

Replace returned map.get() value with new value

afterGet

Action to take after map.get() operation is complete

interceptPut

Replace value in map.put() with new value

afterPut

Action to take after map.put() is complete

interceptRemove

Collects removed map entry

afterRemove

Action to take after map.remove() is complete

The following is an example usage.

/**
 * Starts a Hazelcast member, attaches an interceptor to the map named "themap",
 * then performs one put and one get and prints the value returned by the get.
 */
public class MapInterceptorMember {

    public static void main(String[] args) {
        HazelcastInstance member = Hazelcast.newHazelcastInstance();
        IMap<String, String> themap = member.getMap("themap");

        MyMapInterceptor interceptor = new MyMapInterceptor();
        themap.addInterceptor(interceptor);

        themap.put("1", "1");
        String fetched = themap.get("1");
        System.out.println(fetched);
    }

    /**
     * Interceptor that appends "-foo" to values returned by get operations.
     * Its put and remove hooks return null, and its after-hooks do nothing.
     */
    private static class MyMapInterceptor implements MapInterceptor {

        @Override
        public Object interceptGet(Object value) {
            // Same text as string concatenation would produce, including "null" for a null value.
            return String.valueOf(value).concat("-foo");
        }

        @Override
        public void afterGet(Object value) {
            // Nothing to do after a get.
        }

        @Override
        public Object interceptPut(Object oldValue, Object newValue) {
            // NOTE(review): null presumably means "store newValue unchanged" - confirm against the MapInterceptor Javadoc.
            return null;
        }

        @Override
        public void afterPut(Object value) {
            // Nothing to do after a put.
        }

        @Override
        public Object interceptRemove(Object removedValue) {
            // NOTE(review): null presumably leaves the removed value untouched - confirm against the MapInterceptor Javadoc.
            return null;
        }

        @Override
        public void afterRemove(Object value) {
            // Nothing to do after a remove.
        }
    }
}