Apache Kafka Connector

The Kafka connector allows you to stream, filter, and transform events between Hazelcast clusters and Kafka.

Apache Kafka is a popular distributed, persistent log store that is well suited to stream processing systems. Data in Kafka is organized into topics, and each topic consists of one or more partitions stored in the Kafka cluster.

Installing the Connector

This connector is included in the full distribution of Hazelcast.

If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.

Permissions

Enterprise

If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.

Configuration Options

Use these options to configure the Kafka connector.

To read from Kafka as a source, the only requirements are to provide deserializers and a topic name:

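import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;

// Consumer properties: broker address, deserializers, and where to start reading
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

// Read from the topic, use the Kafka record timestamps, and log each entry
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(props, "topic"))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());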

The topics and partitions are distributed across the Hazelcast cluster, so that each member is responsible for reading a subset of the data.

To write to Kafka as a sink, the only requirement is to provide serializers:

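import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sources;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import static com.hazelcast.jet.Util.entry;

// Producer properties: broker address and serializers
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

// LogParser is an application-specific class that is not part of Hazelcast
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("home/logs"))
 .map(line -> LogParser.parse(line))
 .map(log -> entry(log.service(), log.message()))
 .writeTo(KafkaSinks.kafka(props, "topic"));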

Fault Tolerance

One of the most important features of using Kafka as a source is that data can be replayed, which enables fault tolerance. If the job has a processing guarantee configured, Hazelcast periodically saves the current offsets internally and replays from the saved offsets when the job is restarted. In this mode, Hazelcast tracks and commits offsets itself, without using the consumer groups feature of Kafka.
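The replay behavior can be pictured with a small toy model in plain Java. This is only a sketch of the idea, not Hazelcast code: the class ReplayableSource and its methods are hypothetical names, and an in-memory list stands in for a Kafka partition.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of offset-based replay. ReplayableSource is a hypothetical class
// used for illustration only; it is not part of Hazelcast or Kafka.
final class ReplayableSource {
    private final List<String> log;   // stands in for one Kafka partition
    private long position;            // offset of the next record to read
    private long snapshotOffset;      // offset saved by the last snapshot

    ReplayableSource(List<String> log) {
        this.log = log;
    }

    // Reads up to max records starting at the current position.
    List<String> poll(int max) {
        List<String> out = new ArrayList<>();
        while (out.size() < max && position < log.size()) {
            out.add(log.get((int) position++));
        }
        return out;
    }

    // Called when a snapshot completes: remember how far the source has read.
    void saveSnapshot() {
        snapshotOffset = position;
    }

    // Called when the job restarts: rewind to the last saved offset.
    void restoreFromSnapshot() {
        position = snapshotOffset;
    }

    public static void main(String[] args) {
        ReplayableSource source = new ReplayableSource(List.of("a", "b", "c", "d"));
        source.poll(2);                      // reads a and b
        source.saveSnapshot();               // the saved offset is now 2
        source.poll(1);                      // reads c, then assume the job fails
        source.restoreFromSnapshot();        // restart from the saved offset
        System.out.println(source.poll(2));  // c is read again after the restart
    }
}
```

Because the source rewinds to the saved offset, any record read after the last snapshot is read a second time after the restart. How downstream stages treat that second read depends on the configured processing guarantee.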

If processing guarantees are disabled, the source starts reading from the default offsets determined by the auto.offset.reset property. You can enable offset committing by assigning a group.id, enabling automatic offset committing with enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
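As a rough sketch, the consumer properties for this setup could be assembled as follows. The helper class AutoCommitProps, the group name, and the interval value are placeholders chosen for illustration; only the property keys come from Kafka's consumer configuration.

```java
import java.util.Properties;

// Builds consumer properties that let Kafka commit offsets for a consumer group.
// AutoCommitProps is a hypothetical helper, not part of Hazelcast or Kafka.
final class AutoCommitProps {

    static Properties create(String bootstrapServers, String groupId, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Used only when the group has no committed offset yet
        props.setProperty("auto.offset.reset", "earliest");
        // Offsets are committed under this consumer group
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // "my-consumer-group" and 5000 are placeholder values
        Properties props = create("localhost:9092", "my-consumer-group", 5000);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the Kafka source in the same way as in the source example earlier on this page.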

You can also specify exact initial offsets for the Kafka source using the TopicsConfig parameter. Initial offsets provided in TopicsConfig always take priority over offsets stored in Kafka or associated with a given consumer group. These offsets are used only when the job is started for the first time after submission. Afterwards, the regular fault tolerance mechanism described above is used. This option is not supported when processing guarantees are disabled.

Transactional Guarantees

As a sink, the Kafka connector provides exactly-once guarantees at the cost of using Kafka transactions. Hazelcast commits the produced records after each snapshot is completed. This greatly increases the latency because consumers see the records only after they are committed.

If you use the at-least-once guarantee, records are visible immediately, but in the case of a failure some records could be duplicated. You can also configure jobs in exactly-once mode and decrease the guarantee just for a particular Kafka sink.
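The trade-off between the two modes can be illustrated with a toy model in plain Java. This is a simplified sketch and not how Hazelcast or Kafka is implemented: ToySink and its methods are hypothetical names, and a pending list stands in for an open Kafka transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of record visibility in the two sink modes. ToySink is a
// hypothetical class for illustration; it is not Hazelcast or Kafka code.
final class ToySink {
    private final boolean exactlyOnce;
    private final List<String> pending = new ArrayList<>();  // written but not committed
    private final List<String> visible = new ArrayList<>();  // what consumers can read

    ToySink(boolean exactlyOnce) {
        this.exactlyOnce = exactlyOnce;
    }

    // Exactly-once buffers the record until the next snapshot;
    // at-least-once makes it visible right away.
    void write(String record) {
        if (exactlyOnce) {
            pending.add(record);
        } else {
            visible.add(record);
        }
    }

    // A completed snapshot commits everything written since the previous one.
    void snapshotCompleted() {
        visible.addAll(pending);
        pending.clear();
    }

    // A failure discards uncommitted records; the job then replays its input.
    void failAndRestart() {
        pending.clear();
    }

    List<String> visibleRecords() {
        return List.copyOf(visible);
    }

    // Runs the same sequence of events against a sink in the given mode.
    static List<String> runScenario(boolean exactlyOnce) {
        ToySink sink = new ToySink(exactlyOnce);
        sink.write("a");
        sink.snapshotCompleted();
        sink.write("b");
        sink.failAndRestart();
        sink.write("b");            // "b" is replayed after the restart
        sink.snapshotCompleted();
        return sink.visibleRecords();
    }

    public static void main(String[] args) {
        System.out.println("at-least-once: " + runScenario(false));
        System.out.println("exactly-once:  " + runScenario(true));
    }
}
```

In this model the at-least-once sink ends up with "b" twice because the first copy was already visible when the failure happened, while the exactly-once sink shows each record once but only after a snapshot completes.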

Schema Registry

Kafka is often used together with Confluent Schema Registry as a repository of types. To use the schema registry, add its URL to the Properties object and, if you are using Avro, use the KafkaAvroSerializer and KafkaAvroDeserializer:

properties.put("value.deserializer", KafkaAvroDeserializer.class);
properties.put("specific.avro.reader", true);
properties.put("schema.registry.url", schemaRegistryUrl);

Keep in mind that once the record is deserialized, Jet still needs to know how to serialize and deserialize the record internally.

Explicit Avro Schema

To use an Avro schema for GenericRecord, you must provide the following as parameters for the Kafka source and/or sink:

  • JSON representation of Avro schema

  • A HazelcastKafkaAvroSerializer as key/value serializer class

  • A HazelcastKafkaAvroDeserializer as key/value deserializer class

The connector uses the schema from the properties and disregards the schema from the record. An exception is raised if the types of the record schema and the properties schema are different.

The parameters can be specified as shown in the following examples.

Example source parameters:

properties.put("key.deserializer", HazelcastKafkaAvroDeserializer.class);
properties.put("value.deserializer", HazelcastKafkaAvroDeserializer.class);
properties.put("keyAvroSchema", "{\"type\":\"record\",\"name\":\"key\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
properties.put("valueAvroSchema", "{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

Example sink parameters:

properties.put("key.serializer", HazelcastKafkaAvroSerializer.class);
properties.put("value.serializer", HazelcastKafkaAvroSerializer.class);
properties.put("keyAvroSchema", "{\"type\":\"record\",\"name\":\"key\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
properties.put("valueAvroSchema", "{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

Keep in mind that you can use a different serializer or deserializer for the key and for the value.

Version Compatibility

The Kafka sink and source are based on version 2.2.0. This means that the Kafka connector works with any client and broker whose version is 1.0.0 or later.

Heterogeneous Messages

This connector supports heterogeneous messages. For example, say you have these messages in your topic:

{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}

If you map the column petName, it has the value null for the first message (Alice), which has no petName field. This scenario is supported. The Avro format behaves similarly.
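The null-for-missing-field behavior can be mimicked in plain Java. The sketch below is not the connector's implementation: HeterogeneousRows and project are hypothetical names, and each message is represented as a Map rather than as parsed JSON.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of projecting messages with different fields onto a
// fixed set of columns. HeterogeneousRows is a hypothetical helper class.
final class HeterogeneousRows {

    // Builds one row per message; a column that the message lacks becomes null.
    static List<List<Object>> project(List<Map<String, Object>> messages, List<String> columns) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> message : messages) {
            List<Object> row = new ArrayList<>();
            for (String column : columns) {
                row.add(message.get(column)); // null when the field is absent
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> alice = Map.of("name", "Alice", "age", 42);
        Map<String, Object> bob = Map.of("name", "Bob", "age", 43, "petName", "Zaz");

        List<List<Object>> rows = project(List.of(alice, bob), List.of("name", "age", "petName"));
        for (List<Object> row : rows) {
            System.out.println(row); // the row for Alice has null in the petName position
        }
    }
}
```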