Apache Kafka Connector
The Kafka connector allows you to stream, filter, and transform events between Hazelcast clusters and Kafka.
Apache Kafka is a popular distributed, persistent log store which is a great fit for stream processing systems. Data in Kafka is structured as topics and each topic consists of one or more partitions, stored in the Kafka cluster.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.
Permissions
Enterprise Edition
If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.
Configuration Options
Use these options to configure the Kafka connector.
To read from Kafka as a source, the only requirements are to provide deserializers and a topic name:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(props, "topic"))
.withNativeTimestamps(0)
.writeTo(Sinks.logger());
The topics and partitions are distributed across the Hazelcast cluster, so that each member is responsible for reading a subset of the data.
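To actually run the pipeline, submit it to the Jet engine as a job. A minimal sketch, assuming a member started in bootstrapped or embedded mode:
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
// Submit the pipeline; the job keeps streaming from Kafka until it is cancelled
hz.getJet().newJob(p);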
When Kafka is used as a sink, the only requirement is to provide the serializers:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
Pipeline p = Pipeline.create();
p.readFrom(Sources.files("home/logs"))
.map(line -> LogParser.parse(line))
.map(log -> entry(log.service(), log.message()))
.writeTo(KafkaSinks.kafka(props, "topic"));
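If your pipeline items are not Map.Entry instances, you can use the overload of KafkaSinks.kafka that takes explicit key and value extractor functions. A sketch, reusing the LogParser helper from the example above:
p.readFrom(Sources.files("home/logs"))
 .map(line -> LogParser.parse(line))
 .writeTo(KafkaSinks.kafka(props, "topic",
         log -> log.service(),    // record key
         log -> log.message()));  // record value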
Fault Tolerance
One of the most important features of using Kafka as a source is that it’s possible to replay data, which enables fault tolerance. If the job has a processing guarantee configured, Hazelcast periodically saves the current offsets internally and replays from the saved offsets when the job is restarted. In this mode, Hazelcast manually tracks and commits offsets, without interacting with the consumer groups feature of Kafka.
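For example, the processing guarantee is set on the job configuration when you submit the job (reusing the hz instance and pipeline p from the earlier example); the snapshot interval below is an illustrative value:
JobConfig config = new JobConfig()
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        .setSnapshotIntervalMillis(10_000); // how often offsets are snapshotted (illustrative)
hz.getJet().newJob(p, config);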
If the processing guarantee is disabled, the source starts reading from the default offsets set by the auto.offset.reset property. You can enable offset committing by assigning a group.id, enabling auto offset committing with enable.auto.commit and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for the descriptions of these properties.
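A sketch of the relevant consumer properties; the group name and commit interval are illustrative values:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");    // used when no committed offset exists
props.setProperty("group.id", "my-consumer-group");    // illustrative group name
props.setProperty("enable.auto.commit", "true");       // let the consumer commit offsets
props.setProperty("auto.commit.interval.ms", "5000");  // illustrative commit interval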
You can also explicitly specify exact initial offsets for the Kafka source using the TopicsConfig parameter. Note that initial offsets provided in TopicsConfig always take priority over offsets stored in Kafka or associated with a given consumer group. These offsets are used only when the job is started for the first time after submission; afterwards, the regular fault-tolerance mechanism described above applies. This option is not supported when the processing guarantee is disabled.
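A sketch of supplying initial offsets, reusing the consumer properties from the source example; it assumes the TopicsConfig/TopicConfig API with addPartitionInitialOffset and the KafkaSources.kafka overload that accepts a TopicsConfig, so check the Javadoc of your version for the exact signatures:
TopicsConfig topicsConfig = new TopicsConfig();
TopicsConfig.TopicConfig topicConfig = new TopicsConfig.TopicConfig("topic");
topicConfig.addPartitionInitialOffset(0, 100L); // start partition 0 at offset 100 (illustrative)
topicsConfig.addTopicConfig(topicConfig);
p.readFrom(KafkaSources.kafka(props, ConsumerRecord::value, topicsConfig))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());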
Transactional Guarantees
As a sink, the Kafka connector provides an exactly-once guarantee at the cost of using Kafka transactions: Hazelcast commits the produced records after each snapshot is completed. This greatly increases latency because consumers see the records only after they are committed.
If you use the at-least-once guarantee, records are visible immediately, but in the case of a failure some records could be duplicated. You can also configure a job in exactly-once mode and lower the guarantee just for a particular Kafka sink.
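A sketch of relaxing the guarantee for a single sink with the builder form of KafkaSinks.kafka; the exactlyOnce flag on the builder is assumed to be available in your version, props are producer properties with serializers as in the sink example, and the topic name is illustrative:
Sink<Map.Entry<String, String>> relaxedSink = KafkaSinks.<Map.Entry<String, String>>kafka(props)
        .topic("topic")
        .extractKeyFn(Map.Entry::getKey)
        .extractValueFn(Map.Entry::getValue)
        .exactlyOnce(false) // this sink uses at-least-once even if the job is exactly-once
        .build();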
Schema Registry
Kafka is often used together with Confluent Schema Registry as a repository of types. To use the schema registry, add its URL to the Properties object and use the KafkaAvroSerializer/KafkaAvroDeserializer if Avro is being used:
properties.put("value.deserializer", KafkaAvroDeserializer.class);
properties.put("specific.avro.reader", true);
properties.put("schema.registry.url", schemaRegistryUrl);
Keep in mind that once the record is deserialized, Jet still needs to know how to serialize/deserialize the record internally.
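For example, with specific.avro.reader enabled the source emits Map.Entry items whose values are instances of your generated Avro class. A sketch, where User is a hypothetical Avro-generated class and the topic name is illustrative:
p.readFrom(KafkaSources.<String, User>kafka(properties, "users"))
 .withNativeTimestamps(0)
 .map(entry -> entry.getValue().getName()) // getName() is from the hypothetical generated class
 .writeTo(Sinks.logger());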
Explicit Avro Schema
To use an Avro schema for GenericRecord, you must provide the following as parameters for the Kafka source and/or sink:
- JSON representation of the Avro schema
- HazelcastKafkaAvroSerializer as the key/value serializer class
- HazelcastKafkaAvroDeserializer as the key/value deserializer class
The connector uses the schema from the properties and disregards the schema embedded in the record. An exception is raised if the type of the record schema differs from the schema given in the properties.
The parameters can be specified as shown in the following examples.
Example source parameters:
properties.put("key.deserializer", HazelcastKafkaAvroDeserializer.class);
properties.put("value.deserializer", HazelcastKafkaAvroDeserializer.class);
properties.put("keyAvroSchema", "{\"type\":\"record\",\"name\":\"key\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
properties.put("valueAvroSchema", "{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
Example sink parameters:
properties.put("key.serializer", HazelcastKafkaAvroSerializer.class);
properties.put("value.serializer", HazelcastKafkaAvroSerializer.class);
properties.put("keyAvroSchema", "{\"type\":\"record\",\"name\":\"key\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"key\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
properties.put("valueAvroSchema", "{\"type\":\"record\",\"name\":\"value\",\"namespace\":\"schema\",\"fields\":[{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
Keep in mind that you can use different serializers/deserializers for the key and the value.
Version Compatibility
The Kafka sink and source are based on Kafka client version 2.2.0. This means the Kafka connector works with any client and broker of version 1.0.0 or later.
Heterogeneous Messages
This connector supports heterogeneous messages. For example, say you have these messages in your topic:
{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}
If you map the column petName, it will have the value null for the first entry, which doesn’t include that field. This scenario is supported. The same behavior applies to the Avro format.