Stream data

Write streaming queries

Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.

For example, consider a Kafka topic emitting stock price updates:

model StockPrice {
  symbol : StockSymbol
  price : StockPrice
}

service KafkaService {
   stream stockPrices : Stream<StockPrice>
}

This can be queried using the following:

stream { StockPrice }

This can be combined with other standard querying tools, such as projections and mutations:

stream { StockPrice } as {
  symbol : StockSymbol
  updateReceived : Instant = now()
  currentPrice : StockPrice
  totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot

Filter streams

To filter a stream, you can use the filterEach() function:

stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }

Run long-lived streaming queries

Flow’s query editor is great for running short-lived streaming queries. However, oftentimes you want a streaming query to continue in the background.

To deploy a long-lived streaming query, simply define a query in one of your taxonomy projects. Typically, this is checked into a Git repository.

// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
   stream { StockPrices }
   call MyPostgresService::upsertStockPrice
}

Update streaming queries

Streaming queries are automatically upgraded whenever their definition changes.

Enable a streaming query

When each streaming query is detected for the first time, it’s disabled by default to prevent accidental data changes (as streams are often mutating).

You can enable a streaming query either via the UI, or an API call from the Endpoints page.

Note that once a streaming query has been enabled once, further updates are automatically deployed and the stream remains running.