Kotlin SDK

Installation

Install the library using the following Maven dependency.

kotlin client?label=Latest%20version&style=for the badge
<dependency>
  <groupId>com.{code-product-name}</groupId>
  <artifactId>kotlin-client</artifactId>
  <version>${{short-product-name}.version}</version>
</dependency>

Generate Kotlin from Taxi

Query using the typesafe builder

Queries start with a verb - either find (for finite result sets) or stream (for infinite streams of data, like a Kafka topic).

import flow.client.find
import flow.client.stream

// Will look for a service that exposes Film[]
val listOfFilms = find<List<Film>>()

// Will look for a service that exposes Film[]
val singleFilm = find<Film>()

// Will look for a service that exposes a Stream<FilmWatchedEvent>
// (such as a kafka topic)
val announcements = stream<FilmWatchedEvent>()
Currently, you can’t pass any criteria through the typesafe builder. We’re working on that right now.

Project to a response object

Flow really shines when you ask for a source object to be transformed into something else.

The Kotlin SDK gives you typesafe querying using response objects, defined as data classes, using type aliases annotated with @DataType. These are normally generated, but don’t have to be.

// Normally, these are generated...
@DataType("com.foo.FilmId")
typealias FilmId = String
// etc...


// Define a response object
data class Response(
    val id: FilmId,
    val title: Title,

    val streamingProviderName: StreamingProviderName,
    val cost: StreamingProviderPrice,

    val reviewScore: FilmReviewScore,
    val reviewText: ReviewText
)

Once you have a response object defined, you can project your source object to the response:

// Transforming a List of Films to a List of Responses
val response = find<List<Film>>()
    .asA<List<Response>>()

// or, with a stream of data:
val response = stream<FilmWatchedEvent>()
    .asA<Response>()

Specify criteria

Specifying criteria isn’t yet implemented in the Kotlin SDK, but we’re working on it.

Query using TaxiQL directly

Passing TaxiQL directly isn’t yet implemented in the Kotlin SDK, but we’re working on it.

Execute queries

Once you have your query defined, you need to execute, using run().

run takes a transport, which is responsible for connecting to the Flow server, and handling responses.

import flow.client.converter.run

val response = find<List<Film>>()
    .asA<List<Response>>()
    .run(http("http://localhost:9021"))
    .toFlux()
run returns a Publisher, which is a reactive type. You need to subscribe to the publisher, or the query won’t be executed.

Transports

Currently, transports are provided for http and httpStreaming, implemented using okhttp.

However, the library is designed such that transports can be pluggable by implementing the Flow Transport interface.

Generally, httpStream is a reasonable default.

http transport

Executes a query over http, returning the full result set once transformations have been completed.

Unlike httpStream, the full response object is held in memory and returned once the query has been fully executed. As a result, the response type is List<T>, receiving the fully collected list.

For larger queries, you should use httpStream.

httpStream transport

Executes a query over websocket, and streams the results as they become available.

When queries are made against a streaming source (such as a message queue or Kafka topic), then each message is transformed and emitted independently.

When queries are made against a request-response data source (such as a database or query), then responses are emitted individually, as soon as the message is transformed.

Flow executes queries and transformations in parallel, so responses are streamed as soon as they are available.

This is generally preferred, as the Flow server doesn’t need to hold results in memory, and clients get results sooner.

Example

import flow.client.converter.run
import flow.client.find
import flow.client.transport.okhttp.http

val response:Flux<List<Response>> = find<List<Film>>()
    .asA<List<Response>>()
    .run(http("http://localhost:9021"))
    .toFlux()