Mapping to Kafka
To query streams in Kafka topics, you need to create a mapping with the Kafka connector so that the SQL service knows how to access the messages.
What is the Kafka Connector
The Kafka connector acts as a Kafka consumer or producer, so you can run streaming queries that continuously read from a Kafka topic and you can also write to a Kafka topic.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.
The Kafka connector is compatible with Kafka 1.0.0 and later.
Permissions
Enterprise
If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.
Creating a Kafka Mapping
To create a mapping to a Kafka topic in SQL, you must use the CREATE MAPPING statement to tell the Kafka connector the following:
- The name of the topic
- The address of the Kafka broker
- How to serialize/deserialize the keys and values in messages
CREATE MAPPING my_topic -- (1)
TYPE Kafka -- (2)
OPTIONS (
'bootstrap.servers' = '127.0.0.1:9092' -- (3)
);
(1) The name of the Kafka topic.
(2) The name of the connector.
(3) The address of the Kafka broker.
Any key/value pairs in the OPTIONS() function that are not recognized by Hazelcast are passed directly to the Kafka consumer/producer.
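For instance, a Kafka client property can sit next to the connector's own options. The following is a sketch only: auto.offset.reset is a standard Kafka consumer property, and both the value chosen here and the int/varchar formats are assumptions made for illustration.

```sql
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092',
    -- Not a Hazelcast option: handed to the Kafka consumer unchanged.
    -- Asks the consumer to start from the earliest offset when it has
    -- no stored position for the topic.
    'auto.offset.reset' = 'earliest'
);
```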
When creating a Kafka mapping, you must tell the Kafka connector how to serialize/deserialize the keys and values in Kafka messages.
To read Kafka messages, the connector must be able to deserialize them; to publish messages to Kafka topics, it must be able to serialize them. You specify how keys and values are serialized/deserialized with the keyFormat and valueFormat fields in the OPTIONS() function.
The Kafka connector supports serializers for the following formats:
Primitive Messages
For keys and values that are primitives, set the format to the SQL data type that corresponds to the primitive. For example, if the message key is an integer and the value is a string, use the int and varchar formats:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat'='int',
'valueFormat'='varchar',
'bootstrap.servers' = '127.0.0.1:9092'
);
The Kafka connector applies a suitable serializer/deserializer automatically. For example, for the int and varchar formats above:
'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'
For a conversion table, see SQL Data Types.
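As a rough sketch of how the primitive mapping above can be used, assume the key is exposed as a column named __key and the value as a column named this (the names this page gives for basic types). A streaming read and a write might then look like the following. The literal values are made up for illustration.

```sql
-- Continuously read messages from the topic; the query keeps running
-- and emits a row for each new message.
SELECT __key, this
FROM my_topic;

-- Publish one message with an integer key and a string value.
INSERT INTO my_topic (__key, this)
VALUES (1, 'hello');
```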
Avro Messages
For keys or values that are in the Avro format, you need to configure different options, depending on whether you have your own Avro schema.
If you don’t already have an Avro schema, map column names to the fields and types that you want to use in your schema. The Kafka connector uses these column names to create its own ad-hoc schema named jet.sql.
CREATE MAPPING my_topic ( -- (1)
__key VARCHAR,
ticker VARCHAR,
amount BIGINT,
price DECIMAL
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'avro',
'bootstrap.servers' = '127.0.0.1:9092'
);
(1) Column names
If you already have an Avro schema, you must:
- Map column names to the fields and types of your schema.
- Provide the URL of your schema registry in the schema.registry.url field.
CREATE MAPPING my_topic ( -- (1)
__key VARCHAR,
ticker VARCHAR,
amount BIGINT,
price DECIMAL
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'avro',
'bootstrap.servers' = '127.0.0.1:9092',
'schema.registry.url' = 'http://127.0.0.1:8081/' -- (2)
);
(1) Column names
(2) Schema registry
Your schema registry will receive entries that contain an ID for the jet.sql schema.
When you write new Avro objects to the Kafka topic, the Kafka connector uses its own ad-hoc schema named jet.sql. The Kafka connector creates this schema from the column names that you mapped in the CREATE MAPPING statement.
[Table: SQL Type to Avro Type mapping]
All Avro types are a union of the NULL type and the actual type.
JSON Messages
If values are in the JSON format, configure the valueFormat field as json or json-flat.
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat' = 'bigint',
'valueFormat' = 'json',
'bootstrap.servers' = '127.0.0.1:9092');
CREATE MAPPING my_topic(
__key BIGINT,
ticker VARCHAR,
amount INT)
TYPE Kafka
OPTIONS (
'keyFormat' = 'bigint',
'valueFormat' = 'json-flat',
'bootstrap.servers' = '127.0.0.1:9092');
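To illustrate how the two mappings above might be queried, the sketch below assumes that with json the whole value is exposed as a single JSON column named this, and that with json-flat each mapped top-level field is its own column. Both assumptions, and the use of the JSON_VALUE function, go beyond what this page states, so check them against your Hazelcast version.

```sql
-- Against the 'json' mapping: extract a field from the JSON document.
SELECT JSON_VALUE(this, '$.ticker') AS ticker
FROM my_topic;

-- Against the 'json-flat' mapping: fields are ordinary columns.
SELECT ticker, amount
FROM my_topic
WHERE amount > 100;
```

Because both example mappings use the name my_topic, only one of them can exist at a time; each query applies to the mapping named in its comment.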
Java Messages
The java format uses Java objects exactly as the KafkaConsumer.poll() method returns them. You can use this format for objects serialized using Java serialization or any other serialization method.
For this format, you must also specify the class names using the keyJavaClass and valueJavaClass options, for example:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat' = 'java',
'keyJavaClass' = 'java.lang.Long',
'valueFormat' = 'java',
'valueJavaClass' = 'com.example.Person',
'value.serializer' = 'com.example.serialization.PersonSerializer',
'value.deserializer' = 'com.example.serialization.PersonDeserializer',
'bootstrap.servers' = '127.0.0.1:9092');
If the Java class corresponds to one of the basic data types (numbers, dates, strings), that type will be used for the key or value and mapped as a column named __key for keys and this for values. In the example above, the key will be mapped with the BIGINT type. In fact, the above keyFormat and keyJavaClass options are equivalent to 'keyFormat'='bigint'.
If the Java class is not one of the basic types, Hazelcast will analyze the class using reflection and use its properties as column names. It recognizes public fields and JavaBean-style getters. If some property has a non-primitive type, it will be mapped under the SQL OBJECT type.
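As a sketch of the reflection-based mapping, suppose the com.example.Person class from the example above exposes getName() and getAge() getters. The page does not show the class, so these two properties are assumptions. The resulting columns could then be queried directly:

```sql
-- name and age are assumed properties of com.example.Person,
-- discovered through its JavaBean-style getters.
-- __key is the java.lang.Long key, mapped as BIGINT.
SELECT __key, name, age
FROM my_topic
WHERE age >= 18;
```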
Heterogeneous Messages
The Kafka connector supports heterogeneous messages. For example, say you have these messages in your topic:
{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}
If you map the column petName, it will have the value null for the first message. This scenario is supported. The Avro format behaves similarly.
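A mapping for the two example messages might look like the following sketch. The mapping (and topic) name people, the BIGINT key, and the column types are assumptions chosen for illustration.

```sql
CREATE MAPPING people (
    __key BIGINT,
    name VARCHAR,
    age INT,
    petName VARCHAR
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- Messages without a petName field yield NULL in that column,
-- so this filter matches Alice's message but not Bob's.
SELECT name, age
FROM people
WHERE petName IS NULL;
```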