Use AWS services

Before you begin using AWS services, you need to define a connection to AWS in your connections.conf file. Read more about defining AWS connections here.

DynamoDb

DynamoDb services can only be declared in Taxi.

Querying against the table's key is supported, as are write operations.

To expose a DynamoDb service, the following two annotations are required:

  • Add a @flow.aws.dynamo.Table annotation to the model

  • Add a @flow.aws.dynamo.DynamoService annotation to the service

import flow.aws.dynamo.DynamoService
import flow.aws.dynamo.Table

// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
  @Id
  movieId : MovieId
  score: ReviewScore inherits Int
}

@DynamoService
service DynamoService {
  // Exposes query operations that return Review[]
  table reviews: Review[]

  // allows upsert calls
  @UpsertOperation
  write operation saveReview(Review):Review
}

Write operations (such as upserts) can only be invoked in mutations.
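
For example, with the schema above, the table can be queried against its key, and the upsert can be invoked in a mutation. The following is a hedged sketch: the MovieId value is illustrative, and IncomingReview stands in for whatever other source supplies review data.

// Query the reviews table against its key (the id value is illustrative)
find { Review( MovieId == 123 ) }

// Invoke the upsert in a mutation: discover reviews from another source
// (IncomingReview is a hypothetical model) and save them to the table
find { IncomingReview[] }
call DynamoService::saveReview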

Lambda

Flow can invoke a Lambda, either to discover data in a query, or in a mutation.

A service must have the @AwsLambdaService annotation, passing the name of the configured connection.

Operations have the @LambdaOperation annotation, giving the name of the Lambda function to invoke.

import flow.aws.lambda.AwsLambdaService
import flow.aws.lambda.LambdaOperation

@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
  @LambdaOperation(name = "streamingprovider")
  operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
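
With this in place, the Lambda can be invoked as part of a standard query. The following is a hedged sketch: MovieTitle is a placeholder type, and we assume StreamingProviderRequest has a MovieTitle field that Flow can populate from the fact supplied in the given clause.

// Illustrative only: Flow constructs the StreamingProviderRequest from the
// supplied fact, and invokes the "streamingprovider" Lambda to resolve the response
given { title : MovieTitle = 'Gladiator' }
find { StreamingProviderResponse }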

S3

Flow can use S3 as a source for both reading and writing data.

Read from S3

To declare an S3 data source for reading data, take the following steps:

  • Declare a service annotated with @S3Service

  • Within that service, declare an operation with @S3Operation, providing the bucket name

  • The operation should take a single parameter, of type FilenamePattern, indicating the filename (or pattern) to read from

Be sure to either add the imports for S3Service, S3Operation and FilenamePattern (as shown below), or use their fully qualified names.

Here’s an example:

import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyBucket")
    operation readBucket(filename:FilenamePattern = "films.csv"):Film[]
}

Filename patterns when reading from S3

When reading files from S3, you can either specify a single filename (e.g., films.csv) or a pattern, such as film*.csv, *.csv, or even just * to read everything in the bucket.

When working with a pattern, Flow will read and combine all matching files, treating them as a single response.

For information about supported file formats, see file formats.

Examples - Reading from S3

Each of the examples below works with the following model of content stored on S3, so we define it once here for brevity:

import flow.formats.Csv
import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern


// Define the format we're reading.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)

@Csv
type TradeSummary {
   symbol : Symbol inherits String
   open : OpenPrice inherits Decimal
   high : HighPrice inherits Decimal
   close : ClosePrice inherits Decimal
}

Read a single file

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
   @S3Operation(bucket = "MyTrades")
    operation readBucket(filename:FilenamePattern = "trades.csv"):TradeSummary[]
}

With the above schema, we can issue a simple query to return the contents of trades.csv:

find { TradeSummary[] }

Read the contents of multiple files

  @S3Service(connectionName = "MyAwsConnection")
  service AwsBucketService {
      @S3Operation(bucket = "MyTrades")
      // This will read all files ending in csv present in the bucket
      operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
  }

With the above schema, we can issue a simple query to return the contents of all *.csv files in the bucket:

find { TradeSummary[] }

Read files and expose as an HTTP endpoint

Using the same service definition as shown above, we can expose the contents of our *.csv files with a query published as an HTTP endpoint:

import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/trades", method = "GET")
find { TradeSummary[] }

For more information, see publish queries as endpoints.

Read files and publish to Kafka

This example shows how to read a CSV file from S3, and publish each row as an individual message to Kafka, as a JSON object.

First, we’ll declare our Kafka broker and associated message format:

import flow.kafka.KafkaService
import flow.kafka.KafkaOperation

// The message format we're publishing to Kafka.
// Because there's no format defined, it's JSON by default
model TradeSummaryEvent {
   ticker : Symbol
   // field names and structure are different, but the
   // types are the same as on our source model.
   prices: {
      openPrice : OpenPrice
      highPrice : HighPrice
      closePrice : ClosePrice
   }
}

// Declare our Kafka service and operation
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {

   // Define an operation that writes to Kafka
   @KafkaOperation( topic = "tradeRecords" )
   write operation publishTrades(TradeSummaryEvent):TradeSummaryEvent
}

With the above in place, we can write a query that reads from S3, transforms from CSV to our JSON format, and writes it out to Kafka.

find { TradeSummary[] }
call MyKafkaService::publishTrades

In the above example, Flow detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryEvent) and handles the transformation for us.

In our example, that’s simply converting from CSV to JSON and restructuring the message. However, the transformation could be richer, doing tasks such as calling services to discover data.
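
As a hedged sketch of such an enrichment (TradeDescription is a hypothetical type, resolvable by some other registered service), a projection can ask for fields that aren't present on the source model, and Flow will call whichever services can discover them:

find { TradeSummary[] } as {
   symbol : Symbol
   close : ClosePrice
   // Not present on TradeSummary: Flow discovers this by calling
   // a registered service that can resolve a TradeDescription
   description : TradeDescription
}[]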

Finally, we might want to expose an HTTP POST operation to trigger this update:

import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call MyKafkaService::publishTrades

For more information about working with Kafka, including defining connections to brokers, see our dedicated docs on Kafka.

Read files and save to a database

This example shows how to read a CSV file from S3, and save each row as a record to a database.

First, we’ll define our database table, and associated service:

import flow.jdbc.Table
import flow.jdbc.DatabaseService
import flow.jdbc.InsertOperation

@Table(connection = "trades-database", schema = "public" , table = "trades" )
type TradeSummaryRecord {
   symbol : Symbol
   open : OpenPrice
   high : HighPrice
   close : ClosePrice
   timestamp : Instant = now()
}

@DatabaseService(connection = "trades-database")
service TradesDatabase {
   @InsertOperation
   write operation saveTradeSummary(TradeSummaryRecord):TradeSummaryRecord
}

With the above in place, we can write a query that reads from S3, transforms from CSV to our database format, and performs the database inserts:

find { TradeSummary[] }
call TradesDatabase::saveTradeSummary

In the above example, Flow detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryRecord) and handles the transformation for us.

In our example, that’s simply converting CSV rows into database records and restructuring the data. However, the transformation could be richer, doing tasks such as calling services to discover data.

Finally, we might want to expose an HTTP POST operation to trigger this update:

import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary

For more information about working with databases, including defining connections to databases, and the support for different types of databases, see Databases.

Write to S3

To declare an operation that can write data to S3, take the following steps:

  • Declare a service annotated with @S3Service

  • Within that service, declare a write operation with @S3Operation, providing the bucket name

  • The operation should take two parameters:

    • One with a @RequestBody annotation, which contains the contents to be written

    • One of type FilenamePattern which defines the filename to write to

Be sure to either add the imports for S3Service, S3Operation, RequestBody and FilenamePattern (as shown below), or use their fully qualified names.

Here’s an example:

import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
import flow.aws.s3.RequestBody

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyBucket")
    write operation writeToS3(@RequestBody films:Film[], filename:FilenamePattern = "films.csv"):Film[]
}

Filename patterns when writing to S3

When writing to S3, filename patterns are not supported (unlike when reading).

If you declare a filename with a pattern, an error will be thrown.

Examples - Writing to S3

Each of the examples below works with the following model of content stored on S3, so we define it once here for brevity:

import flow.formats.Csv

// Define the format we're writing.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
type TradeSummary {
   symbol : Symbol inherits String
   open : OpenPrice inherits Decimal
   high : HighPrice inherits Decimal
   close : ClosePrice inherits Decimal
}

Fetch from an API and write the results to S3 as a CSV

This example shows data being fetched from a REST API (exposed as JSON) and stored on S3.

As part of the operation, we’ll transform a tree-like JSON structure into a flattened CSV file.

First, we’ll define the API and its response object:

import taxi.http.HttpOperation

model StockPriceUpdate {
   ticker : Symbol
   // field names and structure are different, but the
   // types are the same as on our TradeSummary model.
   prices: {
      openPrice : OpenPrice
      highPrice : HighPrice
      closePrice : ClosePrice
   }
}

service ApiService {
   @HttpOperation(url="http://myApi.com/prices", method = "GET")
   operation getPrices():StockPriceUpdate[]
}

And, we’ll define a write operation on S3 to store the content:

import flow.aws.s3.S3Service
import flow.aws.s3.S3Operation
import flow.aws.s3.FilenamePattern
import flow.aws.s3.RequestBody

@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
    @S3Operation(bucket = "trades")
    write operation writeTradeSummary(@RequestBody payload: TradeSummary[], filename: FilenamePattern = "trades.csv"):TradeSummary[]
}

Given the above, we can use the following query to read from our API, transform the data, and write to our S3 bucket:

find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary

This results in the data returned from our API call being converted to CSV and written to trades.csv in our S3 bucket.

If we’d like to set the filename within our query, we can provide it as a fact in a given clause:

given { filename : FilenamePattern = 'todaysTrades.csv' }
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary

This time, the output is written to todaysTrades.csv.

S3 file formats

In the above examples, our content has been stored in S3 using CSV.

This is because the model used in our operations is annotated with @Csv, as shown in the following excerpt:

import flow.formats.Csv

@Csv
type TradeSummary {
  // ... omitted
}

@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
    // reading CSV
    @S3Operation(bucket = "MyTrades")
    // This operation returns a collection of
    // TradeSummary objects, which are defined with @Csv
    operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]

    // writing CSV
    @S3Operation(bucket = "trades")
    write operation writeTradeSummary(
      // The request body is a collection
      // of trade summaries, which are configured as CSV
      @RequestBody payload: TradeSummary[],
      filename: FilenamePattern = "trades.csv"
    ):TradeSummary[]
}

The format can be any supported format, such as Avro, XML, CSV (or any other character-delimited file), or even Protobuf.

If no format is defined, JSON is used as the default.

For more information, see Data formats.

SQS

Consume events

Flow can subscribe to a stream of data from SQS.

import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  operation streamMovieQuery():Stream<Movie>
}

This can then be queried using a standard stream query:

stream { Movie }
// as ...

Publish events

Flow can publish to a queue using a mutation:

import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  write operation publishMovieEvent(Movie):Movie
}

Publish operations can only be invoked in mutations.

Example: Consuming from one SQS queue, and publishing to another

import flow.aws.sqs.SqsService
import flow.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {

  @SqsOperation(queue = "newReleases" )
  operation newReleases():Stream<Movie>

  @SqsOperation( queue = "moviesToReview" )
  write operation publishMovieEvent(Movie):Movie
}

// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent