Mapping to Kafka
Before you can query streaming messages in Kafka topics, you need to create a mapping with the Kafka connector so that the SQL service knows how to access messages in the most efficient way.
What is the Kafka Connector
The Kafka connector acts as a consumer or a producer so that you can create streaming queries that continuously query a given Kafka topic.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
If you’re using the slim distribution, you must add the hazelcast-jet-kafka module to your member’s classpath.
The Kafka connector is compatible with Kafka version 1.0.0 and later.
If you use Hazelcast Enterprise, your clients may need permissions to use this connector. For details, see Securing Jobs.
Creating a Kafka Mapping
To create a mapping to a Kafka topic in SQL, use the CREATE MAPPING statement to tell the Kafka connector the following:
- The name of the topic
- The address of the Kafka broker
- How to serialize/deserialize the keys and values in messages
CREATE MAPPING my_topic -- (1)
TYPE Kafka -- (2)
OPTIONS (
    'bootstrap.servers' = '127.0.0.1:9092' -- (3)
);

(1) The name of the Kafka topic.
(2) The name of the connector.
(3) The address of the Kafka broker.
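Any key/value pairs in the OPTIONS clause that Hazelcast does not recognize are passed directly to the Kafka consumer or producer.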
When creating a Kafka mapping, you must tell the Kafka connector how to serialize/deserialize the keys and values in Kafka messages.
To read Kafka messages, the Kafka connector must be able to deserialize them. Similarly, to publish messages to Kafka topics, the Kafka connector must be able to serialize them. You can tell Hazelcast how to serialize/deserialize keys and values by specifying the keyFormat and valueFormat fields in the OPTIONS clause.
The Kafka connector supports serializers for the following formats: primitives, Avro, JSON, and Java.
For keys and values that are primitives, set the format as the SQL data type that corresponds to the primitive. For example, if the message key is an integer and the value is a string, use the int and varchar formats:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'varchar',
    'bootstrap.servers' = '127.0.0.1:9092'
);
The Kafka connector applies a suitable serializer/deserializer automatically. For example, the int key format and varchar value format correspond to the following options:
'key.serializer' = 'org.apache.kafka.common.serialization.IntegerSerializer',
'key.deserializer' = 'org.apache.kafka.common.serialization.IntegerDeserializer',
'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer',
'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer'
For a conversion table, see SQL Data Types.
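As a rough sketch of how a mapping with primitive formats might be used, the statements below assume that the my_topic mapping with an int key and a varchar value already exists, and that the key and value are exposed as the columns __key and this. The inserted values are made up for illustration.

```sql
-- Assumes the my_topic mapping with 'keyFormat'='int' and 'valueFormat'='varchar'.
-- Publish one message; the key and value here are illustrative only.
INSERT INTO my_topic (__key, this) VALUES (1, 'hello');

-- Streaming query: it keeps running and returns new messages as they arrive.
SELECT __key, this FROM my_topic;
```

Because a Kafka topic is unbounded, the SELECT statement does not complete on its own; cancel it from the client when you are done.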
For keys or values that are in the Avro format, you need to configure different options, depending on whether you have your own Avro schema.
If you don’t already have an Avro schema, map column names to the fields and types that you want to use in your schema. The Kafka connector uses these column names to create its own ad-hoc schema named jet.sql.
CREATE MAPPING my_topic (
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092'
);
If you already have an Avro schema, you must:
- Map column names to the fields and types of your schema.
- Provide the URL of your schema registry in the schema.registry.url option.
CREATE MAPPING my_topic (
    __key VARCHAR,
    ticker VARCHAR,
    amount BIGINT,
    price DECIMAL
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'avro',
    'bootstrap.servers' = '127.0.0.1:9092',
    'schema.registry.url' = 'http://127.0.0.1:8081/'
);
Your schema registry will receive entries that contain an ID for the jet.sql schema.
When you write new Avro objects to the Kafka topic, the Kafka connector uses its own ad-hoc schema named jet.sql. The Kafka connector creates this schema from the column names that you mapped in the CREATE MAPPING statement.
|SQL Type||Avro Type|
All Avro types are a union of the NULL type and the actual type.
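A minimal sketch of writing to and reading from an Avro mapping follows. It assumes that a my_topic mapping with the __key, ticker, amount, and price columns already exists; the row values are invented for illustration.

```sql
-- Write one record; the connector serializes the value as Avro
-- using the schema derived from the mapped columns.
INSERT INTO my_topic (__key, ticker, amount, price)
VALUES ('trade-1', 'ABCD', 10, 5.5);

-- Read the mapped Avro fields back as ordinary columns.
SELECT ticker, amount, price FROM my_topic;
```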
If values are in the JSON format, set the valueFormat field to json-flat and map column names to the JSON keys.
CREATE MAPPING my_topic (
    __key BIGINT,
    ticker VARCHAR,
    amount INT
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);
There are no additional options for this format.
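Once a json-flat mapping exists, the mapped JSON keys can be queried like any other column. The query below is a sketch that assumes a my_topic mapping with the __key, ticker, and amount columns; the filter threshold is arbitrary.

```sql
-- Each top-level JSON key listed in the mapping is available as a column.
SELECT __key, ticker, amount
FROM my_topic
WHERE amount >= 100;  -- arbitrary threshold, for illustration only
```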
JSON’s type system doesn’t match SQL’s exactly. For example, JSON
numbers have unlimited precision, but such numbers are typically not
portable. We convert SQL integer and floating-point types into JSON
numbers. We convert the DECIMAL type, as well as all temporal types,
to JSON strings.
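To illustrate these conversion rules, the sketch below maps a DECIMAL column in a json-flat mapping. The mapping name trades_json (which is also assumed to be the topic name) and the sample row are hypothetical. Under the rules described here, amount should be written as a JSON number and price as a JSON string.

```sql
-- Hypothetical mapping; the name trades_json is an assumption.
CREATE MAPPING trades_json (
    __key BIGINT,
    ticker VARCHAR,
    amount INT,       -- integer type: written as a JSON number
    price DECIMAL     -- DECIMAL: written as a JSON string
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'bigint',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- Sample row, invented for illustration.
INSERT INTO trades_json VALUES (1, 'ABCD', 10, 5.5);
```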
We don’t support the JSON type from the SQL standard yet. That means
you can’t use functions like JSON_QUERY. If your JSON documents don’t
all have the same fields, or if they contain nested objects, this
format is of limited use.
Java serialization uses the Java objects exactly as the
KafkaConsumer.poll() method returns them. You can use this format for
objects serialized using Java serialization or any other serialization
method.
For this format, you must also specify the class names using the keyJavaClass and valueJavaClass options, for example:
CREATE MAPPING my_topic
TYPE Kafka
OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.Long',
    'valueFormat' = 'java',
    'valueJavaClass' = 'com.example.Person',
    'value.serializer' = 'com.example.serialization.PersonSerializer',
    'value.deserializer' = 'com.example.serialization.PersonDeserializer',
    'bootstrap.servers' = '127.0.0.1:9092'
);
If the Java class corresponds to one of the basic data types (numbers,
dates, strings), that type is used for the key or value and mapped as
a column named __key for keys and this for values. In the example
above, the key is mapped with the BIGINT type. In fact, the above
keyFormat and keyJavaClass options are equivalent to
'keyFormat'='bigint'.
If the Java class is not one of the basic types, Hazelcast analyzes
the class using reflection and uses its properties as column names. It
recognizes public fields and JavaBean-style getters. If a property
has a non-primitive type, it is mapped under the SQL OBJECT type.
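A sketch of querying a Java mapping follows. It assumes that the com.example.Person class exposes getName() and getAge() getters; that class layout is hypothetical. Under the reflection rules described here, those getters would surface as the name and age columns.

```sql
-- Hypothetical: assumes com.example.Person has getName() and getAge().
-- __key is the java.lang.Long key, mapped as BIGINT.
SELECT __key, name, age
FROM my_topic
WHERE age > 30;  -- arbitrary filter, for illustration only
```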
The Kafka connector supports heterogeneous messages. For example, say you have these messages in your topic:
If you map the column petName, it will have the value null for the
message with key=1. This scenario is supported. The Avro format
behaves similarly.
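As a sketch of this behavior, assume a hypothetical topic named pets_topic whose JSON values always contain a name field and only sometimes contain a petName field. The mapping name and column types are assumptions made for illustration.

```sql
-- Hypothetical mapping over a topic with heterogeneous JSON values.
CREATE MAPPING pets_topic (
    __key INT,
    name VARCHAR,
    petName VARCHAR   -- absent in some messages
)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'int',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);

-- Messages without a petName field return NULL in that column.
SELECT __key, name, petName
FROM pets_topic
WHERE petName IS NULL;
```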