Describe a Mongo database

Flow can read from a Mongo collection as part of a query, and can write data back to Mongo.

Define a connection to your Mongo database

Mongo connections are stored in your connections.conf config file, under the mongo element.

The connection specifies how to connect to a Mongo database.

mongo {
   usersMongo {
      connectionName=usersMongo
      connectionParameters {
        dbName = "sample_mflix"
        connectionString = "mongodb+srv://USERNAME:PASSWORD@CLUSTER.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=APP_NAME"
      }
   }
}

The following configuration options are mandatory under connectionParameters:

Config option      Purpose
dbName             Name of the Mongo database.
connectionString   A valid Mongo connection string (see MongoDB Connection Strings for details).

Define a collection mapping

Collections are exposed to Flow using the annotation @flow.mongo.Collection on a model.

Field names in the model are expected to match the field names in the collection.

Here’s an example:

import flow.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String

@Collection(connection = "usersMongo", collection = "users")
model User {
      name : FirstName
      password : Password
      email: Email
}

The @Collection annotation contains the following parameters:

Parameter    Description
connection   The name of a connection, as defined in your connections configuration file.
collection   The name of the collection.

Map the ObjectId

Use the @Id annotation to mark the field that represents the Mongo ObjectId.

Here’s an example:

import flow.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String
type MongoObjectId inherits String

@Collection(connection = "usersMongo", collection = "users")
model UserWithObjectId {
   @Id
   objectId: MongoObjectId
   name : FirstName
   password : Password
   email: Email
}

Query collections

To expose a Mongo database as a source for queries, declare a service annotated with @MongoService that exposes a table operation for each collection model.

// 1: Add the required imports
import flow.mongo.MongoService

// 2: Annotate the service as a `MongoService`.
@MongoService( connection = "usersMongo" )
service MongoUsersService {
   // 3: Expose a table operation for each collection model.
   table user : User[]
   table mongoUsers: UserWithObjectId[]
}

The @MongoService annotation contains the following parameters:

Parameter    Description
connection   The name of a connection, as defined in your connections configuration file.

Sample queries

Fetch everything from a collection:

find { User[] }

Fetch values by criteria:

find { User[]( FirstName == "Harry" ) }
find { User[]( FirstName == "Harry" || FirstName == "Joe" ) }
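
Query results can also be reshaped before they are returned. The following is a minimal sketch, assuming the User model above and standard TaxiQL projection syntax (as { ... }[]); the output field names firstName and contact are hypothetical and are not part of the User model:

```taxi
// Fetch all users, then project each one to a smaller shape.
// `firstName` and `contact` are illustrative names chosen for this sketch.
find { User[] } as {
   firstName : FirstName
   contact : Email
}[]
```

A projection like this is one way to avoid returning the Password field to callers.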

Write data to a collection

To expose a database collection for writes, you need to provide a write operation in a service.

Here’s a complete example schema with corresponding write operations:

In this schema, the question mark in 'MongoObjectId?' marks the field as nullable.

import flow.mongo.Collection
import flow.mongo.MongoService

type FlightCode inherits String
type DepartureTime inherits Instant
type DepartureAirport inherits String
type ArrivalAirport inherits String
type MongoObjectId inherits String

type AirlineCode inherits String
type AirlineName inherits String
type StarAllianceMember inherits Boolean

model Airline {
   code: AirlineCode
   name: AirlineName
   starAlliance: StarAllianceMember
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfo {
   code: FlightCode
   depTime : DepartureTime
   arrival: ArrivalAirport
   airline: Airline
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfoWithObjectId {
   @Id
   objectId: MongoObjectId?
   code: FlightCode
   departure: DepartureAirport
   arrival: ArrivalAirport
   airline: Airline
}

@MongoService( connection = "flightsMongo" )
service FlightsDb {
   table FlightInfo : FlightInfo[]
   table mongoFlights: FlightInfoWithObjectId[]

   // This is effectively an insert, as FlightInfo does not have an @Id annotation.
   @UpsertOperation
   write operation insertFlight(FlightInfo):FlightInfo

   // If the objectId field is populated, this updates the matching item in the collection.
   // Otherwise, it inserts the provided FlightInfoWithObjectId instance into the collection.
   @UpsertOperation
   write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
}

Sample mutating queries

Insert data

This example shows inserting data into a Mongo collection.

Note that the objectId is null, allowing Mongo to assign an Id.

given { flight : FlightInfoWithObjectId = {
    objectId : null ,
    code : "TK 1989",
    departure: "IST",
    arrival: "LHR",
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
  }
}
call FlightsDb::upsertFlightWithObjectId
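
For a model without an @Id field, the same pattern can go through the insertFlight operation declared in the schema. The following is a minimal sketch, assuming the FlightInfo model and FlightsDb service shown earlier. The flight details are illustrative, and the ISO-8601 string used for depTime is an assumption about how Instant values are supplied:

```taxi
// FlightInfo has no @Id field, so this upsert behaves as a plain insert.
// Assumption: depTime accepts an ISO-8601 timestamp for the Instant-based DepartureTime.
given { flight : FlightInfo = {
    code : "TK 1989",
    depTime : "2024-06-01T10:00:00Z",
    arrival : "LHR",
    airline : { code: "TK", name: "Turkish Airlines", starAlliance: true }
  }
}
call FlightsDb::insertFlight
```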

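Update data

This example updates an existing document. Because objectId is populated, the upsert updates the matching item instead of inserting a new one.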

given { flight : FlightInfoWithObjectId = {
    objectId : "7df78ad8902ce46d" ,
    code : "TK 1990",
    departure: "IST",
    arrival: "LHR",
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
  }
}
call FlightsDb::upsertFlightWithObjectId

Stream data from Kafka into Mongo

This example streams stock price updates from a Kafka topic directly into Mongo, updating existing records based on the symbol:

import flow.kafka.KafkaService
import flow.kafka.KafkaOperation
import flow.mongo.Collection
import flow.mongo.MongoService

// Kafka model and service emitting prices:
model StockPrice {
  symbol: StockSymbol inherits String
  // Named CurrentPrice so the type does not clash with the StockPrice model.
  currentPrice : CurrentPrice inherits Decimal
}

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
  stream prices : Stream<StockPrice>
}


// Mongo model and service for saving prices:
@Collection(connection = "stockPricesMongoDb", collection = "stockPrices")
closed parameter model SavedStockPrice {
   @Id
   symbol : StockSymbol
   currentPrice : CurrentPrice
   timestamp : Instant = now()
}

@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
   table prices: SavedStockPrice[]

   @UpsertOperation
   write operation updatePrice(SavedStockPrice):SavedStockPrice
}

Given the above, the following query will save updated Kafka ticks into Mongo:

stream { StockPrice }
call StockPricesMongoService::updatePrice

Build a REST API that reads from Mongo

This is a full example, where we create an HTTP endpoint accepting a GET request with a ticker symbol.

We’ll reuse the model and services declared in Stream data from Kafka into Mongo, rather than redeclaring them here.

@HttpOperation(url = "/api/q/stockPrices/{symbol}", method = "GET")
query FetchStockPrices(@PathVariable("symbol") symbol:StockSymbol) {
  find { SavedStockPrice( StockSymbol == symbol) }
}
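
The same pattern extends to other read endpoints. The following is a minimal sketch, assuming the SavedStockPrice model and StockPricesMongoService above; the query name FetchAllStockPrices and its URL are hypothetical:

```taxi
// Hypothetical endpoint that returns every saved price from the stockPrices collection.
@HttpOperation(url = "/api/q/stockPrices", method = "GET")
query FetchAllStockPrices {
  find { SavedStockPrice[] }
}
```

Requesting the collection type (SavedStockPrice[]) rather than a single item mirrors the "fetch everything" query shown under Sample queries.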