Mapping to Kafka

Before you can query streaming messages in Kafka topics, you need to create a mapping with the Kafka connector so that the SQL service knows how to access messages in the most efficient way.

What is the Kafka Connector

The Kafka connector acts as a consumer or a producer so that you can create streaming queries that continuously query a given Kafka topic.

Installing the Connector

This connector is included in the full distribution of Hazelcast.

If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.

The Kafka connector is compatible with Kafka version 1.0.0 and later.

Kafka Security

If you use Hazelcast Enterprise, your clients may need permissions to use this connector. For details, see Securing Jobs.

Creating a Kafka Mapping

To create a mapping to a Kafka topic in SQL, you must use the CREATE MAPPING statement to tell the Kafka connector the following:

  • The name of the topic

  • The address of the Kafka broker

  • How to serialize/deserialize the keys and values in messages

CREATE MAPPING my_topic (1)
TYPE Kafka (2)
OPTIONS (
    'bootstrap.servers' = '127.0.0.1:9092' (3)
);
1 The name of the Kafka topic.
2 The name of the connector.
3 The address of the Kafka broker.

Any key/value pairs in the OPTIONS() function that Hazelcast does not recognize are passed directly to the underlying Kafka consumer or producer.
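
As an illustration of this pass-through, the sketch below sets auto.offset.reset, a standard Kafka consumer property, next to the connector's own options. The keyFormat and valueFormat values are assumptions chosen only to make the example complete.

```sql
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',          -- assumed for this example
    'valueFormat' = 'varchar',    -- assumed for this example
    'bootstrap.servers' = '127.0.0.1:9092',
    -- Not a Hazelcast option, so it is handed on unchanged to the Kafka client.
    -- With 'earliest', a consumer without a stored position typically starts
    -- reading from the oldest available message.
    'auto.offset.reset' = 'earliest'
);
```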

When creating a Kafka mapping, you must tell the Kafka connector how to serialize and deserialize the keys and values in Kafka messages. To read Kafka messages, the connector must be able to deserialize them. Similarly, to publish messages to Kafka topics, the connector must be able to serialize them. You specify how to serialize and deserialize keys and values with the keyFormat and valueFormat fields in the OPTIONS() function.

The Kafka connector supports serializers for the following formats:

Primitive Messages

For keys and values that are primitives, set the format as the SQL data type that corresponds to the primitive. For example, if the message key is an integer and the value is a string, use the int and varchar formats:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat'='int',
    'valueFormat'='varchar',
    'bootstrap.servers' = '127.0.0.1:9092'
);

The Kafka connector will apply a suitable serializer/deserializer automatically. For example:

'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'

For a conversion table, see SQL Data Types.
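
Once such a mapping exists, reading and writing might look like the following sketch. It assumes the my_topic mapping with the int and varchar formats shown above, in which the key is exposed as the __key column and the value as the this column.

```sql
-- Publish one message: the key is serialized as an integer, the value as a string.
INSERT INTO my_topic (__key, this) VALUES (1, 'hello');

-- Streaming query: it keeps running and emits a row for each message it reads.
SELECT __key, this FROM my_topic;
```

Because the topic is an unbounded source, the SELECT does not finish on its own; cancel it from the client when you are done.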

Avro Messages

For keys or values that are in the Avro format, you need to configure different options, depending on whether you have your own Avro schema.

If you don’t already have an Avro schema, map column names to the fields and types that you want to use in your schema. The Kafka connector uses these column names to create its own ad-hoc schema named jet.sql.

CREATE MAPPING my_topic (
    (1)
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092'
);
1 Column names

If you already have an Avro schema, you must:

  • Map column names to the fields and types of your schema.

  • Provide the URL of your schema registry in the schema.registry.url field.

CREATE MAPPING my_topic (
    (1)
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092',
    'schema.registry.url' = 'http://127.0.0.1:8081/' (2)
);
1 Column names
2 Schema registry

When you write new Avro objects to the Kafka topic, the Kafka connector uses its own ad-hoc schema named jet.sql, which it creates from the column names that you mapped in the CREATE MAPPING statement. Your schema registry then receives entries that contain an ID for the jet.sql schema.

Table 1. Avro type conversion

SQL Type                       Avro Type
TINYINT, SMALLINT, INT         INT
BIGINT                         LONG
REAL                           FLOAT
DOUBLE                         DOUBLE
BOOLEAN                        BOOLEAN
VARCHAR and all other types    STRING

All Avro types are a union of the NULL type and the actual type.
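
To make the conversion concrete, here is a minimal sketch that writes one row through the Avro mapping defined above and reads it back. The key and the values are made-up sample data.

```sql
-- Per the conversion table: ticker (VARCHAR) is written as an Avro STRING,
-- amount (BIGINT) as a LONG, and price (DECIMAL) as a STRING,
-- because DECIMAL falls under "all other types".
INSERT INTO my_topic (__key, ticker, amount, price)
VALUES ('trade-1', 'ABCD', 100, 5.5);

-- Read the mapped columns back as a streaming query.
SELECT ticker, amount, price
FROM my_topic
WHERE amount > 50;
```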

JSON Messages

If values are in the JSON format, configure the valueFormat field as the json-flat type, and map column names to the JSON keys.

CREATE MAPPING my_topic(
    (1)
    __key BIGINT,
    ticker VARCHAR,
    amount INT)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092');
1 Column list

There are no additional options for this format.

JSON’s type system doesn’t match SQL’s exactly. For example, JSON numbers have unlimited precision, but such numbers are typically not portable. We convert SQL integer and floating-point types into JSON numbers. We convert the DECIMAL type, as well as all temporal types, to JSON strings.

We don’t support the JSON type from the SQL standard yet. That means you can’t use functions like JSON_VALUE or JSON_QUERY. If your JSON documents don’t all have the same fields or if they contain nested objects, the usability is limited.
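
As a rough sketch of how the json-flat mapping above behaves, the statements below write one message and then filter on a top-level JSON key. The sample values are illustrative, and the JSON shown in the comment is the expected shape, not captured output.

```sql
-- Writes a message with key 1 and a value shaped roughly like
-- {"ticker":"ABCD","amount":100}; both columns become JSON fields.
INSERT INTO my_topic (__key, ticker, amount) VALUES (1, 'ABCD', 100);

-- Only top-level keys that are mapped as columns can be queried;
-- nested objects are not reachable through json-flat.
SELECT __key, ticker, amount
FROM my_topic
WHERE amount >= 100;
```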

Java Messages

The java format uses Java objects exactly as the KafkaConsumer.poll() method returns them. You can use this format for objects serialized with Java serialization or any other serialization method.

For this format you must also specify the class name using keyJavaClass and valueJavaClass options, for example:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.Long',
    'valueFormat' = 'java',
    'valueJavaClass' = 'com.example.Person',
    'value.serializer' = 'com.example.serialization.PersonSerializer',
    'value.deserializer' = 'com.example.serialization.PersonDeserializer',
    'bootstrap.servers' = '127.0.0.1:9092');

If the Java class corresponds to one of the basic data types (numbers, dates, strings), that type will be used for the key or value and mapped as a column named __key for keys and this for values. In the example above, the key will be mapped with the BIGINT type. In fact, the above keyFormat and keyJavaClass options are equivalent to 'keyFormat'='bigint'.

If the Java class is not one of the basic types, Hazelcast will analyze the class using reflection and use its properties as column names. It recognizes public fields and JavaBean-style getters. If some property has a non-primitive type, it will be mapped under the SQL OBJECT type.
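
For example, if the com.example.Person class exposed getName() and getAge() getters (a hypothetical shape; the class itself is not shown in this section), the reflected properties could be queried as ordinary columns:

```sql
-- Assumes com.example.Person has JavaBean getters getName() and getAge(),
-- which reflection would surface as the columns name and age.
-- __key is BIGINT because keyJavaClass is java.lang.Long.
SELECT __key, name, age
FROM my_topic
WHERE age >= 18;
```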

Mapping Column Names

For JSON and Avro formats, you must specify the column names in the mapping.

Each column maps to an external name, which must have the format __key.<name> for a field in the key or this.<name> for a field in the value.

If you don't specify one, the external name defaults to this.<columnName>.
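
The sketch below shows one way these names could be spelled out with the EXTERNAL NAME clause of CREATE MAPPING. The field names and the choice of json-flat for both key and value are assumptions made for illustration.

```sql
CREATE MAPPING my_topic (
    -- Reads the id field of the message key.
    id BIGINT EXTERNAL NAME "__key.id",
    -- No external name given, so this column resolves to this.ticker.
    ticker VARCHAR,
    -- Same as the default, written out explicitly.
    amount INT EXTERNAL NAME "this.amount"
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'json-flat',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);
```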

Heterogeneous Messages

The Kafka connector supports heterogeneous messages. For example, say you have these messages in your topic:

{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}

If you map the column petName, it has the value null for the first message (Alice), which has no petName field. This scenario is supported, and the Avro format behaves in a similar way.
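
A mapping for the two messages above might look like the following sketch. The integer key format is an assumption, since the example messages show only their values.

```sql
CREATE MAPPING my_topic (
    __key INT,          -- assumed key type
    name VARCHAR,
    age INT,
    petName VARCHAR     -- present in only some messages
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- Rows expected from the behavior described above:
--   Alice | 42 | NULL
--   Bob   | 43 | Zaz
SELECT name, age, petName FROM my_topic;
```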