Implementing a Custom MapStore

You can use the Java MapStore and MapLoader interfaces to implement a custom MapStore with your own logic, for example to connect to a database, load data from an external system, and write data back to that system.

Differences Between MapLoader and MapStore

The MapStore interface extends the MapLoader interface. Therefore, all methods and configuration parameters of the MapLoader are also available on the MapStore interface.

If you only want to load data from external systems into a map, use the MapLoader interface. If you also want to save map entries to an external system, use the MapStore interface.

Interface Description

MapLoader

The methods on the MapLoader interface are invoked when the application requests a value from the map. If the requested value does not exist in memory, the MapLoader.load() method tries to load it from the external system. Once loaded into memory, the map entry remains until it is changed, moved, or evicted.

MapStore

Additionally, the methods on the MapStore interface replicate updates made to a map to the external system. This replication can be implemented as a blocking (write-through) or non-blocking (write-behind) operation.
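To make the two roles concrete, the following sketch simulates read-through loading and write-through storing with plain Java collections. It is an illustration only and does not use the Hazelcast API: ReadWriteThroughSketch, externalSystem, memory, and loadCalls are hypothetical names, and a HashMap stands in for the external system.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a map backed by a loader and a store. Not Hazelcast code. */
final class ReadWriteThroughSketch {
    // Stands in for the external system, for example a database table.
    final Map<String, String> externalSystem = new HashMap<>();
    // Stands in for the entries held in memory.
    final Map<String, String> memory = new HashMap<>();
    // Counts how often the external system was asked for a value.
    int loadCalls = 0;

    /** Read-through: on a miss, load the value from the external system and keep it in memory. */
    String get(String key) {
        String value = memory.get(key);
        if (value == null) {
            loadCalls++;
            value = externalSystem.get(key); // plays the role of MapLoader.load(key)
            if (value != null) {
                memory.put(key, value);
            }
        }
        return value;
    }

    /** Write-through: update the external system before the call returns. */
    void put(String key, String value) {
        externalSystem.put(key, value); // plays the role of MapStore.store(key, value)
        memory.put(key, value);
    }

    public static void main(String[] args) {
        ReadWriteThroughSketch map = new ReadWriteThroughSketch();
        map.externalSystem.put("a", "1");
        System.out.println(map.get("a")); // loaded from the external system
        System.out.println(map.get("a")); // served from memory, no second load
        map.put("b", "2");
        System.out.println(map.externalSystem.get("b")); // already written through
    }
}
```

A write-behind variant would queue the call to the external system instead of making it inside put().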

Connecting to an External System

To connect to an external system, you must configure a connection to it in the init() method of the MapLoaderLifecycleSupport implementation, using either a third-party library or a JDBC driver.

The external system that you choose must be a centralized system that is accessible to all Hazelcast members. Persistence to a local file system is not supported.

The init() method initializes the MapStore. Hazelcast calls this method when the map is first created on a Hazelcast member. In this method, the MapStore can set up the resources that it needs, for example by reading a configuration file, creating a database connection, or accessing a Hazelcast instance.

By default, MapStores are loaded as soon as the first cluster member starts. If you want the cluster to wait until a certain number of members are available, you can use the hazelcast.initial.min.cluster.size system property. For example, if you set this value to 3, the MapStore is not loaded until three cluster members are running.
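As a minimal sketch, the property can be set programmatically before the member starts, or passed on the command line as -Dhazelcast.initial.min.cluster.size=3. The class and method names below are hypothetical, and the sketch only sets the JVM system property; it does not start a member.

```java
/** Hypothetical helper that sets the minimum cluster size as a JVM system property. */
final class MinClusterSizeSketch {
    static final String MIN_CLUSTER_SIZE = "hazelcast.initial.min.cluster.size";

    /** Call this before the Hazelcast member is started in the same JVM. */
    static void waitForMembers(int memberCount) {
        if (memberCount < 1) {
            throw new IllegalArgumentException("memberCount must be at least 1");
        }
        System.setProperty(MIN_CLUSTER_SIZE, Integer.toString(memberCount));
    }

    public static void main(String[] args) {
        waitForMembers(3);
        System.out.println(MIN_CLUSTER_SIZE + "=" + System.getProperty(MIN_CLUSTER_SIZE));
        // Start the Hazelcast member after this point.
    }
}
```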

The destroy() method is called during the graceful shutdown of a Hazelcast member. You can override this method to clean up the resources held by the MapStore, such as closing the database connections.

You can declaratively specify the database properties in your configuration file and then implement the init() method to receive those properties. For example, you can define the database URL and name, using the properties configuration element. The following is a configuration example for MongoDB:

XML:

<hazelcast>
    ...
    <map name="supplements">
        <map-store enabled="true" initial-mode="LAZY">
            <class-name>com.hazelcast.loader.YourMapStoreImplementation</class-name>
            <properties>
                <property name="mongo.url">mongodb://localhost:27017</property>
                <property name="mongo.db">mydb</property>
                <property name="mongo.collection">supplements</property>
            </properties>
        </map-store>
    </map>
    ...
</hazelcast>

YAML:

hazelcast:
  map:
    supplements:
      map-store:
        enabled: true
        initial-mode: LAZY
        class-name: com.hazelcast.loader.YourMapStoreImplementation
        properties:
          mongo.url: mongodb://localhost:27017
          mongo.db: mydb
          mongo.collection: supplements
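The following init() method reads these properties and creates the MongoDB connection:

public class YourMapStoreImplementation implements MapStore<String, Supplement>, MapLoaderLifecycleSupport {

    private MongoClient mongoClient;
    private MongoCollection<Document> collection;

    public YourMapStoreImplementation() {
    }

    @Override
    public void init(HazelcastInstance hazelcastInstance, Properties properties, String mapName) {
        // Property names must match the names declared in the map-store configuration.
        String mongoUrl = properties.getProperty("mongo.url");
        String dbName = properties.getProperty("mongo.db");
        String collectionName = properties.getProperty("mongo.collection");
        this.mongoClient = new MongoClient(new MongoClientURI(mongoUrl));
        this.collection = mongoClient.getDatabase(dbName).getCollection(collectionName);
    }

    @Override
    public void destroy() {
        // Release the connection during the graceful shutdown of the member.
        mongoClient.close();
    }

    // The load, store, and delete methods of MapStore are omitted here.
}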

For a complete implementation, see Full Example of a MapStore.
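Because the property names in the configuration must match the names read in init(), a missing or misspelled key otherwise shows up only as a null value later on. The sketch below shows one way to fail early with a clear message. MapStoreProperties and require() are hypothetical names, not part of Hazelcast; only java.util.Properties is used.

```java
import java.util.Properties;

/** Hypothetical helper for reading required MapStore properties inside init(). */
final class MapStoreProperties {
    private MapStoreProperties() {
    }

    /** Returns the trimmed value of a required property, or fails with a message naming the key. */
    static String require(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required MapStore property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("mongo.url", "mongodb://localhost:27017");
        System.out.println(require(properties, "mongo.url"));
        try {
            require(properties, "mongo.db"); // not set in this demo
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Inside init(), a call such as require(properties, "mongo.url") could replace the direct lookup.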

Populating a New Map

The first time you create a map that is configured with a MapStore, the map is empty. As a result, the Hazelcast cluster must request data from the external system. Depending on the amount of data Hazelcast needs to request, the map may take a long time to be populated with entries from the external system. The fastest way to retrieve entries from the external system is to use the MapLoader.loadAllKeys() method. When you implement this method, each Hazelcast member connects to the database in parallel to request entries in its owned partitions.

The MapLoader.loadAllKeys() method can return all, some, or none of the keys. For example, you can specify a range of keys to be loaded, then rely on read-through to load the remaining keys on demand. Or, you can return a null value so that no data is loaded from the external system.

If the number of keys to load is large, it is more efficient to load them incrementally rather than loading them all at once. To support incremental loading, the MapLoader.loadAllKeys() method returns an Iterable which can be lazily populated with the results of a database query. Hazelcast iterates over the returned data and, while doing so, sends the keys to their respective owner members. The iterator that was returned from the MapLoader.loadAllKeys() method may also implement the Closeable interface, in which case the iterator is closed when the iteration is over. This is intended for releasing resources such as closing a JDBC result set.
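The following sketch shows one possible shape for such a lazily populated result. It is a plain Java illustration under stated assumptions: PagedKeyIterable and its pageFetcher function are hypothetical, the pages stand in for chunks of a database query, and close() only records that it was called where a real implementation would release a JDBC result set or similar resource.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

/** Hypothetical helper: yields keys page by page and releases its resource on close(). */
final class PagedKeyIterable implements Iterable<Long>, Closeable {
    // Maps a page index to the keys of that page; an empty list signals the end.
    private final IntFunction<List<Long>> pageFetcher;
    private boolean closed;

    PagedKeyIterable(IntFunction<List<Long>> pageFetcher) {
        this.pageFetcher = pageFetcher;
    }

    @Override
    public Iterator<Long> iterator() {
        return new Iterator<Long>() {
            private int nextPage = 0;
            private Iterator<Long> current = null;
            private boolean exhausted = false;

            @Override
            public boolean hasNext() {
                // Fetch the next page only when the current one is used up.
                while (!exhausted && (current == null || !current.hasNext())) {
                    List<Long> page = pageFetcher.apply(nextPage++);
                    if (page.isEmpty()) {
                        exhausted = true;
                    } else {
                        current = page.iterator();
                    }
                }
                return !exhausted;
            }

            @Override
            public Long next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    @Override
    public void close() {
        // A real implementation would close its result set or cursor here.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        List<List<Long>> pages = List.of(List.of(1L, 2L), List.of(3L));
        PagedKeyIterable keys = new PagedKeyIterable(
                page -> page < pages.size() ? pages.get(page) : List.<Long>of());
        for (Long key : keys) {
            System.out.println(key);
        }
        keys.close();
    }
}
```

No page is fetched until iteration starts, so the full key set never has to be held in memory at once.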

Hazelcast clusters populate new maps following this process:

  1. Initialization starts, depending on the value of the initial-mode property. If initial-mode is set to EAGER, initialization starts on all partitions as soon as the map is created. If initial-mode is set to LAZY, data is loaded when an operation tries to read an entry from the map.

  2. Hazelcast calls the MapLoader.loadAllKeys() method on one of the members to get the keys.

  3. That member distributes keys to all other members in batches.

  4. Each member loads all values of its owned keys by calling MapLoader.loadAll(keys).

  5. Each member puts its owned entries into the map by calling IMap.putTransient(key,value).
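The distribution of keys to their owners in batches, as described in the process above, can be pictured with the following plain Java sketch. It is a simulation under loud assumptions: KeyDistributionSketch is hypothetical, members are identified by an index, and ownerOf() uses a simple modulo rule, whereas Hazelcast assigns keys to partitions with its own hashing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of sending loaded keys to their owner members in batches. */
final class KeyDistributionSketch {
    private KeyDistributionSketch() {
    }

    /** Assumed ownership rule for this sketch only; not Hazelcast's partitioning. */
    static int ownerOf(long key, int memberCount) {
        return Math.floorMod(Long.hashCode(key), memberCount);
    }

    /** Groups keys by owner and cuts each owner's keys into batches of at most batchSize. */
    static Map<Integer, List<List<Long>>> distribute(Iterable<Long> keys, int memberCount, int batchSize) {
        Map<Integer, List<List<Long>>> batchesByMember = new TreeMap<>();
        for (long key : keys) {
            List<List<Long>> batches =
                    batchesByMember.computeIfAbsent(ownerOf(key, memberCount), member -> new ArrayList<>());
            // Open a new batch when there is none yet or the last one is full.
            if (batches.isEmpty() || batches.get(batches.size() - 1).size() == batchSize) {
                batches.add(new ArrayList<>());
            }
            batches.get(batches.size() - 1).add(key);
        }
        return batchesByMember;
    }

    public static void main(String[] args) {
        // Each owner would then load the values for every batch it receives.
        System.out.println(distribute(List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L), 3, 2));
    }
}
```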

Using a MapStore with Write-Behind

If you configure your MapStore to use the write-behind caching pattern, you can do the following:

  • Apply batch updates to the external system to reduce requests to the data store.

  • Edit entries in the map without writing those changes back to the external system.

Batch Updates

Batch operations are available only with the write-behind pattern. In this mode, regardless of whether your application performs batch or single-entry updates and deletes on a map, Hazelcast calls the MapStore.storeAll(map) and MapStore.deleteAll(collection) methods of your MapStore class so that the queued operations are applied in a single call.

When your application issues batch operations such as putAll() or non-batch operations such as put(), the updates accumulate in the write-behind queue. When the queue is flushed, Hazelcast batches them and invokes the batch methods of the MapStore, storeAll() and deleteAll().

For example, if there are 50 put and 20 delete operations in the write-behind queue and it is time to flush the queue, batching results in a single invocation of MapStore.storeAll() for the 50 put operations and a single invocation of MapStore.deleteAll() for the 20 delete operations.
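The 50 put and 20 delete case can be reproduced with a small plain Java simulation. This is a sketch, not Hazelcast's write-behind queue: WriteBehindBatchSketch and RecordingStore are hypothetical, and the sketch assumes that only the latest pending operation per key is kept.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simulation of a write-behind queue that flushes in two batch calls. */
final class WriteBehindBatchSketch {

    /** Records the size of each batch call that the external system would receive. */
    static final class RecordingStore {
        final List<Integer> storeAllSizes = new ArrayList<>();
        final List<Integer> deleteAllSizes = new ArrayList<>();

        void storeAll(Map<Integer, String> entries) {
            storeAllSizes.add(entries.size());
        }

        void deleteAll(Collection<Integer> keys) {
            deleteAllSizes.add(keys.size());
        }
    }

    private final Map<Integer, String> pendingStores = new LinkedHashMap<>();
    private final Set<Integer> pendingDeletes = new LinkedHashSet<>();

    /** Queues a store; a pending delete of the same key is superseded (sketch assumption). */
    void put(int key, String value) {
        pendingDeletes.remove(key);
        pendingStores.put(key, value);
    }

    /** Queues a delete; a pending store of the same key is superseded (sketch assumption). */
    void delete(int key) {
        pendingStores.remove(key);
        pendingDeletes.add(key);
    }

    /** Flushes everything queued so far with at most one storeAll and one deleteAll call. */
    void flush(RecordingStore store) {
        if (!pendingStores.isEmpty()) {
            store.storeAll(new LinkedHashMap<>(pendingStores));
            pendingStores.clear();
        }
        if (!pendingDeletes.isEmpty()) {
            store.deleteAll(new ArrayList<>(pendingDeletes));
            pendingDeletes.clear();
        }
    }

    public static void main(String[] args) {
        WriteBehindBatchSketch queue = new WriteBehindBatchSketch();
        for (int key = 0; key < 50; key++) {
            queue.put(key, "value-" + key);
        }
        for (int key = 100; key < 120; key++) {
            queue.delete(key);
        }
        RecordingStore store = new RecordingStore();
        queue.flush(store);
        System.out.println("storeAll batch sizes: " + store.storeAllSizes);
        System.out.println("deleteAll batch sizes: " + store.deleteAllSizes);
    }
}
```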

Editing Entries in a Map Without Writing Back to the External System

You can edit an entry in a map after it’s already been written to your external system. For example, you may want to get an autogenerated ID from the data store then edit the in-memory entry for local use without writing the change back to the external system.

To edit a map entry without writing it back to the external system, implement the PostProcessingMapStore interface. This interface triggers an extra step of serialization, so use it only when needed.

Here is an example of post-processing:

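class ProcessingStore implements MapStore<Integer, Employee>, PostProcessingMapStore {
    @Override
    public void store( Integer key, Employee employee ) {
        // Save the employee to the external system, which generates the ID.
        EmployeeId id = saveEmployee();
        // Update the in-memory entry only; this change is not written back.
        employee.setId( id.getId() );
    }
}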
If you edit entries in combination with entry processors, the edited values are saved to the in-memory backups.

Forcing All Keys To Be Loaded

The IMap.loadAll() method loads some or all keys from the external system into the map in order to optimize multiple load operations. This method has two signatures. One signature loads the given keys and the other loads all keys. See the example code below.

final int numberOfEntriesToAdd = 1000;
final String mapName = LoadAll.class.getCanonicalName();
final Config config = createNewConfig(mapName);
final HazelcastInstance node = Hazelcast.newHazelcastInstance(config);
final IMap<Integer, Integer> map = node.getMap(mapName);

populateMap(map, numberOfEntriesToAdd);
System.out.printf("# Map store has %d elements\n", numberOfEntriesToAdd);

map.evictAll();
System.out.printf("# After evictAll map size\t: %d\n", map.size());

map.loadAll(true);
System.out.printf("# After loadAll map size\t: %d\n", map.size());

Setting Expiration Times on Loaded and Stored Data Entries

Entries loaded by MapLoader implementations do not have a set time-to-live property. Therefore, they live until evicted or explicitly removed. To enforce expiration times on the entries, you can use the EntryLoader and EntryStore interfaces.

These interfaces extend the MapLoader and MapStore interfaces. Therefore, all methods and configuration parameters of the MapLoader and MapStore implementations are also available on the EntryLoader and EntryStore implementations.

EntryLoader allows you to set time-to-live values per key before handing the values to Hazelcast. Therefore, you can store and load key-specific time-to-live values in the external system.

Similarly, to store custom expiration times with entries, use the EntryStore interface. EntryStore lets you retrieve the expiration date associated with each entry. The expiration date is an offset in milliseconds from the epoch, January 1, 1970 UTC, which is the same epoch that System.currentTimeMillis() uses.

Although the expiration date is expressed in milliseconds, it is rounded down to the nearest whole second.
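As a worked example of this arithmetic, the sketch below derives an expiration date from a time-to-live and rounds it down to a whole second. ExpirationTimeSketch and its methods are hypothetical helpers written for illustration; they are not part of the EntryStore API.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers for working with expiration dates in epoch milliseconds. */
final class ExpirationTimeSketch {
    private ExpirationTimeSketch() {
    }

    /** Expiration date for an entry written at nowMillis with the given time-to-live. */
    static long expirationTime(long nowMillis, long ttl, TimeUnit unit) {
        return nowMillis + unit.toMillis(ttl);
    }

    /** Rounds an epoch-millisecond timestamp down to a whole second. */
    static long roundDownToSecond(long epochMillis) {
        return Math.floorDiv(epochMillis, 1000L) * 1000L;
    }

    public static void main(String[] args) {
        long expiration = expirationTime(System.currentTimeMillis(), 90, TimeUnit.SECONDS);
        System.out.println("requested: " + expiration);
        System.out.println("rounded:   " + roundDownToSecond(expiration));
    }
}
```

For instance, an entry written at 1700000000123 with a 90-second time-to-live has the expiration date 1700000090123, which rounds down to 1700000090000.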

The following example shows you how to implement the EntryStore interface.

public class PersonEntryStore implements EntryStore<Long, Person> {

    private final Connection con;
    private final PreparedStatement allKeysStatement;

    public PersonEntryStore() {
        try {
            con = DriverManager.getConnection("jdbc:hsqldb:mydatabase", "SA", "");
            con.createStatement().executeUpdate(
                    "create table if not exists person (id bigint not null, name varchar(45), expiration_date bigint, primary key (id))");
            allKeysStatement = con.prepareStatement("select id from person");
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized void delete(Long key) {
        System.out.println("Delete:" + key);
        try {
            con.createStatement().executeUpdate(
                    format("delete from person where id = %s", key));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized void store(Long key, MetadataAwareValue<Person> value) {
        try {
            // Persist the entry together with its expiration time in epoch milliseconds.
            con.createStatement().executeUpdate(
                    format("insert into person values(%s,'%s', %d)", key, value.getValue().getName(), value.getExpirationTime()));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void storeAll(Map<Long, MetadataAwareValue<Person>> map) {
        for (Map.Entry<Long, MetadataAwareValue<Person>> entry : map.entrySet()) {
            store(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            delete(key);
        }
    }

    @Override
    public synchronized MetadataAwareValue<Person> load(Long key) {
        try {
            ResultSet resultSet = con.createStatement().executeQuery(
                    format("select name,expiration_date from person where id =%s", key));
            try {
                if (!resultSet.next()) {
                    return null;
                }
                String name = resultSet.getString(1);
                Long expirationDate = resultSet.getLong(2);
                return new MetadataAwareValue<>(new Person(key, name), expirationDate);
            } finally {
                resultSet.close();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized Map<Long, MetadataAwareValue<Person>> loadAll(Collection<Long> keys) {
        Map<Long, MetadataAwareValue<Person>> result = new HashMap<>();
        for (Long key : keys) {
            result.put(key, load(key));
        }
        return result;
    }

    @Override
    public Iterable<Long> loadAllKeys() {
        return new StatementIterable<Long>(allKeysStatement);
    }
}

Full Example of a MapStore

The following example shows you a complete MapStore implementation.

public class PersonMapStore implements MapStore<Long, Person> {

    private final Connection con;
    private final PreparedStatement allKeysStatement;

    public PersonMapStore() {
        try {
            con = DriverManager.getConnection("jdbc:hsqldb:mydatabase", "SA", "");
            con.createStatement().executeUpdate(
                    "create table if not exists person (id bigint not null, name varchar(45), primary key (id))");
            allKeysStatement = con.prepareStatement("select id from person");
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized void delete(Long key) {
        System.out.println("Delete:" + key);
        try {
            con.createStatement().executeUpdate(
                    format("delete from person where id = %s", key));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized void store(Long key, Person value) {
        try {
            con.createStatement().executeUpdate(
                    format("insert into person values(%s,'%s')", key, value.getName()));
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized void storeAll(Map<Long, Person> map) {
        for (Map.Entry<Long, Person> entry : map.entrySet()) {
            store(entry.getKey(), entry.getValue());
        }
    }

    public synchronized void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            delete(key);
        }
    }

    public synchronized Person load(Long key) {
        try {
            ResultSet resultSet = con.createStatement().executeQuery(
                    format("select name from person where id =%s", key));
            try {
                if (!resultSet.next()) {
                    return null;
                }
                String name = resultSet.getString(1);
                return new Person(key, name);
            } finally {
                resultSet.close();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized Map<Long, Person> loadAll(Collection<Long> keys) {
        Map<Long, Person> result = new HashMap<Long, Person>();
        for (Long key : keys) {
            result.put(key, load(key));
        }
        return result;
    }

    public Iterable<Long> loadAllKeys() {
        return new StatementIterable<Long>(allKeysStatement);
    }
}

If you are using Hazelcast in client/server mode, you must add the MapStore and any dependencies to the classpath of your members.

If you use multiple threads to access shared state in a MapStore implementation, you need to make sure that the implementation is thread safe. Each member receives an instance of the MapStore implementation, which means that multiple threads can access it at the same time.
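One way to keep shared state safe is to hold it in concurrent data structures rather than plain fields. The sketch below is a plain Java illustration, not a MapStore implementation: ThreadSafeStoreSketch is hypothetical, a ConcurrentHashMap stands in for the external system, and an AtomicLong counts the writes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical store whose shared state tolerates concurrent callers. */
final class ThreadSafeStoreSketch {
    // Stands in for the external system; safe for concurrent reads and writes.
    private final Map<Long, String> externalSystem = new ConcurrentHashMap<>();
    // Shared counter updated atomically, so no increments are lost.
    private final AtomicLong storeCount = new AtomicLong();

    void store(Long key, String value) {
        externalSystem.put(key, value);
        storeCount.incrementAndGet();
    }

    String load(Long key) {
        return externalSystem.get(key);
    }

    long storeCount() {
        return storeCount.get();
    }

    /** Writes from several threads at once and returns the number of recorded stores. */
    static long storeFromThreads(int threadCount, int writesPerThread) {
        ThreadSafeStoreSketch store = new ThreadSafeStoreSketch();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final long offset = (long) t * writesPerThread;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < writesPerThread; i++) {
                    store.store(offset + i, "value-" + (offset + i));
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return store.storeCount();
    }

    public static void main(String[] args) {
        System.out.println(storeFromThreads(4, 1000));
    }
}
```

The examples in this section take a different route and mark their methods as synchronized, which serializes access to the single JDBC connection.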

To monitor MapStores for each loaded entry, use the EntryLoadedListener interface. See the Listening for Map Events section to learn how you can catch entry-based events.

Next Steps

If you use Hazelcast in client/server mode, you must add your MapStore and any other dependencies to the classpaths of your members.

After you’ve created your MapStore implementation and it’s on the classpath of your members, you need to configure one or more maps to use it. By doing so, you plug the MapStore into the lifecycle of the map. For details about configuring a MapStore, see the configuration guide.