Acting Upon Changes to a Map
You can set up monitoring of a map to look for specific events, such as adding an entry, updating a specific entry, or deleting an entry. You can use these events to trigger execution of code, or even intercept the event and change it before it affects the data in memory.
Listeners
Hazelcast offers two methods for monitoring map activity. MapListener
allows you to define actions that are triggered based on activity on any entry in the map. EventListener
allows you to define actions based on activity taken on a specific map entry or set of map entries. You 'll use predicates to define the entries to be monitored for activity.
Listening to an Entire Map
If you want to listen to changes to all map entries, see Listening for Map Events for an explanation and example of the MapListener
method.
Listening for Specific Map Entries using Predicates
The easiest way to explain this is through an example. We are going to listen for changes made to the map entry for an employee with the surname "smith".
We will use JSON serialization. First, let’s create the Employee
class that we will use to create HazelcastJsonValue
instances:
public class Employee {
public static HazelcastJsonValue asJson(String surname) {
return new HazelcastJsonValue("{ \"surname\": \"" + surname + "\" }");
}
}
#include <hazelcast/client/hazelcast_client.h>
struct Employee {
static hazelcast::client::hazelcast_json_value as_json(std::string surname){
return hazelcast::client::hazelcast_json_value("{ \"surname\": \"" + surname + "\" }");
}
};
class Employee {
static asJson(surname) {
return new HazelcastJsonValue(JSON.stringify({surname: surname}));
}
}
def employee(surname):
return HazelcastJsonValue({"surname": surname})
func employee(surname string) serialization.JSON {
text := fmt.Sprintf(`{
"surname": "%s"
}`, surname)
return serialization.JSON(text)
}
Then, let’s create a listener that tracks the ADDED
, UPDATED
and REMOVED
entry events with the surname
predicate.
public class ListenerWithPredicate {
public static void main(String[] args) {
Config config = new Config();
config.setProperty("hazelcast.map.entry.filtering.natural.event.types", "true");
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
IMap<String, HazelcastJsonValue> map = hz.getMap("map");
map.addEntryListener(new MyEntryListener(),
Predicates.sql("surname=smith"), true);
System.out.println("Entry Listener registered");
}
static class MyEntryListener
implements EntryAddedListener<String, HazelcastJsonValue>,
EntryUpdatedListener<String, HazelcastJsonValue>,
EntryRemovedListener<String, HazelcastJsonValue> {
@Override
public void entryAdded(EntryEvent<String, HazelcastJsonValue> event) {
System.out.println("Entry Added:" + event);
}
@Override
public void entryRemoved(EntryEvent<String, HazelcastJsonValue> event) {
System.out.println("Entry Removed:" + event);
}
@Override
public void entryUpdated(EntryEvent<String, HazelcastJsonValue> event) {
System.out.println("Entry Updated:" + event);
}
}
}
int main() {
hazelcast::client::client_config config;
auto hz = hazelcast::new_client(std::move(config)).get();
auto map = hz.get_map("map").get();
hazelcast::client::query::sql_predicate sqlPredicate(hz,"surname=smith");
map->add_entry_listener(
hazelcast::client::entry_listener().on_added([](hazelcast::client::entry_event &&event) {
std::cout << "Entry Added:" << event << std::endl;
}).on_removed([](hazelcast::client::entry_event &&event) {
std::cout << "Entry Removed:" << event << std::endl;
}).on_updated([](hazelcast::client::entry_event &&event) {
std::cout << "Entry Updated:" << event << std::endl;
}), sqlPredicate, true).get();
std::cout << "Entry Listener registered" << std::endl;
}
const { Client, Predicates } = require('hazelcast-client');
async function main() {
const client = await Client.newHazelcastClient();
const map = await client.getMap('map');
const listener = {
added: (entryEvent) => {
console.log(`Entry added: ${entryEvent.key}, ${entryEvent.value}`);
},
removed: (entryEvent) => {
console.log(`Entry removed: ${entryEvent.key}, ${entryEvent.value}`);
},
updated: (entryEvent) => {
console.log(`Entry updated: ${entryEvent.key}, ${entryEvent.value}`);
}
};
await map.addEntryListenerWithPredicate(listener, new Predicates.sql('surname=smith'), undefined, true);
console.log('Entry listener registered');
}
main().catch(err => {
console.error('Error occurred:', err);
process.exit(1);
});
def entry_added(event):
print(f"Entry added with key: {event.key}, value: {event.value}")
def entry_removed(event):
print(f"Entry removed with key: {event.key}")
def entry_updated(event):
print(f"Entry updated with key: {event.key}, old value: {event.old_value}, new value: {event.value}")
client = hazelcast.HazelcastClient()
map = client.get_map("map").blocking()
map.add_entry_listener(
predicate=sql("surname = smith"),
added_func=entry_added,
removed_func=entry_removed,
updated_func=entry_updated
)
print("Entry listener registered")
func main() {
// error handling is omitted for brevity
ctx := context.TODO()
client, _ := hazelcast.StartNewClient(ctx)
m, _ := client.GetMap(ctx, "map")
subscriptionID, _ := m.AddListenerWithPredicate(ctx, hazelcast.MapListener{
EntryAdded: func(event *hazelcast.EntryNotified) {
fmt.Println("Entry Added:", event.Key)
},
EntryUpdated: func(event *hazelcast.EntryNotified) {
fmt.Println("Entry Updated:", event.Key)
},
EntryRemoved: func(event *hazelcast.EntryNotified) {
fmt.Println("Entry Removed:", event.Key)
},
}, predicate.Equal("surname", "smith"), true)
See How Distributed Query Works for more information about creating predicates. |
When the listener is running, a change to any record with the surname smith
will display output similar to the one below.
Entry Added:EntryEvent{entryEventType=ADDED, member=Member [192.168.1.227]:5701 - 36e6eaf5-e267-4858-a5ea-6adc3be2f6ff this, name='map', key=1, oldValue=null, value={ "surname": "smith" }, mergingValue=null}
In our example, we are sending an event log to the system output. You can replace this action with any code you want to run when the specified map event occurs.
The default backwards-compatible event publishing strategy only publishes UPDATED events when map entries are updated to a value that matches the predicate with which the listener was registered. This implies that when using the default event publishing strategy, your listener is not notified about an entry whose value is updated from one that matches the predicate to a new value that does not match the predicate.
|
When you configure Hazelcast members with property hazelcast.map.entry.filtering.natural.event.types
set to true
, handling of entry updates conceptually treats value transition as entry, update or exit with regards to the predicate value space. The following table compares how a listener is notified about an update to a map entry value under the default backwards-compatible Hazelcast behavior (when property hazelcast.map.entry.filtering.natural.event.types
is not set or is set to false
) versus when set to true
:
|
|
|
When old value matches predicate, new value does not match predicate |
No event is delivered to entry listener |
|
When old value matches predicate, new value matches predicate |
|
|
When old value does not match predicate, new value does not match predicate |
No event is delivered to entry listener |
No event is delivered to entry listener |
When old value does not match predicate, new value matches predicate |
|
|
Interceptors
Unlike listeners, interceptors can change the action taken on a map before it is completed. With listeners, you take an action after a method has been completed and the in-memory map has been modified. Interceptor actions are synchronous, allowing you to alter the behavior of a method, change its values, or totally cancel it.
Interceptors are a server-side feature. Because these operations run within the Hazelcast cluster natively, the interfaces that perform these functions are only available in Java. |
Map interceptors are chained, so adding the same interceptor to the same map more than once results in duplicated effects. This can easily happen when the interceptor is added to the map at member initialization, so that each member adds the same interceptor.
When you add the interceptor in this way, be sure to implement the hashCode()
method to return the same value for every instance of the interceptor.
It is not strictly necessary, but it is a good idea to also implement equals()
as this ensures that the map interceptor can be removed reliably.
The map API has two methods for adding and removing an interceptor to the map:
addInterceptor
and removeInterceptor
. See also the
MapInterceptor
interface
to learn about the methods used to intercept the changes in a map.
Methods available within the MapInterceptor
interface:
|
Replace returned |
|
Action to take after |
|
Replace value in |
|
Action to take after |
|
Collects removed map entry |
|
Action to take after |
The following is an example usage.
public class MapInterceptorMember {
public static void main(String[] args) {
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IMap<String, String> map = hz.getMap("themap");
map.addInterceptor(new MyMapInterceptor());
map.put("1", "1");
System.out.println(map.get("1"));
}
private static class MyMapInterceptor implements MapInterceptor {
@Override
public Object interceptGet(Object value) {
return value + "-foo";
}
@Override
public void afterGet(Object value) {
}
@Override
public Object interceptPut(Object oldValue, Object newValue) {
return null;
}
@Override
public void afterPut(Object value) {
}
@Override
public Object interceptRemove(Object removedValue) {
return null;
}
@Override
public void afterRemove(Object value) {
}
}
}