This is a prerelease version.

Mapping to Kafka

To query streams in Kafka topics, you need to create a mapping with the Kafka connector so that the SQL service knows how to access the messages.

What is the Kafka Connector

The Kafka connector acts as a Kafka consumer or producer, so you can run streaming queries that continuously read from a Kafka topic and also write to a Kafka topic.

Installing the Connector

This connector is included in the full distribution of Hazelcast.

If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.

The Kafka connector is compatible with Kafka 1.0.0 and later.

Permissions

Enterprise Edition

If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.

Creating a Kafka Mapping

To create a mapping to a Kafka topic in SQL, you must use the CREATE MAPPING statement to tell the Kafka connector the following:

  • The name of the topic

  • The address of the Kafka broker

  • Client authentication details

  • How to serialize/deserialize the keys and values in messages

  • The preferred number of parallel consumer processors on each member for input mappings

CREATE MAPPING my_topic -- (1)
TYPE Kafka -- (2)
OPTIONS (
    'bootstrap.servers' = '127.0.0.1:9092', -- (3)
    'security.protocol' = 'SASL_SSL',
    'sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule
    required username="<my_api_key>"
    password="<my_api_secret>";',
    'sasl.mechanism' = 'PLAIN', -- (4)
    'preferredLocalParallelism' = '2' -- (5)
);

(1) The name of the Kafka topic.
(2) The name of the connector.
(3) The address of the Kafka broker.
(4) The SASL configuration used to authenticate client connections, where the API key provides the username and the API secret key provides the password. Note that the clause must end with a semi-colon, as shown in the example above.
(5) The preferred number of parallel consumer processors on each member for input mappings. If not specified, the default is 4. For output mappings, the parallelism is always 1 and this option is ignored.

Any key/value pairs in the OPTIONS() function that are not recognized by Hazelcast are passed directly to the Kafka consumer/producer.
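
As an illustration of this pass-through, the following sketch adds the standard Kafka consumer property auto.offset.reset to a mapping. It assumes that Hazelcast does not treat this key as one of its own options, in which case it would be handed to the underlying Kafka consumer unchanged. The topic name, formats, and broker address are placeholders.

```sql
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092',
    -- Assumed to be unrecognized by Hazelcast and therefore forwarded
    -- as-is to the Kafka consumer (asks it to start from the earliest
    -- available offset).
    'auto.offset.reset' = 'earliest'
);
```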

Each member of the Hazelcast cluster runs a processor with a Kafka connector for the mapping. For input mappings, the preferred local parallelism is the number of parallel consumer processors that you would like on each member. For balanced distributed processing, make sure that the local parallelism multiplied by the number of members in the cluster is a divisor of the total number of partitions in the input Kafka topic. Otherwise, some processors are assigned more partitions than others. If the cluster has more processors than the input topic has partitions, the extra processors stay idle and waste resources.
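
To make the arithmetic concrete, here is a sketch for a hypothetical cluster of 3 members that reads a topic with 12 partitions. Both numbers are assumptions chosen for illustration, as are the formats and the broker address.

```sql
-- Assumed setup: 3 Hazelcast members, input topic with 12 partitions.
-- Total consumer processors = 3 members x 2 per member = 6.
-- 6 is a divisor of 12, so each processor reads 12 / 6 = 2 partitions.
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092',
    'preferredLocalParallelism' = '2'
);
```

In the same hypothetical cluster, a value of 5 would give 15 processors for 12 partitions, so 3 processors would have nothing to read.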

When creating a Kafka mapping, you must tell the Kafka connector how to serialize/deserialize the keys and values in Kafka messages. To read Kafka messages, the Kafka connector must be able to deserialize them. Similarly, to publish messages to Kafka topics, the Kafka connector must be able to serialize them. You can tell Hazelcast how to serialize/deserialize keys and values by specifying the keyFormat and valueFormat fields in the OPTIONS() function.

The Kafka connector supports serializers for the following formats: primitive, Avro, JSON, and Java.

Primitive Messages

For keys and values that are primitives, set the format as the SQL data type that corresponds to the primitive. For example, if the message key is an integer and the value is a string, use the int and varchar formats:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat'='int',
    'valueFormat'='varchar',
    'bootstrap.servers' = '127.0.0.1:9092'
);

The Kafka connector will apply a suitable serializer/deserializer automatically. For example:

'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'

For a conversion table, see SQL Data Types.
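
Once such a mapping exists, you can read from and write to the topic with ordinary SQL. The sketch below assumes the int/varchar mapping shown above, and that the key and value are exposed as the __key and this columns because no columns were declared.

```sql
-- Continuously read messages from the topic (a streaming query).
SELECT __key, this FROM my_topic;

-- Publish one message with an INT key and a VARCHAR value.
INSERT INTO my_topic (__key, this) VALUES (1, 'hello');
```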

Avro Messages

For keys or values that are in the Avro format, you need to configure different options, depending on whether you have your own Avro schema.

If you don’t already have an Avro schema, map column names to the fields and types that you want to use in your schema. The Kafka connector uses these column names to create its own ad-hoc schema named jet.sql.

CREATE MAPPING my_topic (
    -- (1)
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092'
);

(1) Column names

If you already have an Avro schema, you must:

  • Map column names to the fields and types of your schema.

  • Provide the URL of your schema registry in the schema.registry.url field.

CREATE MAPPING my_topic (
    -- (1)
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092',
    'schema.registry.url' = 'http://127.0.0.1:8081/' -- (2)
);

(1) Column names
(2) Schema registry

Your schema registry will receive entries that contain an ID for the jet.sql schema. When you write new Avro objects to the Kafka topic, the Kafka connector uses its own ad-hoc schema named jet.sql. The Kafka connector creates this schema from the column names that you mapped in the CREATE MAPPING statement.

Table 1. Avro type conversion

SQL Type                       Avro Type
TINYINT, SMALLINT, INT         INT
BIGINT                         LONG
REAL                           FLOAT
DOUBLE                         DOUBLE
BOOLEAN                        BOOLEAN
VARCHAR and all other types    STRING

All Avro types are a union of the NULL type and the actual type.
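
As a sketch of how the conversion table applies, the statement below writes one record through the Avro mapping defined earlier in this section. The inserted values are made up for illustration, and the comments simply restate the table for the columns of that mapping.

```sql
-- Assumes the Avro mapping above: the key uses the varchar format,
-- and the value is written with the ad-hoc Avro schema.
-- Per Table 1, the value fields would be typed as:
--   ticker (VARCHAR) -> union of NULL and STRING
--   amount (BIGINT)  -> union of NULL and LONG
--   price  (DECIMAL) -> union of NULL and STRING ("all other types")
INSERT INTO my_topic (__key, ticker, amount, price)
VALUES ('trade-1', 'ABCD', 10, 5.5);
```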

JSON Messages

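If values are in the JSON format, set the valueFormat field to json or json-flat. With json, the whole value is mapped to a single column of the JSON type. With json-flat, the top-level fields of the JSON object are mapped to the columns that you declare.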

JSON

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json',
    'bootstrap.servers' = '127.0.0.1:9092'
);

JSON-FLAT

CREATE MAPPING my_topic (
    __key BIGINT,
    ticker VARCHAR,
    amount INT
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);
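
The two formats are queried differently. The sketch below assumes that, with json, the whole value is exposed as the this column, and that your Hazelcast version provides the JSON_VALUE function. The ticker field in the JSON payload is also an assumption.

```sql
-- Against the json mapping: extract a field from the JSON value.
SELECT __key, JSON_VALUE(this, '$.ticker') AS ticker
FROM my_topic;

-- Against the json-flat mapping: fields are ordinary columns.
SELECT __key, ticker, amount
FROM my_topic;
```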

Java Messages

The java format uses the Java objects exactly as the KafkaConsumer.poll() method returns them. You can use this format for objects serialized with Java serialization or with any other serialization method.

For this format, you must also specify the class names in the keyJavaClass and valueJavaClass options. For example:

CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.Long',
    'valueFormat' = 'java',
    'valueJavaClass' = 'com.example.Person',
    'value.serializer' = 'com.example.serialization.PersonSerializer',
    'value.deserializer' = 'com.example.serialization.PersonDeserializer',
    'bootstrap.servers' = '127.0.0.1:9092');

If the Java class corresponds to one of the basic data types (numbers, dates, strings), that type will be used for the key or value and mapped as a column named __key for keys and this for values. In the example above, the key will be mapped with the BIGINT type. In fact, the above keyFormat and keyJavaClass options are equivalent to 'keyFormat'='bigint'.

If the Java class is not one of the basic types, Hazelcast analyzes the class using reflection and uses its properties as column names. It recognizes public fields and JavaBean-style getters. A property with a non-primitive type is mapped to the SQL OBJECT type.
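
For example, suppose that com.example.Person exposes the JavaBean getters getName() and getAge(). This class shape is hypothetical, since the class itself is not shown here. Under that assumption, the mapping above could be queried as follows.

```sql
-- Assumes com.example.Person has getters getName() and getAge()
-- (hypothetical), which would surface as the columns name and age.
-- The key class java.lang.Long is a basic type, so it appears as __key.
SELECT __key, name, age
FROM my_topic
WHERE age >= 18;
```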

Mapping Column Names

For the json-flat and avro formats, you must specify the column names in the mapping.

The external name of a column must be either __key.<name> for a field in the key or this.<name> for a field in the value.

If you do not specify an external name, it defaults to this.<columnName>, so you normally need to set it only for fields in the key.
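
A minimal sketch of external names, assuming the EXTERNAL NAME clause of CREATE MAPPING and a topic whose keys and values are both flat JSON objects. The field names id, ticker, and amount are hypothetical.

```sql
CREATE MAPPING my_topic (
    -- Field "id" of the message key.
    id BIGINT EXTERNAL NAME "__key.id",
    -- No external name given, so this resolves to this.ticker.
    ticker VARCHAR,
    -- Explicit form of the default: field "amount" of the message value.
    amount INT EXTERNAL NAME "this.amount"
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'json-flat',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);
```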

Heterogeneous Messages

The Kafka connector supports heterogeneous messages. For example, say you have these messages in your topic:

{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}

If you map the column petName, it has the value null for the first message. This scenario is supported. The Avro format behaves the same way.
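
A sketch of a mapping for the two messages above, assuming that they are message values in the json-flat format and that the broker address is a placeholder.

```sql
CREATE MAPPING my_topic (
    name VARCHAR,
    age INT,
    petName VARCHAR
)
TYPE Kafka
OPTIONS (
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- petName is NULL for Alice's message, which has no such field.
SELECT name, age, petName
FROM my_topic;
```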