Apache Kafka Connector
The Kafka connector allows you to stream, filter, and transform events between Hazelcast clusters and Kafka.
Apache Kafka is a popular distributed, persistent log store which is a great fit for stream processing systems. Data in Kafka is structured as topics and each topic consists of one or more partitions, stored in the Kafka cluster.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.
If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.
Use these options to configure the Kafka connector.
To read from Kafka as a source, the only requirements are to provide deserializers and a topic name:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
props.setProperty("auto.offset.reset", "earliest");

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(props, "topic"))
 .withNativeTimestamps(0)
 .writeTo(Sinks.logger());
The topics and partitions are distributed across the Hazelcast cluster, so that each member is responsible for reading a subset of the data.
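To make the idea of "each member reads a subset" concrete, here is a minimal sketch that spreads partition numbers over members in round-robin order. The class PartitionAssignmentSketch and its assign method are hypothetical and only illustrate the concept; the actual assignment strategy used by Hazelcast is an assumption here and may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not Hazelcast's real assignment logic.
final class PartitionAssignmentSketch {

    // Returns one list of partition numbers per member, filled round-robin.
    static List<List<Integer>> assign(int partitionCount, int memberCount) {
        if (memberCount <= 0) {
            throw new IllegalArgumentException("memberCount must be positive");
        }
        List<List<Integer>> perMember = new ArrayList<>();
        for (int m = 0; m < memberCount; m++) {
            perMember.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            // Partition p goes to member p mod memberCount.
            perMember.get(p % memberCount).add(p);
        }
        return perMember;
    }

    public static void main(String[] args) {
        // Six partitions over three members.
        System.out.println(assign(6, 3)); // prints [[0, 3], [1, 4], [2, 5]]
    }
}
```

In this sketch every partition has exactly one reader, so no record is read twice and adding members reduces the share each one handles.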
When you use Kafka as a sink, the only requirements are the serializers:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.serializer", StringSerializer.class.getCanonicalName());
props.setProperty("value.serializer", StringSerializer.class.getCanonicalName());

Pipeline p = Pipeline.create();
p.readFrom(Sources.files("home/logs"))
 .map(line -> LogParser.parse(line))
 .map(log -> entry(log.service(), log.message()))
 .writeTo(KafkaSinks.kafka(props, "topic"));
One of the most important features of using Kafka as a source is that data can be replayed, which enables fault tolerance. If the job has a processing guarantee configured, Hazelcast periodically saves the current offsets internally and replays from the saved offsets when the job is restarted. In this mode, Hazelcast tracks and commits offsets itself, without interacting with the consumer groups feature of Kafka.
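The save-and-replay behavior can be pictured with a small simulation. The class OffsetReplaySketch below is hypothetical and uses only in-memory maps; it is a sketch of the idea under the assumption that one "next offset to read" is kept per partition, not a description of Hazelcast's internal snapshot format.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of snapshotting offsets and replaying after a restart.
final class OffsetReplaySketch {
    // Next offset to read, per partition.
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    // Offsets captured by the most recent snapshot.
    private Map<Integer, Long> snapshot = new HashMap<>();

    // Marks a record as processed; the next read starts one past it.
    void recordProcessed(int partition, long offset) {
        currentOffsets.put(partition, offset + 1);
    }

    // Copies the current offsets, as a periodic snapshot would.
    void saveSnapshot() {
        snapshot = new HashMap<>(currentOffsets);
    }

    // Simulates a restart: progress made after the snapshot is discarded.
    void restoreFromSnapshot() {
        currentOffsets.clear();
        currentOffsets.putAll(snapshot);
    }

    long nextOffset(int partition) {
        return currentOffsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        OffsetReplaySketch source = new OffsetReplaySketch();
        for (long o = 0; o < 5; o++) source.recordProcessed(0, o);
        source.saveSnapshot();                 // snapshot holds next offset 5
        for (long o = 5; o < 8; o++) source.recordProcessed(0, o);
        source.restoreFromSnapshot();          // job restarts
        System.out.println(source.nextOffset(0)); // prints 5
    }
}
```

After the simulated restart, offsets 5 to 7 are read again. This is why a replayable source can recover without losing data, and why the downstream guarantee determines whether those records are seen once or more than once.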
If processing guarantees are disabled, the source starts reading from the default offsets set in the auto.offset.reset property. You can enable offset committing by assigning a group.id, enabling automatic offset commits with enable.auto.commit, and configuring auto.commit.interval.ms in the given properties. Refer to the Kafka documentation for descriptions of these properties.
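As a minimal sketch, the consumer properties for offset committing might be assembled as follows. The property keys are standard Kafka consumer settings; the class name KafkaOffsetCommitConfig, the group name hazelcast-reader, and the 5000 ms interval are illustrative assumptions, not recommended values.

```java
import java.util.Properties;

// Hypothetical helper that builds consumer properties with auto commit enabled.
final class KafkaOffsetCommitConfig {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Where to start when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");
        // Offsets are committed under this consumer group.
        props.setProperty("group.id", groupId);
        // Let the Kafka client commit offsets periodically.
        props.setProperty("enable.auto.commit", "true");
        // Commit interval in milliseconds (illustrative value).
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "hazelcast-reader");
        props.stringPropertyNames().stream().sorted()
                .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

The resulting Properties object can be passed to KafkaSources.kafka(props, "topic") in the same way as in the source example earlier in this section.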
As a sink, the Kafka connector provides exactly-once guarantees at the cost of using Kafka transactions. Hazelcast commits the produced records after each snapshot is completed. This greatly increases the latency because consumers see the records only after they are committed.
If you use the at-least-once guarantee, records are visible immediately, but some records may be duplicated after a failure. You can also configure a job in exactly-once mode and lower the guarantee for a particular Kafka sink only.
Kafka is often used together with Confluent Schema Registry as a repository of types. To use the schema registry, add its URL to the Properties object and, if you are using Avro, configure the Avro serializer or deserializer (KafkaAvroDeserializer in this example):
properties.put("value.deserializer", KafkaAvroDeserializer.class);
properties.put("specific.avro.reader", true);
properties.put("schema.registry.url", schemaRegistryUrl);
Keep in mind that once the record is deserialized, Jet still needs to know how to serialize and deserialize the record internally.
The Kafka sink and source are based on Kafka version 2.2.0. This means the Kafka connector works with any client and broker of version 1.0.0 or later.
This connector supports heterogeneous messages. For example, say you have these messages in your topic:
If you map the column petName, it has the value null for the entry with key=1. This scenario is supported. The Avro format behaves similarly.