This is a prerelease version.

Apache Kafka SQL Connector

The Apache Kafka connector supports batch and stream processing by reading from and writing to Apache Kafka topics.

Apache Kafka is schema-less, however SQL assumes a schema. We assume all messages in a topic are of the same type (with some exceptions). Kafka also supports several serialization options.

Serialization Options

To work with Kafka, you must specify the keyFormat and valueFormat options. Currently, even if you create a table mapping explicitly, we can’t resolve these. These are the supported values for keyFormat and valueFormat:

The key and value format can be different. Any options not recognized by Hazelcast are passed directly to the Kafka producer or consumer. See the examples for individual serialization options below.

Primitive serialization

If the format is one of the primitive types, then the Java class representing that type will be stored in the map. By primitive type we mean any supported SQL data type, except OBJECT.

For example, if the topic key is an Integer and the topic message is a String, use:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat'='int',
    'valueFormat'='varchar',
    'bootstrap.servers' = '127.0.0.1:9092'
)

Note that you don’t have to provide key.serializer, key.deserializer, value.serializer and value.deserializer kafka options, which are otherwise required. In this case, the following will be automatically added:

'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'

Avro Serialization

When using Avro, Hazelcast reads the fields from the GenericRecord returned by the KafkaAvroDeserializer. When inserting to a topic, we create an ad-hoc Avro schema named jet.sql from the mapping columns. Because of this, the column list is required. Hazelcast currently can’t use your custom Avro schema to create objects, but it can use it to read objects written through our ad-hoc schema, as long as the field names and types match.

Mapping Between SQL and Avro Types

SQL Type Avro Type

TINYINT, SMALLINT, INT

INT

BIGINT

LONG

REAL

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

VARCHAR and all other types

STRING

All Avro types are a union of the NULL type and the actual type.

CREATE MAPPING my_topic (
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092',
    'schema.registry.url' = 'http://127.0.0.1:8081/'
    /* more Kafka options ... */
)

In this example, the key is a plain Long number, the value is avro-serialized. keyFormat and valueFormat options are handled by Hazelcast, the rest is passed directly to Kafka producer or consumer.

We omitted the kafka serialization options, in this case io.confluent.kafka.serializers.KafkaAvroSerializer and io.confluent.kafka.serializers.KafkaAvroDeserializer are automatically added.

JSON Serialization

The value will be stored as a JSON object. Hazelcast can’t automatically determine the column list for this format, you must explicitly specify it:

CREATE MAPPING my_topic(
    __key BIGINT,
    ticker VARCHAR,
    amount INT)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json',
    'bootstrap.servers' = '127.0.0.1:9092')

There are no additional options for this format.

JSON’s type system doesn’t match SQL’s exactly. For example, JSON numbers have unlimited precision, but such numbers are typically not portable. We convert SQL integer and floating-point types into JSON numbers. We convert the DECIMAL type, as well as all temporal types, to JSON strings.

We don’t support the JSON type from the SQL standard yet. That means you can’t use functions like JSON_VALUE or JSON_QUERY. If your JSON documents don’t all have the same fields or if they contain nested objects, the usability is limited.

Java Serialization

Java serialization is the last-resort serialization option. It uses the Java objects exactly as KafkaConsumer.poll() returns them. You can use it for objects serialized using the Java serialization or any other serialization method.

For this format you must specify the class name using keyJavaClass and valueJavaClass options, for example:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.Long',
    'valueFormat' = 'java',
    'valueJavaClass' = 'com.example.Person',
    'value.serializer' = 'com.example.serialization.PersonSerializer',
    'value.deserializer' = 'com.example.serialization.PersonDeserializer',
    'bootstrap.servers' = '127.0.0.1:9092')

If the Java class corresponds to one of the basic data types (numbers, dates, strings), that type will directly be used for the key or value and mapped as a column named __key for keys and this for values. In the example above, the key will be mapped with the BIGINT type. In fact, the above keyFormat & keyJavaClass duo is equivalent to 'keyFormat'='bigint'.

If the Java class is not one of the basic types, Hazelcast will analyze the class using reflection and use its properties as column names. It recognizes public fields and JavaBean-style getters. If some property has a non-primitive type, it will be mapped under the OBJECT type.

External Column Name

You rarely need to specify the columns in DDL. If you do, you might need to specify the external name for the column.

The entries in a map naturally have key and value elements. Because of this, the format of the external name must be either __key.<name> for a field in the key or this.<name> for a field in the value.

The external name defaults to this.<columnName>, so normally you only need to specify it for key fields. There are also columns that represent the entire key and value objects, called __key and this.

Heterogeneous Messages

For example, let’s say you have these messages in your topic:

{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}

If you map the column petName, it will have the value null for the entry with key=1. This scenario is supported. Similar behavior works with Avro format.

Installation

You need the hazelcast-jet-kafka module on your classpath. For Gradle or Maven, make sure to add the dependency:

  • Gradle

  • Maven

compile 'com.hazelcast.jet:hazelcast-jet-kafka:5.0-BETA-1'
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-kafka</artifactId>
    <version>5.0-BETA-1</version>
</dependency>

If you’re using the distribution package make sure use the full one, because the hazelcast-jet-kafka-5.0-BETA-1.jar you need is not contained in the slim one.

The hazelcast-jet-kafka module is not supported on Solaris operating systems.