MongoDB Connector

MongoDB is a well-known document-oriented NoSQL database. Hazelcast can connect to MongoDB to read and write data.

Requirements

  • MongoDB must be version 4.4 or later.

  • If you cannot connect from more than one processor, e.g., if you are using the Mongo Atlas offering with only one client allowed, you have to use the forceReadTotalParallelismOne option (see the Note on Parallelization section below). This is because Hazelcast uses the $function aggregation stage behind the scenes to parallelize the work, and $function may not be available in some environments, such as Mongo Atlas Serverless.

  • Streaming requires the oplog to be enabled. This also means that, for example, Mongo Atlas Serverless won’t work with streaming.

MongoDB Source

The MongoDB source works in two modes: batch and streaming.

Batch

In batch mode the data is read only once. Simple batch usage:

BatchSource<Document> batchSource =
        MongoSources.batch(
                dataConnectionRef("mongoDb"),
                "myDatabase",
                "myCollection",
                new Document("age", new Document("$gt", 10)),
                new Document("age", 1)
        );
Pipeline p = Pipeline.create();
BatchStage<Document> srcStage = p.readFrom(batchSource);

This simple method restricts you to providing the client supplier/data connection reference, database and collection names, projection and filter in a single method invocation. If you need more advanced features, or simply prefer a builder-like syntax, there is another option that uses the builder pattern to create the source. It lets you specify, for example, the class to which objects read from Mongo will be parsed (by default org.bson.Document), the sort order, or the behaviour when reading from a not-yet-created collection.

BatchSource<MyDTO> batchSource =
        MongoSources.batch(dataConnectionRef("mongo")) (1)
                .database("myDatabase") (2)
                .collection("myCollection", MyDTO.class) (3)
                .filter(Filters.gt("age", 10)) (4)
                .projection(Projections.include("age")) (5)
                .build();
Pipeline p = Pipeline.create();
BatchStage<MyDTO> srcStage = p.readFrom(batchSource);
1 Reference to the Mongo Data Connection or a MongoClient supplier
2 Name of the database from which items will be read
3 Name of the collection from which items will be read and the type to which the elements will be parsed. The second argument is optional; if not provided, the source emits objects of type org.bson.Document
4 (optional) Filter that will be applied when reading the data. For performance, it is better to include every filter you need here instead of following readFrom with an immediate filter operation.
5 (optional) Projection specification, i.e. which properties will be read. For large documents, a projection may significantly improve performance.
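
Continuing the builder example above, a minimal end-to-end sketch could look like the following. MyDTO and its getAge() accessor are hypothetical, and Sinks.logger() is used only as a stand-in destination:

srcStage
        .map(dto -> dto.getAge())           // MyDTO#getAge() is a hypothetical accessor
        .writeTo(Sinks.logger());           // log each value; replace with a real sink

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p).join();               // submit the pipeline and wait for completion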

Stream

Creating a streaming source is similar to the batch one; use the stream method instead of batch. Additionally, it’s recommended to use the startAtOperationTime method when creating the streaming source with a builder. Example stream source creation:

StreamSource<Document> streamSource =
        MongoSources.stream("stream-source", () -> MongoClients.create("mongodb://127.0.0.1:27017"))
                .database("myDatabase")
                .collection("myCollection")
                .filter(Filters.gt("fullDocument.age", 10)) (1)
                .projection(Projections.include("fullDocument.age")) (1)
                .startAtOperationTime(new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0)) (2)
                .build();
Pipeline p = Pipeline.create();
StreamStage<Document> srcStage = p.readFrom(streamSource).withoutTimestamps();
1 Same as in batch, except for the fullDocument. prefix. The prefix is needed because you are no longer working on Document objects but on ChangeStreamDocument objects - special MongoDB wrappers for change events that contain the change metadata together with the modified document in the fullDocument property.
2 Place in Mongo’s oplog from which the changes will be read.
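
As a usage sketch, the change stream can be consumed like any other streaming stage. The following example counts change events in 1-second tumbling windows; it uses ingestion timestamps, so it does not rely on the source providing event time, and Sinks.logger() is again only a stand-in destination:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(streamSource)
        .withIngestionTimestamps()                    // assign timestamps at ingestion time
        .window(WindowDefinition.tumbling(1_000))     // 1-second tumbling windows
        .aggregate(AggregateOperations.counting())    // count change events per window
        .writeTo(Sinks.logger());                     // log the per-window counts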

Note on Parallelization

By default, the connector adds 3 additional aggregation stages, one of which uses server-side JavaScript to calculate whether a given document belongs to a given partition. This allows parallel reading from multiple nodes, also in streaming mode, where a simple division by _id range would not be sufficient.

For this to work, server-side JavaScript must be enabled. If you cannot enable it, set the parallelism to 1 and read from one instance only (so the total parallelism will be 1), for example with the forceReadTotalParallelismOne option mentioned in the Requirements section, as sketched below.
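
For example, assuming the source builder exposes the option as forceReadTotalParallelismOne(boolean), a single-reader batch source could be created like this:

BatchSource<Document> singleReaderSource =
        MongoSources.batch(dataConnectionRef("mongoDb"))
                .database("myDatabase")
                .collection("myCollection")
                .forceReadTotalParallelismOne(true)   // read everything through a single processor
                .build();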

MongoDB Sink

The MongoDB sink is a simple yet powerful way to write documents to MongoDB.

Simple example:

Sink<Document> mongoSink =
         MongoSinks.mongodb(
                 "mongodb://127.0.0.1:27017",
                 "myDatabase",
                 "myCollection"
         );

Pipeline p = Pipeline.create();
(...)
someStage.writeTo(mongoSink);

You can also use the sink builder syntax:

Sink<Document> mongoSink =
         MongoSinks.builder(
                     Document.class, (1)
                     () -> MongoClients.create("mongodb://127.0.0.1:27017") (2)
                 )
                 .into("myDatabase", "myCollection") (3)
                 .identifyDocumentBy("_id", doc -> doc.get("_id")) (4)
                 .build();

Pipeline p = Pipeline.create();
(...)
someStage.writeTo(mongoSink);
1 Class of the documents that will be inserted. Can be Document, BsonDocument or any user-defined POJO.
2 Supplier that creates the Mongo client, or a dataConnectionRef.
3 Destination database and collection to which items will be saved. In this case it’s a constant value. You can also use the overloaded method that takes two functions as arguments to specify the target database and collection for every item separately (so, in theory, every item can land in a different collection); see the sketch after this list.
4 Specification of how the documents will be identified by the sink. This is required to distinguish two documents and to decide whether the connector should perform an insert or an update. By default, if the class of the inserted document is org.bson.Document, the connector distinguishes documents by their _id field.
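
Callout 3 mentions that the destination can be chosen per item. A sketch of that variant, assuming the into overload takes two functions mapping an item to the database and collection names:

Sink<Document> routingSink =
         MongoSinks.builder(Document.class, () -> MongoClients.create("mongodb://127.0.0.1:27017"))
                 // hypothetical routing: fixed database, collection derived from the item
                 .into(doc -> "myDatabase",
                       doc -> "users-" + doc.getString("country"))
                 .identifyDocumentBy("_id", doc -> doc.get("_id"))
                 .build();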

Some other builder methods worth mentioning (a combined sketch follows this list):

  1. commitRetryStrategy - how often a commit should be retried in case of transient errors. Used only with the exactly-once processing guarantee. Note that the commit interval depends on the snapshot interval (more on this in the Fault Tolerance section below), so the commit interval in the retry strategy is a "minimum time", not an exact value.

  2. transactionOptions - specifies Mongo’s transaction options - read concern, write concern, read preference, etc. Used only with the exactly-once processing guarantee.

  3. writeMode - INSERT_ONLY, UPDATE_ONLY, UPSERT or REPLACE - which operation will be used to put documents into Mongo. The default value is REPLACE (but inserts will still be performed if the value returned by identifyDocumentBy is null).

  4. throwOnNonExisting - if true, the connector will throw an exception if the database or collection does not exist prior to job execution.

  5. withCustomReplaceOptions - allows the user to customize replace operations, e.g. by adding a validation bypass or disabling upserts.
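
A combined sketch of some of these options is shown below. The exact signatures (for example, the type accepted by commitRetryStrategy or the location of the WriteMode enum) should be verified against the connector’s Javadoc; treat this as an illustration only.

Sink<Document> tunedSink =
         MongoSinks.builder(Document.class, () -> MongoClients.create("mongodb://127.0.0.1:27017"))
                 .into("myDatabase", "myCollection")
                 .identifyDocumentBy("_id", doc -> doc.get("_id"))
                 .writeMode(WriteMode.UPSERT)                     // upsert instead of the default REPLACE
                 .throwOnNonExisting(false)                       // tolerate a missing collection
                 .commitRetryStrategy(RetryStrategies.custom()    // retry transient commit failures
                         .maxAttempts(10)
                         .intervalFunction(IntervalFunction.constant(250))
                         .build())
                 .build();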

By default, the sink writes documents in parallel on all nodes.

Fault Tolerance

The Mongo sink supports the exactly-once guarantee. It uses MongoDB’s transactions if this guarantee is requested by the user. In case of transient errors, the transaction is automatically retried as configured by the commitRetryStrategy option. Documents are committed with the last snapshot, which increases latency.

The commit interval is determined by the snapshot interval and the retry strategy. The retry strategy defines the minimum interval between commits (if it’s smaller than the snapshot interval) and how many times the transaction will be retried before an exception is thrown.

Inserted documents become visible to other clients only after a snapshot is taken and the transaction is committed.
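
Exactly-once is requested at the job level. A minimal sketch of submitting a pipeline with this guarantee:

JobConfig config = new JobConfig()
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        .setSnapshotIntervalMillis(5_000);    // snapshots (and therefore commits) roughly every 5 seconds

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, config).join();         // p is the pipeline built earlier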