This is a prerelease version.


Connect to Amazon Kinesis

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. Hazelcast can use KDS both as a data source and as a data sink, so you can combine the two to build streaming data pipelines.

In this tutorial, you will build two pipelines:

  • A pipeline that takes a continuous flow of simulated tweets and pushes them into KDS.

  • A pipeline that consumes the tweets and computes the traffic intensity in events per second.

Before You Begin

To complete this tutorial, you need the following:

  • A Hazelcast cluster running in client/server mode

Step 1. Set up Amazon Kinesis

  1. To create a data stream, follow the steps in the KDS Developer Guide. Instead of StockTradeStream, name your data stream Tweets.

  2. To set up permissions, follow the steps in the KDS Developer Guide.

    For this tutorial, if your security constraints allow it, you can grant all permissions for the services involved.

    Keep in mind that you are setting up a stream called Tweets instead of StockTradeStream.
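  3. To check that everything is set up correctly, install the AWS CLI and run a basic command against the stream, for example aws kinesis describe-stream-summary --stream-name Tweets, which should report the stream status as ACTIVE.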

Step 2. Create a New Java Project

This tutorial assumes you’re using an IDE. Create a blank Java project named kinesis-tutorial and copy one of the following build files into it.

Gradle (build.gradle):

plugins {
    id 'java'
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories.mavenCentral()

dependencies {
    implementation 'com.hazelcast:hazelcast:5.1-SNAPSHOT'
    implementation 'com.hazelcast.jet:hazelcast-jet-kinesis:5.1-SNAPSHOT'
}

Maven (pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kinesis-tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>5.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-kinesis</artifactId>
            <version>5.1-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

Step 3. Publish a Stream to Kinesis

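The following TweetPublisher class publishes "tweets" (simple strings) to the Tweets data stream at a varying rate: TestSources.itemStream(3) emits three events per second, and each event is expanded into 1 to 9 tweets.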

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kinesis.KinesisSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.util.Map.Entry;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static com.hazelcast.jet.Util.entry;

public class TweetPublisher {

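  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    // Test source that emits 3 events per second, each with a sequence number
    pipeline.readFrom(TestSources.itemStream(3))
     .withoutTimestamps()
     .flatMap(event -> {
       // Turn each event into a random number (1 to 9) of tweets
       ThreadLocalRandom random = ThreadLocalRandom.current();
       long count = random.nextLong(1, 10);

       // Each tweet is a (partition key, payload) entry: the key is the last
       // digit of the tweet ID and the payload is the tweet text as bytes
       Stream<Entry<String, byte[]>> tweets = LongStream.range(0, count)
           .map(l -> event.sequence() * 10 + l)
           .boxed()
           .map(l -> entry(
               Long.toString(l % 10),
               String.format("tweet-%0,4d", l).getBytes())
           );

       return Traversers.traverseStream(tweets);
     })
     // Write the entries to the Kinesis data stream named Tweets
     .writeTo(KinesisSinks.kinesis("Tweets").build());

    JobConfig cfg = new JobConfig().setName("tweet-publisher");
    // Connects to the cluster when submitted with hz-cli; otherwise starts a local member
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }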
}
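To see what the flatMap stage emits for a single test event, here is a stdlib-only sketch that reproduces its key and payload logic outside Hazelcast. The class and method names (TweetRecordsSketch, tweetsFor) are hypothetical; the per-event count is a parameter instead of a ThreadLocalRandom draw so the output is deterministic, java.util's SimpleEntry stands in for Hazelcast's Util.entry, and the format call pins Locale.ROOT so digits and separators don't depend on the JVM default. Since each of the three events per second yields a uniformly random 1 to 9 tweets (mean 5), the publisher averages about 15 tweets per second.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TweetRecordsSketch {

    // Reproduces the flatMap body of TweetPublisher for one event, with the
    // random count replaced by a parameter. Tweet IDs are sequence * 10 + i,
    // the Kinesis partition key is the ID's last digit, and the payload is the
    // formatted tweet text as UTF-8 bytes.
    static List<Entry<String, byte[]>> tweetsFor(long sequence, long count) {
        return LongStream.range(0, count)
                .map(i -> sequence * 10 + i)
                .boxed()
                .<Entry<String, byte[]>>map(id -> new SimpleEntry<>(
                        Long.toString(id % 10),
                        String.format(Locale.ROOT, "tweet-%0,4d", id)
                                .getBytes(StandardCharsets.UTF_8)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Event with sequence number 2, expanded into 3 tweets
        for (Entry<String, byte[]> record : tweetsFor(2, 3)) {
            System.out.println(record.getKey() + " -> "
                    + new String(record.getValue(), StandardCharsets.UTF_8));
        }
    }
}
```

Running main prints three records with keys 0, 1, and 2 and payloads tweet-0020, tweet-0021, and tweet-0022.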

If you run this code from your IDE, it works, but Hazelcast.bootstrappedInstance() starts its own Hazelcast member. To run the job on the Hazelcast cluster you already started, build the project and submit the JAR from the command line:

For Gradle:

gradle build
bin/hz-cli submit -c org.example.TweetPublisher build/libs/kinesis-tutorial-1.0-SNAPSHOT.jar

For Maven:

mvn package
bin/hz-cli submit -c org.example.TweetPublisher target/kinesis-tutorial-1.0-SNAPSHOT.jar

Leave this job running while you create the next class.

Step 4. Use Hazelcast to Analyze the Stream

This code connects Hazelcast to the Tweets stream and logs how many tweets per second were published to it, computed over 1-second windows that slide every 500 ms:

package org.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kinesis.KinesisSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;

public class JetJob {
  static final DateTimeFormatter TIME_FORMATTER =
      DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

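  public static void main(String[] args) {
    // Read the Tweets stream, starting from the latest record in every shard (".*" matches all shard IDs)
    StreamSource<Map.Entry<String, byte[]>> source = KinesisSources.kinesis("Tweets")
     .withInitialShardIteratorRule(".*", "LATEST", null)
     .build();

    Pipeline pipeline = Pipeline.create();
    pipeline.readFrom(source)
     .withNativeTimestamps(3_000) // use KDS record timestamps, allowing up to 3 s of lateness
     // 1-second windows that advance every 500 ms, so each count is a tweets-per-second rate
     .window(sliding(1_000, 500))
     .aggregate(counting())
     // Log the end time of each window together with its tweet count
     .writeTo(Sinks.logger(wr -> String.format(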
         "At %s Kinesis got %,d tweets per second",
         TIME_FORMATTER.format(LocalDateTime.ofInstant(
             Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
         wr.result())));

    JobConfig cfg = new JobConfig().setName("kinesis-traffic-monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(pipeline, cfg);
  }
}
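The windowing step can be modeled without Hazelcast. The following is a simplified, single-threaded sketch with hypothetical class and method names, not Hazelcast's implementation: it assigns each timestamp to every 1,000 ms window (aligned to 500 ms boundaries) that contains it, and it drops an event as late when its timestamp is below a watermark taken as the highest timestamp seen so far minus the 3,000 ms allowed lag. Because each window is one second long, a window's count is directly a tweets-per-second figure. Unlike Hazelcast, which emits a window only after the watermark passes its end, the sketch simply returns all counts at once.

```java
import java.util.Map;
import java.util.TreeMap;

public class SlidingWindowSketch {
    static final long WINDOW_SIZE_MS = 1_000; // sliding(1_000, 500): window length
    static final long SLIDE_BY_MS = 500;      // sliding(1_000, 500): slide step
    static final long ALLOWED_LAG_MS = 3_000; // withNativeTimestamps(3_000)

    // Returns event counts keyed by window end (exclusive), in ascending order.
    // Events are processed in arrival order; an event older than the current
    // watermark (highest timestamp seen minus ALLOWED_LAG_MS) is dropped as late.
    static TreeMap<Long, Long> countPerWindow(long[] timestampsInArrivalOrder) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - ALLOWED_LAG_MS;
            if (ts < watermark) {
                continue; // late event
            }
            // The first window containing ts ends at the next slide boundary after ts;
            // later windows still contain ts while their start (end - size) is <= ts.
            long firstEnd = Math.floorDiv(ts, SLIDE_BY_MS) * SLIDE_BY_MS + SLIDE_BY_MS;
            for (long end = firstEnd; end - WINDOW_SIZE_MS <= ts; end += SLIDE_BY_MS) {
                counts.merge(end, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The event at 1_000 arrives after 5_000 and falls below the watermark (2_000)
        long[] timestamps = {100, 200, 600, 1_200, 5_000, 1_000, 2_500};
        for (Map.Entry<Long, Long> e : countPerWindow(timestamps).entrySet()) {
            System.out.printf("Window ending at %d ms: %d tweets per second%n",
                    e.getKey(), e.getValue());
        }
    }
}
```

Each event lands in exactly two windows here, because the window length is twice the slide step.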

If you run this code from your IDE, it works, but Hazelcast.bootstrappedInstance() starts its own Hazelcast member. To run the job on the Hazelcast cluster you already started, build the project and submit the JAR from the command line:

For Gradle:

gradle build
bin/hz-cli submit -c org.example.JetJob build/libs/kinesis-tutorial-1.0-SNAPSHOT.jar

For Maven:

mvn package
bin/hz-cli submit -c org.example.JetJob target/kinesis-tutorial-1.0-SNAPSHOT.jar

Now switch to the terminal window where you started the Hazelcast member. The pipeline's results appear in that member's log output.

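Because the source is configured with the LATEST shard iterator type, the job starts reading at the tip of each shard: while TweetPublisher is running, you’ll get a steady stream of real-time updates, but no report on tweets published before the job started. Kinesis is a replayable source, so if you want the job to process the records the stream still retains, pass TRIM_HORIZON instead of LATEST; every new submission of the job then replays that history before catching up with real time.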
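The effect of the shard iterator type that JetJob passes to withInitialShardIteratorRule can be illustrated with a toy in-memory model of a single shard. Everything here (ShardIteratorSketch, recordsSeen) is hypothetical and for illustration only: real shards also expire records after the stream's retention period and support other iterator types, such as AT_TIMESTAMP, which the sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardIteratorSketch {

    // Toy model of one shard. `existing` holds records already in the shard
    // when the job starts; `publishedLater` holds records appended afterwards.
    // Returns the records a newly started reader sees for the iterator type.
    static List<String> recordsSeen(String iteratorType,
                                    List<String> existing,
                                    List<String> publishedLater) {
        int start;
        if ("TRIM_HORIZON".equals(iteratorType)) {
            start = 0;                 // begin at the oldest retained record
        } else if ("LATEST".equals(iteratorType)) {
            start = existing.size();   // begin just after the newest record
        } else {
            throw new IllegalArgumentException(
                    "Not modeled in this sketch: " + iteratorType);
        }
        List<String> shard = new ArrayList<>(existing);
        shard.addAll(publishedLater);
        return new ArrayList<>(shard.subList(start, shard.size()));
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("tweet-0000", "tweet-0001");
        List<String> later = Arrays.asList("tweet-0010");
        System.out.println("LATEST:       " + recordsSeen("LATEST", existing, later));
        System.out.println("TRIM_HORIZON: " + recordsSeen("TRIM_HORIZON", existing, later));
    }
}
```

With LATEST the reader only sees tweet-0010; with TRIM_HORIZON it sees all three records, which is why a new job started from TRIM_HORIZON reports on the retained history first.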

Sample output:

... At 16:11:27:500 Kinesis got 13 tweets per second
... At 16:11:28:000 Kinesis got 17 tweets per second
... At 16:11:28:500 Kinesis got 8 tweets per second
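Each line comes from the Sinks.logger function, which formats the window's end timestamp with the "HH:mm:ss:SSS" pattern. The stdlib-only sketch below reproduces that formatting; its class and method names are hypothetical, and it takes the time zone as a parameter and uses Locale.ROOT so its output is reproducible, whereas the job itself uses ZoneId.systemDefault() and the JVM's default locale.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class LogLineSketch {
    // Same pattern as TIME_FORMATTER in JetJob
    static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

    // Builds the message the logger sink writes for one window result,
    // given the window end (epoch millis), the count, and a time zone.
    static String logLine(long windowEndMillis, long count, ZoneId zone) {
        return String.format(Locale.ROOT,
                "At %s Kinesis got %,d tweets per second",
                TIME_FORMATTER.format(LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(windowEndMillis), zone)),
                count);
    }

    public static void main(String[] args) {
        // 58,287,500 ms after the epoch is 16:11:27.500 UTC on 1970-01-01
        System.out.println(logLine(58_287_500L, 13, ZoneOffset.UTC));
    }
}
```

Running main prints At 16:11:27:500 Kinesis got 13 tweets per second, the same format as the sample output.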

Step 5. Clean up

  1. Cancel the jobs:

    bin/hz-cli cancel tweet-publisher
    bin/hz-cli cancel kinesis-traffic-monitor
  2. Shut down the Hazelcast cluster:

    bin/hz-stop
  3. Delete the Tweets stream in Kinesis, using the AWS Console or the AWS CLI (aws kinesis delete-stream --stream-name Tweets), so that it stops incurring charges.

Next Steps

To learn how to override connection parameters such as the AWS region, endpoint, and security keys, see the Kinesis connector documentation.