Hazelcast as a data source

In addition to using Hazelcast as a cache, Flow can use Hazelcast as a source for reading and writing data, as well as a streaming data source.

Add a Hazelcast data source

To enable Hazelcast as a data source, first define a connection in your connections.conf file:

hazelcast {
   myHazelcast {
      connectionName = myHazelcast
      addresses = ["hazelcast:5701"]
   }
}
Assuming you are using Docker Compose to run Flow, here is an example of how to configure an external Hazelcast container:
hazelcast:
  image: "docker.io/hazelcast/hazelcast:latest"
  healthcheck:
    test: curl --fail http://localhost:5701/hazelcast/health || exit 1
    interval: 30s
    timeout: 5s
    retries: 3
  ports:
    - "5701:5701"
  environment:
    HZ_MANAGEMENTCENTER_SCRIPTINGENABLED: true
    HZ_MANAGEMENTCENTER_CONSOLEENABLED: true

Supported formats

When values are written to Hazelcast, they must be serialized.

The following formats are supported. Note that because Flow does not have access to custom Java classes, serialization formats that require classloading (such as Serializable, Externalizable and Custom Serialization) are not supported.

Compact (preferred)

Compact Serialization is the preferred format when using Flow to read/write to Hazelcast.

To enable Compact serialization, add a @CompactObject annotation to your Taxi definition:

import flow.hazelcast.CompactObject
import flow.hazelcast.HazelcastMap

@HazelcastMap(name = "films")
@CompactObject
closed model Film {
   @Id
   filmId: FilmId inherits Int
   // ...omitted...
}

JsonValue

Hazelcast’s JsonValue is supported for working with existing caches that have stored data using JsonValue.

Note - if you are both writing and reading the data using Flow, it is recommended to use Compact, which has better performance.

Write data to Hazelcast

To write data into a Hazelcast map, define a Hazelcast service in your Taxi project, with a write operation:

import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.UpsertOperation
import flow.hazelcast.CompactObject

// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
   // Expose a write operation, and annotate it
   // with UpsertOperation, to define the writing behavior
   @UpsertOperation
   write operation upsert(Film):Film
}

type Language inherits String

// Define the name of the map to use. If not present in Hazelcast,
// the map wil be created
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key.
   // Composite keys are not supported
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}

With your map declared, use a mutation query to insert data:

Examples

Insert a static value into Hazelcast

// inserting a static value into Hazelcast
given { film : Film =
  {
    filmId : 123,
    title : "Star Wars",
    languages : ["English" , "American" ]
  }
}
call HazelcastService::upsert

Store a Query Result in Hazelcast

To write the outcome of a query to Hazelcast, it’s not necessary for the query’s result format to align with the format of the persisted value.

Flow will automatically adapt the query result to the required persistence format, which may involve projections and even calling additional services if needed.

find { NetflixFilms[] } as {
  // projection not shown
}
call HazelcastService::upsert

Stream data into Hazelcast

stream { FilmUploadedEvent }
// Each FilmUploadedEvent is projected into a Film
// which is the input parameter to `upsert`
call HazelcastService::upsert

Query data from Hazelcast

Flow supports querying from Hazelcast using both direct key lookups, and rich query criteria.

Define a map to query

To query a map, you first define a service that exposes your Hazelcast map.

Maps are exposed using Taxi’s table declaration, as this indicates a data source that supports rich querying.

Here’s a complete example:

import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.CompactObject

// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
    // Table is a shorthand to declare
    // a data source that supports rich querying.
    table films : Film[]
}

// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}

Write queries

Once a Hazelcast map is exposed, it can be queried as a standard data source, including being used as a data source when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).

Here are some sample queries:

Fetch everything from a map

find { Film[] }

Fetch a value with a specific key

If criteria are defined against the key (as defined using an @Id annotation), then a key lookup is performed:

// Assuming FilmId is annotated as @Id in the Film model
// as shown...
model Film {
   @Id
   filmId : FilmId inherits Int
   // ..snip..
}

// Elsewhere, writing a query...
find { Film( FilmId == 123 ) }

Fetch values using criteria

// find all films with a FilmId < 105
find { Film[]( FilmId < 105 ) }

// find all films released after 2019 with the title Star Wars
find { Film[]( ReleaseYear < 2019 && Title == "Star Wars" ) }

Stream data from Hazelcast

Hazelcast maps can be treated as data streams, where inserts or updates are created as streams of events which can be queried using Flow.

The following events trigger the current state of the record to be written to the event stream:

  • Entry Added

  • Entry Updated

Declare a map as a stream

To stream updates from a map, you first define a service that exposes your Hazelcast map as a stream.

Here’s a complete example:

import flow.hazelcast.HazelcastService
import flow.hazelcast.HazelcastMap
import flow.hazelcast.CompactObject

// The HazelcastService annotation should specify the
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcast")
service HazelcastService {
   stream films : Stream<Film>
}

// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}

Write streaming queries

Once a Hazelcast map is exposed as a stream, it can be queried as a standard data source, including used as a data source when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).

Below are some sample queries. It’s useful when testing to combine this with writing data to Hazelcast to trigger change events which produce values on the data stream.

Here are some sample queries:

Stream all updates from a map

stream { Film }

Stream all updates from a map, and enrich with data from other sources

stream { Film } as {
  id : FilmId
  reviewScore : FilmReviewScore // not present in the map, will be looked up from another data source
}[]

Stream only specific events from a map

// Only provide updates on Films whose FilmId is less than 300
stream { Film.filterEach( (FilmId) -> FilmId < 300  ) }