Mapping to Kafka
To query streams in Kafka topics, you need to create a mapping with the Kafka connector so that the SQL service knows how to access the messages.
What is the Kafka Connector
The Kafka connector acts as a consumer or a producer, so you can run streaming queries that continuously read from a Kafka topic and also write to one.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.
The Kafka connector is compatible with Kafka 1.0.0 and later.
Permissions
Enterprise
If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.
Creating a Kafka Mapping
To create a mapping to a Kafka topic in SQL, you must use the CREATE MAPPING statement to tell the Kafka connector the following:
- The name of the topic
- The address of the Kafka broker
- Client authentication details
- How to serialize/deserialize the keys and values in messages
- The preferred number of parallel consumer processors on each member for input mappings
CREATE MAPPING my_topic (1)
TYPE Kafka (2)
OPTIONS (
'bootstrap.servers' = '127.0.0.1:9092', (3)
'security.protocol' = 'SASL_SSL',
'sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule
required username="<my_api_key>"
password="<my_api_secret>";',
'sasl.mechanism' = 'PLAIN', (4)
'preferredLocalParallelism' = '2' (5)
);
(1) The name of the Kafka topic.
(2) The name of the connector.
(3) The address of the Kafka broker.
(4) The SASL configuration used to authenticate client connections, where the API key provides the username and the API secret provides the password. The sasl.jaas.config value must end with a semicolon, as shown in the example above.
(5) The preferred number of parallel consumer processors on each member for input mappings. If not specified, the default is 4. For output mappings, the parallelism is always 1 and this option is ignored.
Any key/value pairs in the OPTIONS() function that are not recognized by Hazelcast are passed directly to the Kafka consumer/producer.
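As a sketch of this pass-through, the mapping below adds auto.offset.reset, a standard Kafka consumer property rather than a Hazelcast option, so it would be handed to the Kafka consumer unchanged. The topic name and broker address reuse the placeholder values from the example above; the key and value formats are assumptions chosen to keep the example short.

```sql
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092',
    -- Not a Hazelcast option: forwarded as-is to the Kafka consumer.
    'auto.offset.reset' = 'earliest'
);
```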
Each member of the Hazelcast cluster has a processor running a Kafka connector for the mapping. For input mappings, preferred local parallelism is the number of parallel consumer processors you would prefer on each member. For balanced distributed processing, ensure that the local parallelism multiplied by the number of members in the cluster is a divisor of the total number of partitions in the input Kafka topic. If this is not done, some processors will be assigned more input Kafka topic partitions than others. Where there are more processors in the Hazelcast cluster than partitions in the input Kafka topic, the additional processors will be idle and waste resources.
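To make the arithmetic concrete, here is a sketch for a hypothetical cluster of 3 members reading a topic with 12 partitions (both numbers are assumptions for illustration). A local parallelism of 2 gives 3 x 2 = 6 consumer processors, and 6 divides 12, so each processor would be assigned 2 partitions.

```sql
-- Assumed setup: 3 members, input topic with 12 partitions.
-- 2 processors per member x 3 members = 6 processors; 12 / 6 = 2 partitions each.
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092',
    'preferredLocalParallelism' = '2'
);
```

With the default of 4, the same cluster would run 12 processors, one per partition, which is also balanced. A value of 5 would give 15 processors for 12 partitions, leaving 3 of them idle.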
When creating a Kafka mapping, you must tell the Kafka connector how to serialize/deserialize the keys and values in Kafka messages.
To read Kafka messages, the Kafka connector must be able to deserialize them. Similarly, to publish messages to Kafka topics, the Kafka connector must be able to serialize them. You can tell Hazelcast how to serialize/deserialize keys and values by specifying the keyFormat and valueFormat fields in the OPTIONS() function.
The Kafka connector supports serializers for the following formats:
Primitive Messages
For keys and values that are primitives, set the format as the SQL data type that corresponds to the primitive. For example, if the message key is an integer and the value is a string, use the int and varchar formats:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat'='int',
'valueFormat'='varchar',
'bootstrap.servers' = '127.0.0.1:9092'
);
The Kafka connector will apply a suitable serializer/deserializer automatically. For example:
'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'
For a conversion table, see SQL Data Types.
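With no column list, a mapping that uses primitive formats would expose the key and the value as the __key and this columns (the same names described under Java Messages). The following is a sketch of writing and then reading one message, assuming the my_topic mapping above already exists; the values are made up.

```sql
-- Publish one message: integer key, string value.
INSERT INTO my_topic (__key, this) VALUES (1, 'hello');

-- Streaming query: keeps running and emits a row for each message it reads.
SELECT __key, this FROM my_topic;
```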
Avro Messages
For keys or values that are in the Avro format, you need to configure different options, depending on whether you have your own Avro schema.
If you don’t already have an Avro schema, map column names to the fields and types that you want to use in your schema. The Kafka connector uses these column names to create its own ad-hoc schema named jet.sql.
CREATE MAPPING my_topic (
(1)
__key VARCHAR,
ticker VARCHAR,
amount BIGINT,
price DECIMAL
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'avro',
'bootstrap.servers' = '127.0.0.1:9092'
);
(1) Column names
If you already have an Avro schema, you must:
- Map column names to the fields and types of your schema.
- Provide the URL of your schema registry in the schema.registry.url field.
CREATE MAPPING my_topic (
(1)
__key VARCHAR,
ticker VARCHAR,
amount BIGINT,
price DECIMAL
)
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'avro',
'bootstrap.servers' = '127.0.0.1:9092',
'schema.registry.url' = 'http://127.0.0.1:8081/' (2)
);
(1) Column names
(2) Schema registry
Your schema registry will receive entries that contain an ID for the jet.sql schema.
When you write new Avro objects to the Kafka topic, the Kafka connector uses its own ad-hoc schema named jet.sql. The Kafka connector creates this schema from the column names that you mapped in the CREATE MAPPING statement.
SQL Type | Avro Type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
All Avro types are a union of the NULL type and the actual type.
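As a sketch of writing an Avro message, the statement below assumes the my_topic mapping with the __key, ticker, amount, and price columns from the examples above; the values are invented. The connector would serialize the value columns into an Avro record that follows the jet.sql schema.

```sql
-- Key is a VARCHAR; ticker, amount, and price become fields of the Avro value.
INSERT INTO my_topic (__key, ticker, amount, price)
VALUES ('order-1', 'ABCD', 100, 25.50);
```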
JSON Messages
If values are in the JSON format, configure the valueFormat field as json or json-flat.
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat' = 'bigint',
'valueFormat' = 'json',
'bootstrap.servers' = '127.0.0.1:9092');
CREATE MAPPING my_topic(
__key BIGINT,
ticker VARCHAR,
amount INT)
TYPE Kafka
OPTIONS (
'keyFormat' = 'bigint',
'valueFormat' = 'json-flat',
'bootstrap.servers' = '127.0.0.1:9092');
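The two formats are queried differently. With json-flat, the fields you declare are ordinary columns. With json, the whole value is a single JSON column; the sketch below assumes that column is named this and that the JSON_VALUE function is available in your Hazelcast version, so treat both as assumptions to verify. Each query applies to the corresponding mapping above.

```sql
-- json-flat mapping: declared fields can be used directly as columns.
SELECT ticker, amount FROM my_topic WHERE amount > 100;

-- json mapping: extract a field from the JSON value (column name assumed to be this).
SELECT JSON_VALUE(this, '$.ticker') AS ticker FROM my_topic;
```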
Java Messages
Java serialization uses the Java objects exactly as the KafkaConsumer.poll() method returns them. You can use this format for objects serialized using Java serialization or any other serialization method.
For this format you must also specify the class name using keyJavaClass and valueJavaClass options, for example:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat' = 'java',
'keyJavaClass' = 'java.lang.Long',
'valueFormat' = 'java',
'valueJavaClass' = 'com.example.Person',
'value.serializer' = 'com.example.serialization.PersonSerializer',
'value.deserializer' = 'com.example.serialization.PersonDeserializer',
'bootstrap.servers' = '127.0.0.1:9092');
If the Java class corresponds to one of the basic data types (numbers, dates, strings), that type will be used for the key or value and mapped as a column named __key for keys and this for values. In the example above, the key will be mapped with the BIGINT type. In fact, the above keyFormat and keyJavaClass options are equivalent to 'keyFormat'='bigint'.
If the Java class is not one of the basic types, Hazelcast will analyze the class using reflection and use its properties as column names. It recognizes public fields and JavaBean-style getters. If some property has a non-primitive type, it will be mapped under the SQL OBJECT type.
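For illustration, suppose the com.example.Person class from the example has getName() and getAge() getters. This is a hypothetical shape; the class itself is not shown here. Reflection would then yield name and age columns, and a query could look like this sketch:

```sql
-- Assumes Person exposes getName() and getAge(); both getters are hypothetical.
SELECT __key, name, age FROM my_topic WHERE age >= 18;
```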
Heterogeneous Messages
The Kafka connector supports heterogeneous messages. For example, say you have these messages in your topic:
{"name":"Alice","age":42}
{"name":"Bob","age":43,"petName":"Zaz"}
If you map the column petName, it will have the value null for the first entry. This scenario is supported. The Avro format behaves the same way.
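A minimal sketch of such a mapping for the two messages above, assuming the json-flat format and the placeholder broker address used elsewhere on this page. No key columns are mapped, because the sample messages do not show keys.

```sql
CREATE MAPPING my_topic (
    name VARCHAR,
    age INT,
    petName VARCHAR
)
TYPE Kafka
OPTIONS (
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- Alice's row would have petName = NULL; Bob's row would have 'Zaz'.
SELECT name, age, petName FROM my_topic;
```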