MongoDB is a well known document-oriented NoSQL database. Hazelcast is able to connect to MongoDB to read and write data.
MongoDB source works in two modes: batch or streaming.
The batch mode means that data will be read only once. Simple batch usage:
BatchSource<Document> batchSource = MongoSources.batch( dataConnectionRef("mongoDb"), "myDatabase", "myCollection", new Document("age", new Document("$gt", 10)), new Document("age", 1) ); Pipeline p = Pipeline.create(); BatchStage<Document> srcStage = p.readFrom(batchSource);
In this simple method you are restricted to provide the client supplier/data connection reference, database and collection name, projection and filter in a single method invocation. If you want some advanced features or just like more builder-like syntax, there is another option, which uses builder pattern to create the source. This option allows you to specify e.g. to which class object read from Mongo will be parsed (by default it’s
org.bson.Document), specify sorting or the behaviour on reading from not-yet-created collections.
BatchSource<MyDTO> batchSource = MongoSources.batch(dataConnectionRef("mongo")) (1) .database("myDatabase") (2) .collection("myCollection", MyDTO.class) (3) .filter(Filters.gt("age", 10), (4) .projection(Projections.include("age")) (5) ); Pipeline p = Pipeline.create(); BatchStage<Document> srcStage = p.readFrom(batchSource);
|1||Reference to the Mongo Data Connection or
|2||Name of the database from which items will be read|
|3||Name of the collection from which items will be read and type to which the elements will be
parsed. Second argument is optional and if not provided, source will emit objects of type
|4||(optional) Filters that will be used when reading the data. Is it good for performance to include here every possible filter user needs, instead of following
|5||(optional) Projection specification, so infomation which properties will be read. In case of large documents having projection may significantly increase performance.|
The streaming source creation is similar to the batch one; use the
stream method instead of
batch. Additionally, it’s recommended to use the
startAtOperationTime function when creating the streaming source with a builder.
Example stream source creation:
StreamSource<Document> streamSource = MongoSources.stream("batch-source", () -> MongoClients.create("mongodb://127.0.0.1:27017")) .database("myDatabase") .collection("myCollection") .filter(Filters.gt("fullDocument.age", 10)), (1) .projection(Projections.include("fullDocument.age")) (1) .startAtOperationTime(System.currentTimeMillis()) (2) ); Pipeline p = Pipeline.create(); StreamStage<Document> srcStage = p.readFrom(streamSource);
|1||Same as in batch, except the
|2||Place in the Mongo’s oplog from which the changes will be read.|
_id range would not be sufficient.
1 and read from one instance only (so the total parallelism will be 1).
MongoDB sink is a simple yet powerful way to write documents to MongoDB.
Sink<Document> mongoSink = MongoSinks.mongodb( "mongodb://127.0.0.1:27017", "myDatabase", "myCollection" ); Pipeline p = Pipeline.create(); (...) someStage.writeTo(mongoSink);
You can also use the sink builder syntax:
Sink<Document> mongoSink = MongoSinks.builder( Document.class, (1) () -> MongoClients.create("mongodb://127.0.0.1:27017") (2) ) .into("myDatabase", "myCollection") (3) .identifyDocumentBy("_id", doc -> doc.get("_id")) (4) .build() ); Pipeline p = Pipeline.create(); (...) someStage.writeTo(mongoSink);
|1||Class of the document, that will be inserted. Can be
|2||Supplier that creates the Mongo client or a
|3||Destination database and collection to which items will be saved. In this case it’s a constant value. You can also use the overloaded method, that have 2 functions as arguments to specify the target database and collection for every item separately (so in theory, every item can land in a different collection).|
|4||Specification of how the documents will be identified by the sink. This is required to distinguish two documents and know if the connector should perform insert or update. By default, if the class of the inserted document is
Some other builder methods worth mentioning:
commitRetryStrategy- how often a commit should be retried in case of transient errors. Used only with exactly once processing guarantee. Note that commit interval depends on the snapshot interval (more on this in Fault Tolerance section below), so commit interval in the Retry Strategy is a "minimum time", not exact value.
transactionOptions- specifies Mongo’s transaction options - read concern, write concern, read preference, etc. Used only with exactly once processing guarantee.
REPLACE- which operation will be used to put documents into Mongo. Default value is
REPLACE(but inserts will be still performed if field got from
throwOnNonExisting- if true, connector will throw an exception if the database or collection does not exist prior to job execution.
withCustomReplaceOptions- allows user to customize replace operations, like adding validation bypass or disabling upserts.
By default sink puts the documents in parallel on all nodes.
The Mongo sink supports exactly-once guarantee. It uses MongoDB’s transactions if such guarantee is requested by the user. In case of transient errors, the transaction will be automatically retried as configured by the
commitRetryStrategy option. The documents are committed with the last snapshot, which increases latency.
The commit interval is determined by the snapshot interval and retry strategy. Retry strategy defines minimum interval between commits (if it’s smaller than snapshot interval) and how many times transaction will be retried before an exception will be thrown.
Inserted documents will be visible to others after snapshot is made and transaction is committed.