Amazon Kinesis Connector

Amazon Kinesis Data Streams (KDS) is a massively scalable and durable real-time data streaming service. Every data item passing through it, called a record, is assigned a partition key. As the name suggests, partition keys group related records together. Records with the same partition key are also kept in order. Partition keys are grouped into shards, the base throughput unit of KDS. The input and output rates of each shard are limited. Streams can be resharded at any time.
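To make the grouping of partition keys into shards more concrete: Kinesis hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The following is a minimal sketch of that mapping in plain Java. The class and method names are hypothetical, and it assumes the hash space is split into equally sized ranges; a real stream's ranges come from its shard metadata and may be uneven after resharding.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the Hazelcast or AWS APIs.
final class ShardRouting {

    // Size of the 128-bit hash key space: 2^128.
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the UTF-8 encoded partition key, read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // ASSUMPTION: shardCount shards with equally sized hash key ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        // Integer division can leave a small remainder range at the top; fold it into the last shard.
        return Math.min(index, shardCount - 1);
    }
}
```

Because the mapping depends only on the key, all records with the same partition key land in the same shard, which is what makes per-key ordering possible.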

To read from Kinesis, the only requirement is a KDS stream name. Kinesis does not handle deserialization itself; it only provides serialized binary data.

Pipeline p = Pipeline.create();
p.readFrom(KinesisSources.kinesis(STREAM).build())
  .withNativeTimestamps(0)
  .writeTo(Sinks.logger());
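Since the source only hands over binary data, a pipeline usually needs a decoding step of its own. The sketch below shows one possible decoder in plain Java. It assumes each item arrives as an entry of partition key and byte[] payload, and that the payload is UTF-8 text; both are assumptions to check against the connector's Javadoc and your producers. RecordDecoding and decodeUtf8 are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper, not part of the Hazelcast API.
final class RecordDecoding {

    // Keeps the partition key and decodes the payload bytes as UTF-8 text.
    // ASSUMPTION: the producer wrote UTF-8 encoded strings.
    static Map.Entry<String, String> decodeUtf8(Map.Entry<String, byte[]> record) {
        String text = new String(record.getValue(), StandardCharsets.UTF_8);
        return new SimpleImmutableEntry<>(record.getKey(), text);
    }
}
```

A function like this could be applied in a map stage placed between the source and the sink.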

The shards are distributed across the Hazelcast cluster, so that each node is responsible for reading a subset of the partition keys.

To write data items of any type, the sink requires three things: the KDS stream name, a key function (specifies how to compute the partition key from an input item), and a value function (specifies how to compute the data blob from an input item, that is, the serialization).

FunctionEx<Log, String> keyFn = l -> l.service();   //partition key: the service name
FunctionEx<Log, byte[]> valueFn = l -> l.message().getBytes(StandardCharsets.UTF_8);   //data blob: the message as UTF-8 bytes
Sink<Log> sink = KinesisSinks.kinesis("stream", keyFn, valueFn).build();

p.readFrom(Sources.files("home/logs")) //read lines of text from log files
 .map(line -> LogParser.parse(line))   //parse lines into Log data objects
 .writeTo(sink);                       //write Log objects out to Kinesis

To use the Kinesis connectors, make sure the hazelcast-jet-kinesis module is present in the lib directory and add the following dependency to your application:

  • Gradle

compile 'com.hazelcast.jet:hazelcast-jet-kinesis:5.5.1'

  • Maven

<dependency>
  <groupId>com.hazelcast.jet</groupId>
  <artifactId>hazelcast-jet-kinesis</artifactId>
  <version>5.5.1</version>
</dependency>

Fault-tolerance

Amazon Kinesis persists the data and it’s possible to replay it (on a per-shard basis). This enables fault tolerance. If a job has a processing guarantee configured, then Hazelcast will periodically save the current shard offsets and then replay from the saved offsets when the job is restarted. If no processing guarantee is enabled, the source will start reading from the oldest available data, determined by the KDS retention period (defaults to 24 hours, can be as long as 365 days).

While the source is suitable for both at-least-once and exactly-once pipelines, the only processing guarantee the sink can support is at-least-once. This is caused by the lack of transaction support in Kinesis (can’t write data into it with transactional guarantees) and the AWS SDK occasionally causing data duplication on its own (see Producer Retries in the documentation).
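The effect of offset replay on a non-transactional sink can be illustrated with a toy model. The sketch below is plain Java and does not use Hazelcast's snapshot mechanism; ReplayDemo and its parameters are hypothetical. It periodically saves an offset, simulates a crash, restarts from the last saved offset, and returns everything that would have been written downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of save-and-replay, not Hazelcast's actual implementation.
final class ReplayDemo {

    // Processes records 0..crashAt-1, saving the offset every snapshotInterval records,
    // then restarts from the last saved offset and processes up to total-1.
    // Returns every record index emitted downstream, in emission order.
    static List<Integer> runWithCrash(int total, int snapshotInterval, int crashAt) {
        List<Integer> emitted = new ArrayList<>();
        int savedOffset = 0;
        for (int i = 0; i < crashAt; i++) {
            if (i % snapshotInterval == 0) {
                savedOffset = i; // snapshot taken before record i is processed
            }
            emitted.add(i);
        }
        // Restart: replay from the saved offset; anything emitted after the snapshot is emitted again.
        for (int i = savedOffset; i < total; i++) {
            emitted.add(i);
        }
        return emitted;
    }
}
```

With 6 records, a snapshot every 3 records, and a crash after 5 records, the downstream receives 0, 1, 2, 3, 4, 3, 4, 5. No record is lost, but records 3 and 4 arrive twice, which is the at-least-once behaviour a sink without transactions cannot avoid.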

Ordering

As stated before, Kinesis preserves the order of records with the same partition key (or, more generally, the order of records belonging to the same shard). However, neither the source nor the sink can fully uphold this guarantee.

The problem scenario for the source is resharding. Resharding is the process of adjusting the number of shards of a stream to adapt to data flow rate changes. It is done voluntarily and explicitly by the stream’s owner, and it does not interrupt the flow of data through the stream. During resharding, some (old) shards get closed, and new ones are created - some partition keys transition from an old shard to a new one. To keep the ordering for such a partition key in transit, Hazelcast would need to make sure that it finishes reading all the data from the old shard before starting to read data from the new one. Hazelcast would also need to ensure that the new shard’s data can’t possibly overtake the old one’s data inside the pipeline. Currently, Hazelcast does not have a mechanism to ensure this for such a distributed source. It’s best to schedule resharding when there are lulls in the data flow. Watermarks might also behave unexpectedly if data is flowing during resharding.

The problem scenario for the sink is exceeding the ingestion rate limit of a shard. A KDS shard has an ingestion rate of 1 MiB per second. If you try to write more into it, then some records will be rejected. This rejection breaks the ordering because the sinks write data in batches, and the shards don’t just reject entire batches, but random items from them. What’s rejected can be (and is) retried, but the batch’s original ordering can’t be preserved. The sink can’t entirely avoid all rejections because it’s distributed: multiple instances of it write into the same shard, and coordinating an aggregated rate among them is not currently possible in Hazelcast. Other producers may also be sending to the same stream. Truth be told, though, Kinesis also only preserves the order of successfully ingested records, not the order in which ingestion was attempted. Having enough shards and properly spreading out partition keys should prevent the problem from happening.
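The reordering caused by partial batch rejection can be shown with a small simulation. The sketch below is plain Java and does not call Kinesis or Hazelcast; PartialRejectionDemo is a hypothetical name, and the set of rejected positions is passed in explicitly so that the result is deterministic, whereas a real shard rejects items unpredictably.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy simulation of partial batch rejection, not the sink's actual code.
final class PartialRejectionDemo {

    // Sends one batch to a simulated shard. Items at the given positions are rejected
    // on the first attempt and retried afterwards. Returns the order in which the
    // shard ends up storing the items.
    static List<String> ingestWithRetry(List<String> batch, Set<Integer> rejectedPositions) {
        List<String> stored = new ArrayList<>();
        List<String> toRetry = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (rejectedPositions.contains(i)) {
                toRetry.add(batch.get(i)); // rejected now, resent later
            } else {
                stored.add(batch.get(i));  // accepted on the first attempt
            }
        }
        stored.addAll(toRetry);            // the retry succeeds, but after the rest of the batch
        return stored;
    }
}
```

Sending the batch r1, r2, r3, r4 with the second item rejected leaves the shard holding r1, r3, r4, r2: every record is ingested, but r2 has fallen behind r3 and r4.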