Build event streams

This tutorial shows how to build bespoke, enriched event streams from Kafka.

Describe the data sources

Architecture for this demo

Our demo has a few services running, which we’ll join together to create a bespoke event stream:

  • Kafka Service: A topic that publishes a message whenever a new review is published

  • FilmsDb: A database containing a list of films

  • ListingsApi: A REST API that tells us where our films are currently playing

Our services share a taxonomy used to describe the elements we can fetch:

namespace reviews {
    type FilmId inherits String
    type FilmTitle inherits String
    type ReviewText inherits String
    type PerformanceDate inherits DateTime
    type TheatreName inherits String
}
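The FilmsDb and ListingsApi sources are also described in Taxi against this shared taxonomy, which is what lets Flow fetch film titles and listings on demand later in this tutorial. The sketch below is illustrative only: the Film model, the @flow.jdbc.Table annotation, the films-database connection name and the ListingsApi URL are assumptions made for this walkthrough, not definitions taken from the demo.

// Illustrative sketch - the annotation name, connection name and URL are assumptions
@flow.jdbc.Table(connection = "films-database", table = "film")
model Film {
    @Id filmId : reviews.FilmId
    title : reviews.FilmTitle
}

model FilmListing {
    theatre : reviews.TheatreName
    performanceDate : reviews.PerformanceDate
}

service ListingsApi {
    // Core Taxi HTTP annotations; the URL is a placeholder
    @HttpOperation(method = "GET", url = "http://listings-api/films/{filmId}/listings")
    operation getListings(@PathVariable("filmId") filmId : reviews.FilmId) : FilmListing[]
}

With descriptions like these in place, a query only needs to ask for FilmTitle or FilmListing[] by type, and Flow has enough information to call the right source.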

Add the Kafka broker

Configure the Kafka connection in connections.conf:

kafka {
    "my-kafka" {
        connectionName=my-kafka
        connectionParameters {
            brokerAddress="kafka:19092"
            groupId=flow
        }
    }
}

Describe the Kafka topic

Next, add Taxi metadata describing the message written onto our Kafka topic:

@lang.taxi.formats.ProtobufMessage(packageName = "", messageName = "NewReviewPostedMessage")
model NewReviewPostedMessage {
   @lang.taxi.formats.ProtobufField(tag = 1, protoType = "int32") filmId : reviews.FilmId
   @lang.taxi.formats.ProtobufField(tag = 2, protoType = "string") reviewText : reviews.ReviewText
}

Finally, declare the Kafka topic:

@flow.kafka.KafkaService(connectionName = "my-kafka")
service KafkaService {

  @flow.kafka.KafkaOperation(topic = "reviews", offset = "latest")
  stream reviews : Stream<NewReviewPostedMessage>
}

Create an event stream

We can create a dedicated event stream containing exactly the data we need by submitting a query to Flow.

The following query shows how this might look when a Kafka topic named reviews, a database named FilmsDb, and a REST API named ListingsApi are connected:
stream { NewReviewPostedMessage } as {
  // We define the field names that matter to us.
  // Flow matches on data types, as field names often differ between systems.
  id : FilmId
  review : ReviewText

  name : FilmTitle // Fetched by a database call
  nextPerformances: FilmListing[] // Fetched by a REST API call
}
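Because Flow matches on the taxonomy types rather than on field names, the shape and naming of the output are entirely up to the consumer. As a minimal sketch, assuming the same connected sources, the same stream could be projected with different field names without changing anything else:

stream { NewReviewPostedMessage } as {
  // Different names, same types - resolution is unchanged
  filmIdentifier : FilmId
  reviewBody : ReviewText

  filmName : FilmTitle // Still fetched by a database call
  upcomingShowings : FilmListing[] // Still fetched by a REST API call
}

Only the types drive resolution, so each consumer can keep the naming conventions of its own domain.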

Specify the groupId for Kafka subscriptions

When Flow subscribes to a Kafka topic for a streaming query, it uses the groupId connection parameter defined in connections.conf as the group id of the corresponding Kafka consumer.

kafka {
    "myKafkaBroker" {
        connectionName=myKafkaBroker
        connectionParameters {
            brokerAddress="localhost:9092"
            groupId=flow
        }
    }
}

In the configuration above, the groupId is set to flow. You can override this value for individual queries with the @StreamConsumer annotation. For example:

@StreamConsumer(id = "reviewStreamGroup")
stream { NewReviewPostedMessage } as {
  id : FilmId
  review : ReviewText

  name : FilmTitle // Fetched by a database call
  nextPerformances: FilmListing[] // Fetched by a REST API call
}

The Kafka consumer created for this streaming query will use reviewStreamGroup as its group id.