Implementing a Custom MapStore
You can use the Java MapStore
and MapLoader
interfaces to implement a custom MapStore with your own logic such as for database connections, loading data from a data store, and writing data to a data store.
Differences Between MapLoader and MapStore
The MapStore
interface extends the MapLoader
interface. Therefore, all methods and configuration parameters of the MapLoader
interface are also available on the MapStore
interface.
If you only want to load data from external systems in a map, use the MapLoader
interface. If you also want to save map entries to an external system, use the MapStore
interface.
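Interface | Description |
---|---|
MapLoader | The methods on the MapLoader interface load map entries from an external data store. |
MapStore | The methods on the MapStore interface load map entries from an external data store and also write map entries to it. |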
Connecting to an External Data Store
To connect to an external data store, you must configure a connection to it in the init() method of your MapLoaderLifecycleSupport implementation, using either a third-party library or a JDBC driver.
The data store that you choose must be a centralized system that is accessible to all Hazelcast members. Persistence to a local file system is not supported.
The init()
method initializes the MapStore. Hazelcast calls this method when the map is first created on a Hazelcast member. The MapStore can initialize the required resources such as reading a configuration file, creating a database connection, or accessing a Hazelcast instance.
By default, MapStores are loaded as soon as the first cluster member starts. If you want the cluster to wait until a certain number of members are available, you can use the hazelcast.initial.min.cluster.size system property. For example, if you set this value to 3 , the MapStore is not loaded until three cluster members are running.
The destroy()
method is called during the graceful shutdown of a Hazelcast member. You can override this method to clean up the resources held by the MapStore, such as closing the database connections.
See an example of this interface.
You can declaratively specify the database properties in your configuration file and then implement the init()
method to receive those properties. For example, you can define the database URL and name, using the properties
configuration element. The following is a configuration example for MongoDB:
<hazelcast>
...
<map name="supplements">
<map-store enabled="true" initial-mode="LAZY">
<class-name>com.hazelcast.loader.YourMapStoreImplementation</class-name>
<properties>
<property name="mongo.url">mongodb://localhost:27017</property>
<property name="mongo.db">mydb</property>
<property name="mongo.collection">supplements</property>
</properties>
</map-store>
</map>
...
</hazelcast>
hazelcast:
  map:
    supplements:
      map-store:
        enabled: true
        initial-mode: LAZY
        class-name: com.hazelcast.loader.YourMapStoreImplementation
        properties:
          mongo.url: mongodb://localhost:27017
          mongo.db: mydb
          mongo.collection: supplements
public class YourMapStoreImplementation implements MapStore<String, Supplement>, MapLoaderLifecycleSupport {
private MongoClient mongoClient;
private MongoCollection collection;
public YourMapStoreImplementation() {
}
@Override
public void init(HazelcastInstance hazelcastInstance, Properties properties, String mapName) {
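// Read the connection settings declared in the properties element of the map-store configuration.
String mongoUrl = properties.getProperty("mongo.url");
String dbName = properties.getProperty("mongo.db");
String collectionName = properties.getProperty("mongo.collection");
// Open the connection once and reuse it for all load and store calls.
this.mongoClient = new MongoClient(new MongoClientURI(mongoUrl));
this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
}
// The MapStore methods (load, store, delete, and so on) are omitted from this excerpt.
}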
See a full example.
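Because the init() method receives the configured values as a java.util.Properties object, a misspelled property name only shows up as a null value at runtime. As a minimal sketch of one way to guard against that, the helper below reads the three keys used in the configuration above and fails fast when one is missing. The class name MongoStoreSettings and its methods are hypothetical and are not part of the Hazelcast API.

```java
import java.util.Properties;

/** Hypothetical holder for the connection settings; not part of the Hazelcast API. */
final class MongoStoreSettings {
    final String url;
    final String database;
    final String collection;

    private MongoStoreSettings(String url, String database, String collection) {
        this.url = url;
        this.database = database;
        this.collection = collection;
    }

    /** Reads the keys declared in the map-store configuration and rejects missing ones. */
    static MongoStoreSettings fromProperties(Properties properties) {
        return new MongoStoreSettings(
                required(properties, "mongo.url"),
                required(properties, "mongo.db"),
                required(properties, "mongo.collection"));
    }

    /** Returns the trimmed value for the key, or throws if it is absent or blank. */
    private static String required(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing MapStore property: " + key);
        }
        return value.trim();
    }
}
```

Inside init(), you could call MongoStoreSettings.fromProperties(properties) before creating the database client, so that a configuration mistake surfaces when the map is created rather than on the first load.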
Populating a New Map
The first time you create a map that is configured with a MapStore, the map is empty. As a result, the Hazelcast cluster must request data from the external data store. Depending on the amount of data Hazelcast needs to request, the map may take a long time to be populated with entries from the data store. The fastest way to retrieve entries from the data store is to use the MapLoader.loadAllKeys()
method. When you implement this method, each Hazelcast member connects to the database in parallel to request entries in its owned partitions.
The MapLoader.loadAllKeys()
method can return all, some, or none of the keys. For example, you can specify a range of keys to be loaded, then rely on read-through to load the remaining keys on demand. Or, you can return a null
value so that no data is loaded from the data store.
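The following sketch illustrates the range idea under simplifying assumptions: loadAllKeys() returns only the keys inside a configured range, and any other key is fetched by load() when it is first requested. The KeyValueLoader interface is a cut-down stand-in for MapLoader so that the sketch compiles without Hazelcast on the classpath, and the in-memory map stands in for the external data store. Both are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the MapLoader methods used here; not the Hazelcast interface. */
interface KeyValueLoader<K, V> {
    V load(K key);

    Iterable<K> loadAllKeys();
}

/** Preloads only the keys in [firstKey, lastKey]; other keys are loaded on demand. */
final class RangePreloadingLoader implements KeyValueLoader<Long, String> {
    private final Map<Long, String> dataStore; // stands in for the external data store
    private final long firstKey;
    private final long lastKey;

    RangePreloadingLoader(Map<Long, String> dataStore, long firstKey, long lastKey) {
        this.dataStore = dataStore;
        this.firstKey = firstKey;
        this.lastKey = lastKey;
    }

    /** Called for a single key, for example on a read-through miss. */
    @Override
    public String load(Long key) {
        return dataStore.get(key);
    }

    /** Returns only the keys to preload; with a database this would be a range query. */
    @Override
    public Iterable<Long> loadAllKeys() {
        List<Long> keys = new ArrayList<>();
        for (Long key : dataStore.keySet()) {
            if (key >= firstKey && key <= lastKey) {
                keys.add(key);
            }
        }
        return keys;
    }
}
```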
If the number of keys to load is large, it is more efficient to load them incrementally rather than loading them all at once. To support
incremental loading, the MapLoader.loadAllKeys()
method returns an Iterable
which can be lazily populated with the results of a database query. Hazelcast iterates over the returned keys and, while doing so, sends them to their respective owner members. The iterator of the returned Iterable may also implement the Closeable interface, in which case Hazelcast closes the iterator when the iteration is over. This is intended for releasing resources, for example closing a JDBC result set.
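As a rough sketch of this pattern, the helper below wraps a lazily advancing key source in an Iterable whose iterator also implements Closeable. ClosingKeyIterable is a hypothetical name. In a JDBC-based store, the source iterator would advance a result set one row at a time and the Closeable would close that result set; here both are passed in as plain JDK types so that the sketch stays self-contained.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Hypothetical helper: exposes a single lazily advancing cursor as an Iterable
 * whose iterator implements Closeable. It can be traversed only once.
 */
final class ClosingKeyIterable<K> implements Iterable<K> {
    private final Iterator<K> source;  // advances the underlying cursor on demand
    private final Closeable resource;  // releases the cursor, for example a result set

    ClosingKeyIterable(Iterator<K> source, Closeable resource) {
        this.source = source;
        this.resource = resource;
    }

    @Override
    public Iterator<K> iterator() {
        return new ClosingIterator<>(source, resource);
    }

    private static final class ClosingIterator<K> implements Iterator<K>, Closeable {
        private final Iterator<K> source;
        private final Closeable resource;
        private boolean closed;

        ClosingIterator(Iterator<K> source, Closeable resource) {
            this.source = source;
            this.resource = resource;
        }

        @Override
        public boolean hasNext() {
            return !closed && source.hasNext();
        }

        @Override
        public K next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return source.next();
        }

        /** Releases the backing resource once; later calls do nothing. */
        @Override
        public void close() throws IOException {
            if (!closed) {
                closed = true;
                resource.close();
            }
        }
    }
}
```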
Hazelcast clusters populate new maps following this process:
- Initialization starts, depending on the value of the initial-mode configuration. If initial-mode is set to EAGER, initialization starts on all partitions as soon as the map is created. If initial-mode is set to LAZY, data is loaded when an operation tries to read an entry from the map.
- Hazelcast calls the MapLoader.loadAllKeys() method on one of the members to get the keys.
- That member distributes keys to all other members in batches.
- Each member loads all values of its owned keys by calling MapLoader.loadAll(keys).
- Each member puts its owned entries into the map by calling IMap.putTransient(key,value).
Using a MapStore with Write-Behind
If you configure your MapStore to use the write-behind caching pattern, you can do the following:
- Apply batch updates to the data store to reduce requests to the data store.
- Edit entries in the map without writing those changes back to the data store.
Batch Updates
Batch operations are only allowed in write-behind mode. If your application performs a batch update or batch delete from a map, Hazelcast calls the MapStore.storeAll(map)
and MapStore.deleteAll(collection)
methods defined in your MapStore
class to make all writes in a single call.
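One way to take advantage of a single storeAll(map) call in a JDBC-backed store is to send the entries as one JDBC batch instead of one statement per entry. The sketch below shows that idea with the standard PreparedStatement.addBatch() and executeBatch() calls. The class name PersonBatchWriter is hypothetical, the person table with id and name columns is assumed to match the examples later in this document, and the map holds names directly rather than Person objects to keep the sketch self-contained.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

/** Hypothetical helper that a storeAll(map) implementation could delegate to. */
final class PersonBatchWriter {
    private final Connection connection;

    PersonBatchWriter(Connection connection) {
        this.connection = connection;
    }

    /** Queues one insert per entry, then sends them to the database in one batch. */
    void storeAll(Map<Long, String> namesById) {
        String sql = "insert into person (id, name) values (?, ?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (Map.Entry<Long, String> entry : namesById.entrySet()) {
                statement.setLong(1, entry.getKey());
                statement.setString(2, entry.getValue());
                statement.addBatch();
            }
            statement.executeBatch();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Using bound parameters here also avoids building SQL text from entry values.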
Editing Entries in a Map Without Writing Back to the Data Store
You can edit an entry in a map after it’s already been written to the external data store. For example, you may want to get an autogenerated ID from the data store then edit the in-memory entry for local use without writing the change back to the external store.
To edit a map entry without writing it back to the data store, implement the PostProcessingMapStore
interface. This interface triggers an extra step of serialization, so use it only when needed.
Here is an example of post processing:
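class ProcessingStore implements MapStore<Integer, Employee>, PostProcessingMapStore {
@Override
public void store( Integer key, Employee employee ) {
// Persist the entry first. saveEmployee() stands for your own persistence logic.
EmployeeId id = saveEmployee( employee );
// Copy the generated ID into the in-memory entry. This change is not written back to the data store.
employee.setId( id.getId() );
}
// The remaining MapStore methods are omitted for brevity.
}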
If you edit entries in combination with Entry Processors, edited values are saved to in-memory backups.
Forcing All Keys To Be Loaded
The IMap.loadAll() method loads some or all keys from the data store into the map, which is more efficient than triggering many individual load operations. This method has two signatures: one loads the given keys and the other loads all keys. The example code below uses the signature that loads all keys.
final int numberOfEntriesToAdd = 1000;
final String mapName = LoadAll.class.getCanonicalName();
final Config config = createNewConfig(mapName);
final HazelcastInstance node = Hazelcast.newHazelcastInstance(config);
final IMap<Integer, Integer> map = node.getMap(mapName);
populateMap(map, numberOfEntriesToAdd);
System.out.printf("# Map store has %d elements\n", numberOfEntriesToAdd);
map.evictAll();
System.out.printf("# After evictAll map size\t: %d\n", map.size());
map.loadAll(true);
System.out.printf("# After loadAll map size\t: %d\n", map.size());
Setting Expiration Times on Loaded and Stored Data Entries
Entries loaded by MapLoader
implementations do not have a set time-to-live property. Therefore, they live until evicted or explicitly removed. To enforce expiration times on the entries, you can use the EntryLoader
and EntryStore
interfaces.
These interfaces extend the MapLoader and MapStore interfaces. Therefore, all methods and configuration parameters of the MapLoader and
MapStore implementations are also available on the EntryLoader and EntryStore implementations.
EntryLoader
allows you to set time-to-live values per key before handing the values to Hazelcast. Therefore, you can store and load
key-specific time-to-live values in the external storage.
Similarly, to store custom expiration times with the entries, you can use EntryStore. EntryStore allows you to retrieve the expiration date associated with each entry. The expiration date is an offset in milliseconds from the epoch, January 1, 1970 UTC, which is the same epoch that System.currentTimeMillis() uses.
Although the expiration date is expressed in milliseconds, expiration dates are rounded down to the nearest whole second.
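To make the arithmetic concrete, the sketch below turns a relative time-to-live into an absolute expiration date in epoch milliseconds and shows what rounding down to a whole second does to that value. ExpirationTimes is a hypothetical helper, and the rounding method only mirrors the behavior described above; it is not a Hazelcast API.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for working with expiration dates in epoch milliseconds. */
final class ExpirationTimes {
    private ExpirationTimes() {
    }

    /** Returns the absolute expiration date for an entry that should live for the given TTL. */
    static long fromTtl(long nowMillis, long ttl, TimeUnit unit) {
        return nowMillis + unit.toMillis(ttl);
    }

    /** Drops the millisecond part, which mirrors rounding down to a whole second. */
    static long roundDownToSecond(long epochMillis) {
        return Math.floorDiv(epochMillis, 1000L) * 1000L;
    }
}
```

For example, ExpirationTimes.fromTtl(System.currentTimeMillis(), 30, TimeUnit.SECONDS) gives a value that an EntryLoader could pass as the expiration time of a loaded entry.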
The following example shows you how to implement the EntryStore
interface.
public class PersonEntryStore implements EntryStore<Long, Person> {
private final Connection con;
private final PreparedStatement allKeysStatement;
public PersonEntryStore() {
try {
con = DriverManager.getConnection("jdbc:hsqldb:mydatabase", "SA", "");
con.createStatement().executeUpdate(
"create table if not exists person (id bigint not null, name varchar(45), expiration_date bigint, primary key (id))");
allKeysStatement = con.prepareStatement("select id from person");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized void delete(Long key) {
System.out.println("Delete:" + key);
try {
con.createStatement().executeUpdate(
format("delete from person where id = %s", key));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized void store(Long key, MetadataAwareValue<Person> value) {
try {
con.createStatement().executeUpdate(
format("insert into person values(%s,'%s', %d)", key, value.getValue().getName(), value.getExpirationTime()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public void storeAll(Map<Long, MetadataAwareValue<Person>> map) {
for (Map.Entry<Long, MetadataAwareValue<Person>> entry : map.entrySet()) {
store(entry.getKey(), entry.getValue());
}
}
@Override
public synchronized void deleteAll(Collection<Long> keys) {
for (Long key : keys) {
delete(key);
}
}
@Override
public synchronized MetadataAwareValue<Person> load(Long key) {
try {
ResultSet resultSet = con.createStatement().executeQuery(
format("select name, expiration_date from person where id = %s", key));
try {
if (!resultSet.next()) {
return null;
}
String name = resultSet.getString(1);
Long expirationDate = resultSet.getLong(2);
return new MetadataAwareValue<>(new Person(key, name), expirationDate);
} finally {
resultSet.close();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public synchronized Map<Long, MetadataAwareValue<Person>> loadAll(Collection<Long> keys) {
Map<Long, MetadataAwareValue<Person>> result = new HashMap<>();
for (Long key : keys) {
result.put(key, load(key));
}
return result;
}
public Iterable<Long> loadAllKeys() {
return new StatementIterable<Long>(allKeysStatement);
}
}
Full Example of a MapStore
The following example shows you a complete MapStore
implementation.
public class PersonMapStore implements MapStore<Long, Person> {
private final Connection con;
private final PreparedStatement allKeysStatement;
public PersonMapStore() {
try {
con = DriverManager.getConnection("jdbc:hsqldb:mydatabase", "SA", "");
con.createStatement().executeUpdate(
"create table if not exists person (id bigint not null, name varchar(45), primary key (id))");
allKeysStatement = con.prepareStatement("select id from person");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public synchronized void delete(Long key) {
System.out.println("Delete:" + key);
try {
con.createStatement().executeUpdate(
format("delete from person where id = %s", key));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public synchronized void store(Long key, Person value) {
try {
con.createStatement().executeUpdate(
format("insert into person values(%s,'%s')", key, value.getName()));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public synchronized void storeAll(Map<Long, Person> map) {
for (Map.Entry<Long, Person> entry : map.entrySet()) {
store(entry.getKey(), entry.getValue());
}
}
public synchronized void deleteAll(Collection<Long> keys) {
for (Long key : keys) {
delete(key);
}
}
public synchronized Person load(Long key) {
try {
ResultSet resultSet = con.createStatement().executeQuery(
format("select name from person where id =%s", key));
try {
if (!resultSet.next()) {
return null;
}
String name = resultSet.getString(1);
return new Person(key, name);
} finally {
resultSet.close();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
public synchronized Map<Long, Person> loadAll(Collection<Long> keys) {
Map<Long, Person> result = new HashMap<Long, Person>();
for (Long key : keys) {
result.put(key, load(key));
}
return result;
}
public Iterable<Long> loadAllKeys() {
return new StatementIterable<Long>(allKeysStatement);
}
}
If you are using Hazelcast in client/server mode, you must add the MapStore and any dependencies to the classpath of your members.
If you use multiple threads to access shared state in a MapStore
implementation, you need to make sure that the implementation is thread safe. Each member receives an instance of the MapStore
implementation, which means that multiple threads can access it at the same time.
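The full example above stays thread safe by marking its methods as synchronized, because they share one JDBC connection. Where the shared state is simple, another option is to keep it in concurrent data structures so that calls do not block each other. The sketch below illustrates that approach with a hypothetical in-memory store; ConcurrentInMemoryStore is not a Hazelcast class and does not implement MapStore, it only shows how the shared fields could be protected.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory store whose shared state is safe to use from several threads. */
final class ConcurrentInMemoryStore {
    // ConcurrentHashMap allows concurrent reads and writes without external locking.
    private final Map<Long, String> entries = new ConcurrentHashMap<>();
    // AtomicLong keeps the counter correct when several threads increment it.
    private final AtomicLong storeCalls = new AtomicLong();

    void store(Long key, String value) {
        entries.put(key, value);
        storeCalls.incrementAndGet();
    }

    String load(Long key) {
        return entries.get(key);
    }

    void delete(Long key) {
        entries.remove(key);
    }

    long storeCalls() {
        return storeCalls.get();
    }

    int size() {
        return entries.size();
    }
}
```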
Related Resources
To be notified each time a MapStore loads an entry, use the EntryLoadedListener interface. See the Listening for Map Events section to learn how you can catch entry-based events.
For a tutorial, see Use Cloud Standard as a Write-Through Cache with MongoDB Atlas.
Next Steps
If you use Hazelcast in client/server mode, you must add your MapStore and any other dependencies to the classpaths of your members.
After you’ve created your MapStore
implementation and it’s on the classpath of your members, you need to configure one or more maps to use it. By doing so, you plug the MapStore into the lifecycle of the map. For details about configuring a MapStore, see the configuration guide.