Airline Connections Demo/Tutorial
This tutorial provides detailed information and instructions for the Airline Connections demo built into Viridian Trial.
Context
This demo/tutorial creates an application that uses streaming data about flight arrival and departure times, and correlates it with stored data about connecting flights, airport gates, and gate-to-gate travel times to determine whether there’s enough time to make a particular flight connection.
Specifically, you will learn how to:
- Connect to external data sources and sinks
- Use SQL to search and display streaming data
- Use SQL to import data from an external database
- Configure late event handling for streaming data
- Join streaming and stored data to generate continually updated results
- Submit a job to Viridian using the Command Line Client
When you launch the demo in Viridian, you may see a step-by-step guide appear in conjunction with the demo. This tutorial is designed to supplement that guide, providing additional context and detail as well as links to reference materials.
Before you Begin
Before starting this tutorial, make sure that you meet the following prerequisites:
- You have a running cluster in Viridian Trial
- You've downloaded and connected the Command Line Client to your cluster
- For the client, you will need:
  - Java 11 or greater
  - Maven 3.9.0 or greater
- (Optional) A Java IDE to view the client code
Step 1. Review What’s Already Set Up
Begin by selecting the Airline Connections demo from the Viridian dashboard. This launches a pre-set configuration and opens the SQL browser window.
The pre-set configuration includes the following elements:
- Connections to a Kafka server and a Postgres database
- Mappings for streaming data
- Mappings for IMaps to hold contextual data
- Data imported from Postgres to local IMaps
The best way to build connections to external sources is to use the Connection Wizard. The wizard walks you through the steps of connecting to the external data source, then setting up the mapping so that the external data is available to the Hazelcast SQL engine.
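For context, a data connection definition created by the wizard looks something like the following. This is only a minimal sketch, not the exact configuration the wizard generates; the connection name matches the one used in the mappings below, but the broker address is a placeholder.

CREATE DATA CONNECTION "ViridianTrialKafka"
TYPE Kafka
SHARED
OPTIONS (
--placeholder address; the wizard fills in the real connection details
'bootstrap.servers' = '<broker-host>:<port>'
);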
The SQL code for each element is below.
Mappings for streaming data:
CREATE OR REPLACE MAPPING "arrivals"
--topic name from Kafka
EXTERNAL NAME "viridiantrial.flights.arrivals" (
--fields in topic
event_time timestamp with time zone,
"day" date,
flight varchar,
airport varchar,
arrival_gate varchar,
arrival_time timestamp
)
DATA CONNECTION "ViridianTrialKafka"
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat'
);
CREATE OR REPLACE MAPPING "departures"
--topic name in Kafka
EXTERNAL NAME "viridiantrial.flights.departures" (
--fields in topic
event_time timestamp with time zone,
"day" date,
flight varchar,
airport varchar,
departure_gate varchar,
departure_time timestamp
)
DATA CONNECTION "ViridianTrialKafka"
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat'
);
Mappings for data in Postgres:
CREATE OR REPLACE MAPPING "connections"
--name of data store in Postgres
EXTERNAL NAME "public"."connections" (
arriving_flight varchar,
departing_flight varchar
)
DATA CONNECTION "ViridianTrialPostgres";
CREATE OR REPLACE MAPPING "minimum_connection_times"
EXTERNAL NAME "public"."minimum_connection_times" (
airport varchar,
arrival_terminal varchar,
departure_terminal varchar,
minutes integer
)
DATA CONNECTION "ViridianTrialPostgres";
Local storage for data from Postgres:
CREATE OR REPLACE MAPPING local_mct(
airport varchar,
arrival_terminal varchar,
departure_terminal varchar,
minutes integer
)
Type IMap
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat'
);
CREATE OR REPLACE MAPPING local_connections(
arriving_flight varchar,
departing_flight varchar
)
Type IMap
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat'
);
Import Postgres data into local storage:
--To ensure a clean write, we make sure the map is empty
DELETE FROM local_mct;
--Now we copy all the data from the external store
INSERT INTO local_mct(__key, airport, arrival_terminal, departure_terminal, minutes)
SELECT airport||arrival_terminal||departure_terminal, airport, arrival_terminal, departure_terminal, minutes
FROM minimum_connection_times;
DELETE FROM local_connections;
INSERT INTO local_connections(__key, arriving_flight, departing_flight)
SELECT arriving_flight || departing_flight, arriving_flight, departing_flight FROM "connections";
Why are we copying the Postgres data into local storage? We are using the data to enrich real-time streaming data. Having the data co-located means there's no read delay in accessing the enriching data.
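If you want to confirm that the import succeeded, a quick sanity check is to count the rows in each IMap. These queries assume the INSERT statements above have already been run:

SELECT COUNT(*) FROM local_mct;
SELECT COUNT(*) FROM local_connections;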
IMap to store output of JOIN job:
CREATE OR REPLACE MAPPING live_connections(
arriving_flight varchar,
arrival_gate varchar,
arrival_time timestamp,
departing_flight varchar,
departure_gate varchar,
departure_time timestamp,
connection_minutes integer,
mct integer,
connection_status varchar
)
Type IMap
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json-flat'
);
Step 2. Build and Test JOIN
Now that the storage framework and streaming maps are set up, you can look at the actual data streams.
- Examine the data in the arrivals and departures streams.

SELECT * FROM arrivals;
SELECT * FROM departures;
- When you are dealing with streaming data, you need to accommodate the possibility that data will arrive late or not at all. You do not want these late or missing events to slow down your jobs. To prevent this, you will use an IMPOSE_ORDER statement to define a threshold (lag) for how late events can be before they are ignored. Because you will be using this ordered data in a subsequent JOIN statement, you need to create a view that holds the ordered data. In this demo, both the arrivals and departures data needs to be ordered. The departures data is already done, so run this code to impose order on the arrivals data.

CREATE OR REPLACE VIEW arrivals_ordered AS
SELECT * FROM TABLE (
    IMPOSE_ORDER(
        TABLE arrivals,
        DESCRIPTOR(event_time),
        INTERVAL '0.5' SECONDS
    )
);
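The departures_ordered view used in the JOIN below was created for you as part of the pre-set configuration. It presumably mirrors the arrivals view; a minimal sketch, assuming the same 0.5 second lag:

CREATE OR REPLACE VIEW departures_ordered AS
SELECT * FROM TABLE (
    IMPOSE_ORDER(
        TABLE departures,
        DESCRIPTOR(event_time),
        INTERVAL '0.5' SECONDS
    )
);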
- You can look at the ordered data. It should be identical to the unordered stream, unless a message arrives later than the configured delay window.

SELECT * FROM arrivals_ordered;
- You have all your data - now you need to put it all together so you can determine whether there's enough time between flights to make a connection. Using SQL JOIN statements, you can join data on related fields. When joining two data streams, the related field is usually a timestamp, so that individual events from different streams can be placed into the appropriate time context. These time-bound JOIN statements include an aggregation window. Hazelcast buffers events until the window duration is reached, then processes the data in the buffer. Subsequent events go into the next buffer until the duration is reached again, and so on.

SELECT
    C.arriving_flight || C.departing_flight AS flight_connection, -- concatenate arriving and departing flight numbers as record key
    CASE -- sets flag of 'AT RISK' if actual connection time is less than the MCT
        WHEN CAST((EXTRACT(EPOCH FROM D.departure_time) - EXTRACT(EPOCH FROM A.arrival_time))/60 AS INTEGER) < M.minutes
        THEN 'AT RISK'
        ELSE 'OK'
    END AS connection_status,
    C.arriving_flight,
    A.arrival_gate,
    A.arrival_time,
    C.departing_flight,
    D.departure_gate,
    D.departure_time,
    CAST((EXTRACT(EPOCH FROM D.departure_time) - EXTRACT(EPOCH FROM A.arrival_time))/60 AS INTEGER) AS connection_minutes, -- calculates actual time between arrival and departure
    M.minutes AS min_connect_time
FROM arrivals_ordered A
INNER JOIN local_connections C
    ON C.arriving_flight = A.flight -- matches arriving flight data from stream to arriving flight in connections table
INNER JOIN departures_ordered D
    ON D.event_time BETWEEN A.event_time - INTERVAL '10' SECONDS
        AND A.event_time + INTERVAL '10' SECONDS -- sets JOIN window to match arrival/departure flight updates that occur within a 20 second window
    AND D.flight = C.departing_flight -- matches departing flight data from stream to departing flight in connections table
INNER JOIN local_mct M
    ON A.airport = M.airport -- matches airport from arriving flight to records in minimum connection time table
    AND SUBSTRING(A.arrival_gate FROM 1 FOR 1) = M.arrival_terminal -- extracts arrival terminal from the gate
    AND SUBSTRING(D.departure_gate FROM 1 FOR 1) = M.departure_terminal -- extracts departure terminal from the gate
- Stop the query and examine the output.
Step 3. Command Line Client Setup
If you have not already set up the Command Line Client (CLC), you need to do so now. If you already have it set up, skip to Step 4. Submit Job.
- Click on the Dashboard icon on the left of your screen.
- Select the CLI icon.
- Follow the steps on the screen to download the CLC and the configuration for your cluster.
Step 4. Submit Job
Up to this point, you’ve used the SQL browser to run commands. This is useful for development and testing purposes, but in most production environments, you’ll create SQL scripts that you then submit to the cluster to run as jobs, using the CLC.
- Clone the GitHub repo for this tutorial.
- Change to the local directory for the repo.
- Review the contents of the file connections_job.sql. You can use any text editor or the Linux more command.

more connections_job.sql

The JOIN part of this file is identical to the code you ran at the end of Step 2: Build and Test JOIN. The new code is at the beginning; instead of writing the search output to the screen, the output is stored in an IMap called live_connections.
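In case you're wondering what that new code looks like, the file presumably wraps the Step 2 query in a CREATE JOB statement that sinks its results into the IMap. A heavily abbreviated sketch, assuming the job name update_connections that you will see in Management Center:

-- abbreviated sketch: the real file selects and sinks all of the JOIN columns
CREATE JOB update_connections AS
SINK INTO live_connections(__key, arriving_flight, departing_flight)
SELECT
    C.arriving_flight || C.departing_flight,
    C.arriving_flight,
    C.departing_flight
FROM arrivals_ordered A
INNER JOIN local_connections C ON C.arriving_flight = A.flight;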
- To submit the script in non-interactive mode, use the following command.
clc -c <your-cloud-config> script connections_job.sql
Don’t know the name of your cloud configuration? List available configurations using the following command.
clc config list
If you already have CLC open, you can submit the script from the CLC> prompt.
\script connections_job.sql
- Go to the dashboard for your cluster and open Management Center.
- In Management Center, select Stream Processing > Jobs. You should see a job called update_connections. Click on the job name to view processing statistics and the DAG for this job.
- In either the SQL browser tab or the CLC, view the contents of the IMap that stores the output of the job.

SELECT * FROM live_connections;
Because you are searching the contents of an IMap rather than a data stream, the above query returns the current map contents and then completes, instead of running continuously.
Step 5. Run Client
The connection data is now stored in Hazelcast and is being continually updated. Now let’s make that data available to the end application that will use it.
We've created a Java client that implements the Hazelcast MapListener interface. The client connects to Hazelcast, retrieves the contents of the live_connections IMap, then updates the information any time there's a change to the IMap.
- Issue the following commands to build and launch the connection monitor application, replacing <cluster-name> with the name of the CLC cluster configuration.

cd connection-monitor
mvn clean package exec:java -Dexec.mainClass=hazelcast.platform.labs.airline.AirlineConnectionListener -Dexec.args=<cluster-name>

Hello Hazelcast testers! If you get an error regarding keystores with the above command, follow these steps:
  - Go to your Viridian cluster dashboard and select the Java client icon.
  - Under "Advanced Setup", select "Download keystore file".
  - Find your CLC home directory with clc home.
  - Copy the zipped keystore to $CLC_HOME/configs/<your cluster name>.
  - Unzip the keystore.
  - Go back to the connection-monitor directory and try again.
Don't worry - customers won't have to do this; the next sprint will fix it so the keystore is downloaded with every config, not just Java.
- Press CTRL+C to terminate the client connection.
- (Optional) Open the Java file in your favorite IDE to review the client code.
Summary
In this tutorial/demo, you learned how to:
- Connect to external data sources and sinks
- Use SQL to search and display streaming data
- Use SQL to import data from an external database
- Configure late event handling for streaming data
- Join streaming and stored data to generate continually updated results
- Submit a job to Viridian using the Command Line Client