Integrate a database, a REST API and Kafka

Overview

In this tutorial, we’ll set up a local instance of Flow, and then see how to use Flow to link data from a REST API, a database, and a Kafka topic.

This is a step-by-step introduction for beginners that walks you through the basics of linking data sources together using Flow’s UI.

Our use case is to find a list of films and discover which online streaming service has the films available to watch.

This involves linking the Films list from our database with streaming service information from a REST API.

Finally, we’ll listen for updates on a Kafka topic that carries Protobuf messages, and join those updates with data from the database and the API.

Here’s what our demo ecosystem looks like:

A database

Prerequisites

You need Docker and Docker Compose installed.

Start a local instance of Flow

In this tutorial, we’re going to deploy Flow, as well as a few demo projects that we’ll use throughout.

Everything you need is packaged up in a Docker Compose file to make getting started easier.

To launch, clone the repository and follow the instructions in the README file:

git clone https://github.com/hazelcast/hazelcast-flow-public-demos.git
cd hazelcast-flow-public-demos/films-demo

After about a minute, Flow should be available at http://localhost:9021.

To check that everything is ready to go, head to the Projects Explorer and confirm that some schemas are registered. It should look like the screenshot below. If you see a message saying there’s nothing registered, wait a few moments longer.

The Projects screen
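
If you would rather script the wait than keep refreshing the page, a minimal polling sketch in Python could look like the following. The only detail taken from this tutorial is the address http://localhost:9021; the function names, the retry counts, and the idea that "any HTTP response means the UI is up" are illustrative assumptions. It does not check that schemas are registered, so a look at the Projects screen is still needed.

```python
import time
import urllib.error
import urllib.request
from typing import Callable

FLOW_URL = "http://localhost:9021"  # address given in this tutorial


def http_responds(url: str, timeout: float = 2.0) -> bool:
    """Return True if any HTTP response comes back from `url` (assumed to mean 'up')."""
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        return True  # the server answered, even if with an error status
    except OSError:
        return False  # connection refused, timeout, DNS failure, ...


def wait_until_ready(probe: Callable[[], bool], attempts: int = 30, delay: float = 2.0) -> bool:
    """Call `probe` until it returns True or `attempts` run out."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # pause before the next try
    return False


# Usage (needs the Docker Compose stack to be starting or running):
#   wait_until_ready(lambda: http_responds(FLOW_URL))
```

Passing the probe in as a function keeps the retry loop independent of the network, which makes it easy to reuse for other checks.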

You now have Flow running locally, along with a handful of demo services which we’ll use in our next steps.

If you run docker ps, you should see a collection of Docker containers now running.

Container Name | Part of Flow stack or Demo? | Description
flow | Flow | The core Flow server, which provides our UI, and runs all our integration for us
management-center | Flow | The Management Center, for monitoring and managing Flow
films-api | Demo | A REST API, which exposes information about films. We also publish a Protobuf message to Kafka from here
pg-pagila | Demo | A Postgres DB, which contains the Postgres Pagila demo database for a fake DVD rental store
kafka | Demo (Kafka) | The Apache Kafka image

View the project

An empty project named films has been created for Flow to hold the connection details of our data sources, and schemas. The project is a Taxi project which gets edited locally, then checked into Git once you’re ready to go to production.

  • From the sidebar, click Projects

  • You will see the films project listed

  • Click on the project to view the details

Field | Value
Organisation | com.hazelflix
Version | 0.1.0

As we progress through this tutorial, we’ll be adding connections to our database, REST API and Kafka topic to this project. You can view the Taxi schemas that Flow generates in this project.

Connect a database table

Next, we’ll add a connection to our database, and make a table available as a data source that Flow can fetch data from.

This demo ships with an instance of the Postgres demo DB called Pagila.

Pagila contains several tables related to running a fictional DVD rental store, including details of films, actors, and so on. We’ll use this database as part of our walkthrough.

To get started, click Data sources on the sidebar, and click Add a data source (or navigate to http://localhost:9021/data-source-manager/add).

Define the database connection

First, we need to tell Flow how to connect to the database.

  • From the project drop-down, select the "films" project

  • Select Database Table as the data source to add

  • For Connection, select Add a new connection…

  • A pop-up window appears, allowing you to create a connection to our database

  • Fill in the form with the details below:

Parameter | Value
Connection name | films-database
Connection type | Postgres
Host | pg-pagila
Port | 5432
Database | pagila
Username | postgres
Password | admin

  • Click Test connection and wait for the connection test to be successful

  • Click Save.

The connection to the database has now been created, and the pop-up should close.

Select the table to import

Now that Flow has a connection to the database, we need to select the tables we want to make available for Flow to query from.

Flow will create schema files for the contents of the table. Specifically, Flow will create:

  • A model for the table, defining all the fields that are present

  • A series of types, which describe the content of each field

  • A query service, which lets Flow run queries against the database

To import the schema:

  • Add a new data source using the new films-database connection and select the film table

  • Complete the form for the database table to import using the parameters below:

Parameter | Value
Connection | films-database (this should already be populated from the previous step)
Table | film
Default namespace | com.hazelflix.films.filmsdatabase

Namespaces help us group related content together, like packages in Java or namespaces in C# and TypeScript.

Here, we’re providing a default namespace, which is applied to the types, models and services that Flow creates when importing this table.

  • Click Configure

Flow will connect to the database, and create all the necessary schema configuration for us for the table.

Data sources screen

Preview the imported tables

Flow now shows a preview of the types, models and services that will be created.

Schema view

Click around to explore the different models, types and services that will be created. For now, the defaults that have been assigned are good enough.

  • Click Save

Flow writes the schemas we’ve just imported to a local project as a series of Taxi schema files. You can explore these files locally. You will find them in the taxi/src directory.

cd taxi/
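
To get a quick inventory of what was generated, a small Python helper can list the schema files. This is a sketch that assumes the generated files use the `.taxi` extension and live under `taxi/src` as described above; the function name `list_taxi_sources` is made up for this example.

```python
from pathlib import Path
from typing import List


def list_taxi_sources(src_dir: str) -> List[str]:
    """Return the .taxi files under src_dir as sorted paths relative to src_dir."""
    root = Path(src_dir)
    return sorted(str(path.relative_to(root)) for path in root.rglob("*.taxi"))


# Usage, from the directory that contains the taxi/ project:
#   for name in list_taxi_sources("taxi/src"):
#       print(name)
```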

Taxi ships a great VS Code plugin which provides click-to-navigate, syntax highlighting, autocompletion and more.

You’ve now connected a database to Flow, and exposed one of its tables, so that Flow can run queries against it.

Connect a Swagger API

In this step, we want to tell Flow about our REST API, which exposes information about which streaming service each of our films is available on.

We’ll use Flow’s UI to import a Swagger definition of our REST API.

  • Click Data Sources on the sidebar

  • Once again, click Add a data source

  • Alternatively, navigate to http://localhost:9021/data-source-manager/add

  • Select the films project as the target project

  • From the drop-down list, select Swagger / Open API as the type of schema to import

  • For the Swagger Source, select a URL

Fill in the form with the following values:

Parameter | Value
Swagger source | http://films-api/v3/api-docs
Default namespace | com.hazelflix.listings
Base url | Leave this blank

  • Click Configure

Update the service type

A preview of the imported schema is once again displayed.

This time, we do need to modify some default values.

Click on Services → getStreamingProvidersForFilm.

This shows the API operation that’s exposed in the Swagger spec we just imported. This API accepts the ID of a film, and returns information about the streaming services that have the film available to watch.

Now, take a look at the parameters section of the getStreamingProvidersForFilm operation (you may need to scroll down).

Note that the input parameter, filmId, is typed as Int. Since we know that this is a FilmId (the same value that’s exposed by the Films database table), we need to update the type accordingly, so that Flow knows these two pieces of information are linked.

  • Click on the Int link

  • In the search box, type FilmId

  • Select the FilmId type that’s shown

  • Finally, click Save

Great! We’ve now exposed the Swagger API to Flow.

What just happened?

We’ve connected the Swagger schema of a REST API to Flow. Flow now knows about this service, and will make calls to it as needed.

Importantly, we’ve defined a link from the data in our database to the data in the REST API. The schema diagram shows an outline of this relationship:

schema db and api
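
To see why retyping the parameter matters, here is a toy model in Python of matching operations by semantic type. It is a sketch for intuition only and not a description of Flow's planner: the `Operation` class and `find_operation` function are invented for this example, and the operation's output is simplified to a single `StreamingProvider` type.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Operation:
    name: str
    input_type: str   # semantic type of the single parameter
    output_type: str  # semantic type of the response (simplified)


def find_operation(operations: Iterable[Operation], have: str, want: str) -> Optional[Operation]:
    """Pick an operation that turns a value of type `have` into one of type `want`."""
    for op in operations:
        if op.input_type == have and op.output_type == want:
            return op
    return None


# As imported from Swagger: the parameter is a plain Int, so nothing ties it to FilmId.
before = [Operation("getStreamingProvidersForFilm", "Int", "StreamingProvider")]
# After the edit in the UI: the parameter is a FilmId, the same type the film table exposes.
after = [Operation("getStreamingProvidersForFilm", "FilmId", "StreamingProvider")]

print(find_operation(before, "FilmId", "StreamingProvider"))  # prints None
print(find_operation(after, "FilmId", "StreamingProvider"))
```

In this toy model, a `FilmId` read from the database only leads to the REST operation once both sides agree on the type name, which is the link the UI edit creates.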

Integrate services and load data

Now that everything is set up, let’s fetch and integrate some data.

List all the films in the database

Queries in Flow are written in TaxiQL. TaxiQL is a simple query language that isn’t tied to one specific underlying technology (i.e., it’s independent of databases, APIs, etc.).

This means we can write queries for data without worrying where the data is served from.

Our first query is very simple: it finds all the films. Paste it into the Query Editor:

find { Film[] }

  • Click Run.

This query asks Flow for all Film records. When this query is executed, Flow looks for services that expose a collection of Films, and invokes them. In our example, this means Flow will query the database to select all available films.

There are different options to show the result of Flow queries. These are displayed as tabs under the query editor.

  • Table - Ideal for tabular, two-dimensional data

  • Tree - Ideal for nested data

  • Raw - Raw JSON - ideal for larger result sets

  • Profile - What work Flow did to produce the result. Contains information about the systems called by Flow, performance stats and lineage information

Once the query has completed, a list of records appears in the grid.

Results table

Transform the data

Flow lets you restructure data in a way that’s useful to you. Our original query returned the data as a flat list, since it’s coming from a database.

However, for our purposes (let’s say we’re building a UI) we might want to restructure the data to a subset of fields, grouped in a way that’s useful.

  • Paste the query below into the Query Editor.

find { Film[] } as {
    film: {
        name: Title
        id : FilmId
        description: Description
    }
    productionDetails: {
        released: ReleaseYear
    }
}[]

  • Click Run.

This time, the data is returned as a tree. To see it, click the Tree tab in the results panel.

Tree tab

Using this approach, we can change the shape of the structure, along with the field names.

In Taxi, this is called a projection, because we’re changing the shape of the output.
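
For intuition, here is a rough Python analogue of what the projection does to a single record. It is a sketch, not how Flow implements projections: the column names `title`, `film_id`, `description` and `release_year` are assumed from the Pagila film table, and the function name `project_film` is made up for this example.

```python
from typing import Any, Dict


def project_film(row: Dict[str, Any]) -> Dict[str, Any]:
    """Reshape one flat film row into the nested structure requested by the query."""
    return {
        "film": {
            "name": row["title"],               # Title
            "id": row["film_id"],               # FilmId
            "description": row["description"],  # Description
        },
        "productionDetails": {
            "released": row["release_year"],    # ReleaseYear
        },
    }
```

Note the difference in approach: the Python version has to know the source column names, while the TaxiQL projection only names the types it wants (Title, FilmId, and so on) and leaves the lookup to Flow.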

Combine data from our DB and REST API

Finally, let’s add data about which streaming service carries each film. This requires linking data between our database and our REST API.

As Flow is handling all of the integration for us, this is as simple as updating our query to include the provider data.

Flow works out how to call the REST API, which data to pass, and what to collect.

  • Paste the query below:

find { Film[] } as {
    film: {
        name: Title
        id : FilmId
        description: Description
    }
    productionDetails: {
        released: ReleaseYear
    }
    providers: StreamingProvider
}[]

  • Click Run.

Because the results are nested, switch to the Tree view to see them. Note that we now have data from our database, combined with data from our REST API.

results tree with providers
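
For comparison, hand-written glue code for the same join would need to loop over the films and call the API once per film. The sketch below shows that pattern in Python under stated assumptions: `fetch_providers` is a hypothetical stand-in for a call to the getStreamingProvidersForFilm operation (its URL is not given in this tutorial, so it is passed in), and `film_id` is assumed to be the id column of the film rows.

```python
from typing import Any, Callable, Dict, Iterable, Iterator


def attach_providers(
    films: Iterable[Dict[str, Any]],
    fetch_providers: Callable[[int], Any],
) -> Iterator[Dict[str, Any]]:
    """For each film row, look up its providers by id and merge the result in."""
    for film in films:
        # Copy the row rather than mutating the caller's data.
        yield {**film, "providers": fetch_providers(film["film_id"])}
```

With Flow, this loop, the choice of which field to pass, and the merge are all derived from the types in the query, so none of it has to be written or maintained by hand.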

Explore the query execution

Flow has several diagnostic tools to help us see what happened.

Explore the query execution plan

In the Profiler, you can view the high-level integration plan that Flow used to execute the query. It shows the services that were called, and how data was resolved at a field level.

query lineage

Explore the individual server requests

In the Profiler, you can view a sequence diagram of the calls made to the different services. Clicking on any of the rows shows the actual request and response.

call explorer

Explore cell-based lineage

Flow provides detailed trace lineage for each value shown in its results.

In Tree mode, try clicking on one of the names of the streaming providers. A lineage display will open, showing the trace of how the value was derived.

  • We can see that a value of Netflix was returned from an HTTP operation

  • The input to that HTTP operation was a FilmId - in our example, the value 1

  • Clicking on the FilmId expands the lineage graph to show where that FilmId came from

  • We can see that the FilmId was returned as the result of a database query

value lineage

This deep lineage is very powerful for understanding how data has flowed, and proving the provenance of data that Flow is exposing.
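
One way to picture this kind of lineage is as a chain of values, each remembering what produced it and which input it was derived from. The Python sketch below is a toy model for intuition only, not Flow's internal representation; the `TracedValue` class, its field names, and the label "provider name" are invented for this example, while the values Netflix and 1 come from the walkthrough above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class TracedValue:
    value: object
    type_name: str
    source: str                                   # what produced the value
    derived_from: Optional["TracedValue"] = None  # the input that was passed in


def lineage(traced: TracedValue) -> List[str]:
    """Walk back from a value to its origin, one description per step."""
    steps = []
    current: Optional[TracedValue] = traced
    while current is not None:
        steps.append(f"{current.type_name} {current.value!r} from {current.source}")
        current = current.derived_from
    return steps


film_id = TracedValue(1, "FilmId", "database query")
provider = TracedValue("Netflix", "provider name", "HTTP operation", derived_from=film_id)

for step in lineage(provider):
    print(step)
```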

Run our query via curl

Although Flow’s UI is powerful, developers will want to interact with Flow through its API. That’s a topic on its own, but here is an example of running the same query through Flow’s API, using curl.

Get a JSON payload

We can use curl to get the results of our query as a JSON document.

  • Copy and paste the snippet below into a shell window, and press Enter:

curl 'http://localhost:9021/api/taxiql' \
  -H 'Accept: text/event-stream;charset=UTF-8' \
  -H 'Content-Type: application/taxiql' \
  --data-raw 'find { Film[] } as {
    film: {
        name: Title
        id : FilmId
        description: Description
    }
    productionDetails: {
        released: ReleaseYear
    }
    providers: StreamingProvider
}[]'

Streaming versus batch results: the curl command streams results from Flow as soon as they’re available, because we set the Accept header to text/event-stream. Streaming is fast and more efficient for Flow, as it doesn’t hold results in memory, which allows Flow to work on arbitrarily large datasets. If you’d rather have the results as a single batch, change the Accept header to -H 'Accept: application/json'.
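
The same request can be made from a program. The Python sketch below builds the request with the standard library, using the endpoint and content type from the curl command, and switches the Accept header between streaming and batch. The function names are made up for this example, and the parser assumes the stream uses standard server-sent-event framing with one JSON document per `data:` line; check that against the actual response before relying on it.

```python
import json
import urllib.request
from typing import Any, Iterable, Iterator

TAXIQL_URL = "http://localhost:9021/api/taxiql"  # endpoint from the curl example


def build_taxiql_request(query: str, stream: bool = True) -> urllib.request.Request:
    """Build a POST request for a TaxiQL query, streaming or batch."""
    accept = "text/event-stream;charset=UTF-8" if stream else "application/json"
    return urllib.request.Request(
        TAXIQL_URL,
        data=query.encode("utf-8"),
        headers={"Accept": accept, "Content-Type": "application/taxiql"},
        method="POST",
    )


def iter_sse_json(lines: Iterable[bytes]) -> Iterator[Any]:
    """Yield the JSON payload of each `data:` line (assumes standard SSE framing)."""
    for raw in lines:
        line = raw.decode("utf-8").strip()
        if line.startswith("data:"):
            payload = line[len("data:"):].strip()
            if payload:
                yield json.loads(payload)


# Usage (needs Flow running locally):
#   request = build_taxiql_request("find { Film[] }")
#   with urllib.request.urlopen(request) as response:
#       for record in iter_sse_json(response):
#           print(record)
```

In streaming mode each record can be handled as it arrives. With `stream=False` the whole response body is a single JSON document, which can be read with `json.load(response)` instead.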

Add a Kafka streaming source

Now that we have Flow linking our database and REST API, it’s time to add a Kafka stream into the mix.

We have a new releases topic that emits a message whenever Netflix decides to turn a beloved movie into a new TV series.

For this part of our demo, we’ll use Flow to listen for new release announcements, and join data from our REST API and Postgres DB.

Import a Protobuf schema

Our new releases topic emits a Protobuf message which Flow needs to know about.

To keep things simple in our demo, the Protobuf message is available via one of our APIs. You can view the Protobuf yourself by clicking on http://localhost:9981/proto. For Flow (running inside the Docker Compose network), this is visible as http://films-api/proto.

Import the spec by clicking Add a data source on the front page of Flow, or by navigating to http://localhost:9021/data-source-manager/add.

  • Select Protobuf as the type of schema to import

  • Set the Protobuf Source as a URL

  • Paste the URL: http://films-api/proto

  • Click Configure

You should see a preview of a newly created model: NewFilmReleaseAnnouncement.

To ensure the filmId attribute in NewFilmReleaseAnnouncement uses the standard FilmId type used elsewhere in the company, follow these steps:

  • Locate the Models table: on the left-hand side of your screen, find the table labeled "Models".

  • Navigate to NewFilmReleaseAnnouncement: within the Models table, click on "Models" → "NewFilmReleaseAnnouncement". This opens the data model for NewFilmReleaseAnnouncement.

  • Find the Attributes table: in the NewFilmReleaseAnnouncement data model, locate the "Attributes" table. This table lists the properties (or attributes) of the NewFilmReleaseAnnouncement model.

  • Identify the filmId attribute: in the "Attributes" table, find the row for the filmId attribute. You’ll see the current data type displayed next to it (likely Int).

  • Change the filmId type:

    • Click on the underlined Int type next to filmId. This opens a dropdown or search box.

    • In the search box, type "FilmId"

    • From the search results, select com.hazelflix.films.filmsdatabase.film.types.FilmId. This updates the filmId attribute to use the standardized FilmId type.

FilmIdAttribute

After selecting the FilmId type, click Save to import the Protobuf definitions and save the changes.

The announcement field has been typed as Announcement. As there are no existing types in our company for this data, it’s fine to leave it as-is and use the newly created type.

We’ve now imported a Protobuf schema, and linked its fields to other fields in our schema.

Import a Kafka topic

Next we need to tell Flow about the Kafka topic.

  • Click the Flow logo in the navigation bar to return to the Flow home page.

  • Click Add a Data Source or navigate to http://localhost:9021/data-source-manager/add

  • From the drop-down, select Kafka Topic

  • In the Connection Name, select Add a new connection…

Fill out the form with the following details:

Parameter | Value
Connection name | my-kafka
Connection type | kafka (should already be populated)
Broker address | kafka:19092
Group Id | MyConsumerGroup (should already be populated)

  • Click Create. A new Kafka connection is created, and the pop-up closes.

Fill out the rest of the form with the following details:

Parameter | Value
Connection name | my-kafka (should have been populated when the pop-up closed)
Topic | releases
Topic Offset | LATEST
Namespace | com.hazelflix.announcements
Message Type | NewFilmReleaseAnnouncement
Service and Operation Name | Leave these blank

  • Click Configure

A preview of the schema is shown.

By clicking Services → MyKafkaService → consumeFromReleases, you can see a new operation has been created which returns a Stream<NewFilmReleaseAnnouncement>.

Streams are a different type of operation. Rather than the request / response exchange of an HTTP operation, they expose a continuous stream of data.

Take a look around, and then click Save.

The message type NewFilmReleaseAnnouncement should be pre-populated in a drop-down menu. If it’s not visible, double check that you’ve imported the Protobuf schema correctly.

Join data from Kafka, API and our DB

It’s time to explore writing some queries that join data from across all three sources.

First, let’s start by querying our Kafka topic. Head over to the Query Editor, and paste the following query:

stream { NewFilmReleaseAnnouncement }

You should see results streaming in as messages are published to our Kafka topic.

streaming query simple

Now, let’s enrich our Kafka stream with data from our other sources.

Cancel the running query, and paste the following:

import com.hazelflix.films.filmsdatabase.film.types.FilmId

stream { NewFilmReleaseAnnouncement } as {
    // The announcement comes from our Kafka Protobuf message
    news: {
        announcement: NewFilmReleaseAnnouncement
    }
    // Grab some film information from the Database
    film: {
        name: Title
        id : FilmId
        description: Description
    }
    productionDetails: {
        released: ReleaseYear
    }
    // And query the REST API to see where we can watch this
    providers: StreamingProvider
}[]

In the results panel, you should see the following:

streaming data

Looking in the Profiler tab, you can see the updated integration plan:

query lineage with kafka

What just happened?

  • Flow read our Protobuf message from the Kafka topic

  • It enriched it with data from a database query

  • It then fleshed it out with information from a REST API call

  • And served it up in our UI
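
The steps above can be pictured as a small pipeline that handles one message at a time. The Python sketch below is a conceptual analogue under stated assumptions, not Flow's implementation: `lookup_film` and `fetch_providers` are hypothetical stand-ins for the database query and the REST call, the message is assumed to carry a `filmId` field as in the imported Protobuf model, and the film column names are assumed from the Pagila film table.

```python
from typing import Any, Callable, Dict, Iterable, Iterator


def enrich(
    announcements: Iterable[Dict[str, Any]],
    lookup_film: Callable[[int], Dict[str, Any]],
    fetch_providers: Callable[[int], Any],
) -> Iterator[Dict[str, Any]]:
    """Enrich each announcement as it arrives; nothing is buffered."""
    for message in announcements:
        film_id = message["filmId"]   # the field retyped to FilmId during the import
        film = lookup_film(film_id)   # stands in for the database query
        yield {
            "news": {"announcement": message},
            "film": {
                "name": film["title"],
                "id": film_id,
                "description": film["description"],
            },
            "productionDetails": {"released": film["release_year"]},
            "providers": fetch_providers(film_id),  # stands in for the REST call
        }
```

Because `enrich` is a generator, it works on an unbounded source such as a Kafka consumer loop: each enriched record is produced as soon as its message has been read.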

What’s next?

In this tutorial, we’ve set up Flow and used it to automatically integrate data from a Postgres Database, a REST API, and a Kafka topic with Protobuf.

Look under the hood

To get a better understanding of what’s happened under the hood, take a look at some of the files that Flow has generated during this tutorial.

Directory | What’s there?
workspace.conf | The config file that lists all the projects - including the one we created. It defines where to read and write the schema files Flow created in the background.
taxi/ | The schema project that Flow was writing schemas to
taxi/config/connections.conf | A connections file defining the database and Kafka connections you imported in the UI.
taxi/src/ | The Taxi schemas that Flow generated for you. These are the schemas that describe the data you’ve been working with.

Stop the services

When you’re done, you can stop the services by running:

docker compose down