Kafka Connect Source Connectors
With a Kafka Connect Source connector, you can reliably import data from an external system, such as a database, key-value store, search index, or file system, directly into a Hazelcast data pipeline. The data is available for stream processing. No Kafka cluster is required.
Kafka Connect Source connectors are available for over 100 popular platforms, including Neo4j and Couchbase. There is also a Kafka Connect Source connector for integrating JDBC data sources.
Installing Connector Dependencies
Kafka Connect Source connector dependencies are included in the slim and full distributions of Hazelcast.
To use the latest version of the dependencies, add the hazelcast-jet-kafka-connect module to your Java project.
Maven:
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-kafka-connect</artifactId>
    <version>6.0.0-SNAPSHOT</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
Gradle:
implementation group: 'com.hazelcast.jet', name: 'hazelcast-jet-kafka-connect', version: '6.0.0-SNAPSHOT', classifier: 'jar-with-dependencies'
Downloading the Kafka Connect Source Connector
Download the correct Kafka Connect Source connector for the data source that you want to integrate. Kafka Source connectors consist of either a ZIP file that contains the connector JAR and dependencies, or a single JAR that contains everything you need.
Every Kafka Connect Source connector comes with documentation that includes the following:
- Features of the connector
- Configuration details
Permissions
Enterprise Edition
If security is enabled, your clients may need updated permissions to upload the ZIP or JAR file used by the Kafka Connect Source connector. For details, see Securing Jobs.
Adding the Connector Configuration
To use your Kafka Connect Source connector as a streaming data source, you need to add the connector configuration to a pipeline.
All examples in this section are from a pipeline that uses a Kafka-based Source connector to generate random numbers.
Start by creating a Properties object for your pipeline.
Properties props = new Properties();
props.setProperty("name", "random-source-connector"); // (1)
props.setProperty("connector.class", "sasakitoa.kafka.connect.random.RandomSourceConnector");
props.setProperty("generator.class", "sasakitoa.kafka.connect.random.generator.RandomInt"); // (2)
props.setProperty("messages.per.second", "1000");
props.setProperty("topic", "test");
props.setProperty("task.summary.enable", "true");
(1) Mandatory properties: Only the unique connector name and the connector class are required.
(2) Connector-specific properties: Each type of connector has a set of properties that you may need to include, such as the connection details for your data source.
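Because only the connector name and connector class are mandatory, it can help to check for them before building the pipeline. The following is a minimal sketch using only the JDK; the class name ConnectorPropsCheck and the method missingMandatory are hypothetical helpers for illustration and are not part of Hazelcast or Kafka Connect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Hazelcast or Kafka Connect.
class ConnectorPropsCheck {
    // The two properties described above as mandatory.
    static final List<String> MANDATORY = Arrays.asList("name", "connector.class");

    // Returns the mandatory keys that are missing or blank in the given properties.
    static List<String> missingMandatory(Properties props) {
        return MANDATORY.stream()
                .filter(key -> {
                    String value = props.getProperty(key);
                    return value == null || value.trim().isEmpty();
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "random-source-connector");
        // "connector.class" is deliberately left out so that the check reports it.
        System.out.println("Missing mandatory properties: " + missingMandatory(props));
    }
}
```

A check like this only covers the two mandatory keys. Connector-specific properties still need to be validated against the documentation of the connector you downloaded.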
Next, create the data source in your pipeline by calling the KafkaConnectSources.connect() method with the Properties object.
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(KafkaConnectSources.connect(props))
        .withoutTimestamps()
        .setLocalParallelism(2) // (1)
        .map(record -> Values.convertToString(record.valueSchema(), record.value())) // (2)
        .writeTo(AssertionSinks.assertCollectedEventually(60, list -> assertEquals(ITEM_COUNT, list.size())));
(1) Scaling connector tasks: To scale, use local parallelism to run multiple connector tasks on a single, randomly chosen cluster member.
(2) Data source records: Records are emitted from the Kafka Connect API with the SourceRecord type, ready for you to access the key and value along with the corresponding schemas.
Finally, make the Kafka Connect Source connector available to the pipeline. You can do this in two ways:
- Upload the ZIP or JAR file of the Kafka Connect Source connector as part of your job configuration. Use this approach if you intend to use the data source for a remote server or a single pipeline.
  Job configuration for a ZIP file:
      JobConfig jobConfig = new JobConfig();
      jobConfig.addJarsInZip("/path/to/random-connector-1.0-SNAPSHOT.zip");
  Job configuration for a JAR file:
      JobConfig jobConfig = new JobConfig();
      jobConfig.addJar(Objects.requireNonNull(this.getClass()
              .getClassLoader()
              .getResource("random-connector-1.0-SNAPSHOT.jar"))
          .getPath());
- Add the JAR file of the Kafka Connect Source connector to the classpath of your Hazelcast members. Use this approach for a local cluster.
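When the connector archive is uploaded with the job, the choice between addJarsInZip and addJar depends on whether you downloaded a ZIP or a single JAR. The following JDK-only sketch shows one way to make that decision from the file name. ConnectorArchive, Kind, and kindOf are hypothetical names introduced for illustration, and the check looks only at the file extension, not at the archive contents.

```java
import java.util.Locale;

// Hypothetical helper: decides how a connector archive would be attached to a job.
class ConnectorArchive {
    enum Kind { ZIP, JAR, UNSUPPORTED }

    // Classifies the archive by file extension only (an assumption of this sketch).
    static Kind kindOf(String path) {
        String lower = path.toLowerCase(Locale.ROOT);
        if (lower.endsWith(".zip")) {
            return Kind.ZIP;   // would be passed to jobConfig.addJarsInZip(path)
        }
        if (lower.endsWith(".jar")) {
            return Kind.JAR;   // would be passed to jobConfig.addJar(path)
        }
        return Kind.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(kindOf("/path/to/random-connector-1.0-SNAPSHOT.zip")); // ZIP
        System.out.println(kindOf("random-connector-1.0-SNAPSHOT.jar"));          // JAR
        System.out.println(kindOf("random-connector-1.0-SNAPSHOT.tar.gz"));       // UNSUPPORTED
    }
}
```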
Starting the Data Source
Submit the pipeline as a job to your Hazelcast cluster. When the job is running, the data source will emit items from the Kafka Connect API with the SourceRecord type. The key and value for each record will be accessible along with their corresponding schemas.
Fault Tolerance
When you deploy a Kafka Connect Source connector to a Hazelcast cluster, the Jet engine stores snapshots of the connector state. Examples of snapshotted connector states include partition offsets and any metadata required for a restart or for recovery. If there is a connector failure, the recovery behavior depends on the type of connector. Refer to the documentation of your Kafka Connect Source connector for more detailed information.
Parallelism and Reconfigurations
Kafka Connect Source uses the tasks.max configuration property to determine parallelism.
The tasks.max value takes precedence even if the pipeline’s total parallelism is higher than that value.
A pipeline’s total parallelism cannot be lower than tasks.max, where total parallelism is calculated as:
total parallelism = edge's local parallelism * number of nodes
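As a worked example of this formula, the sketch below computes the total parallelism for a hypothetical cluster and checks it against tasks.max. The class and method names are illustrative only, and the member count, local parallelism, and tasks.max values are made-up inputs.

```java
// Illustrative arithmetic only; not a Hazelcast API.
class TotalParallelism {
    // total parallelism = local parallelism * number of nodes
    static int totalParallelism(int localParallelism, int nodeCount) {
        return localParallelism * nodeCount;
    }

    // Total parallelism must not be lower than tasks.max.
    static boolean satisfiesTasksMax(int localParallelism, int nodeCount, int tasksMax) {
        return totalParallelism(localParallelism, nodeCount) >= tasksMax;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: 3 nodes, tasks.max = 4.
        System.out.println(totalParallelism(2, 3));     // 6
        System.out.println(satisfiesTasksMax(2, 3, 4)); // true: 6 >= 4
        System.out.println(satisfiesTasksMax(1, 3, 4)); // false: 3 < 4
    }
}
```

In the second case, raising the local parallelism to 2 or lowering tasks.max to 3 would satisfy the constraint.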
Hazelcast Jet requests tasks.max task configurations after an instance of SourceConnector has been created. If the SourceConnector returns fewer configurations than specified by the tasks.max value, the remaining processors do nothing.
When SourceConnector requests a reconfiguration, Hazelcast requests the new collection of configuration sets and distributes them across the cluster. Existing processors automatically restart to use the updated configuration. If the number of returned task configuration sets has increased, some of the processors that previously did nothing can start processing entries.
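The effect of a reconfiguration on idle processors can be illustrated with a simplified model. The sketch below counts only the tasks.max processor slots and assumes each returned task configuration occupies exactly one slot. It does not model how Hazelcast assigns configurations to cluster members, and TaskSlots with its methods is a hypothetical name.

```java
// Simplified counting model; not Hazelcast's actual scheduling logic.
class TaskSlots {
    // Slots that receive a task configuration. The connector may return
    // fewer configurations than the tasks.max that were requested.
    static int active(int tasksMax, int returnedConfigs) {
        return Math.min(tasksMax, Math.max(0, returnedConfigs));
    }

    // Slots that received no configuration and therefore do nothing.
    static int idle(int tasksMax, int returnedConfigs) {
        return tasksMax - active(tasksMax, returnedConfigs);
    }

    public static void main(String[] args) {
        int tasksMax = 4; // hypothetical tasks.max value

        // Before reconfiguration: the connector returns 2 configurations.
        System.out.println(active(tasksMax, 2) + " active, " + idle(tasksMax, 2) + " idle"); // 2 active, 2 idle

        // After reconfiguration: the connector returns 3 configurations,
        // so one previously idle slot can start processing entries.
        System.out.println(active(tasksMax, 3) + " active, " + idle(tasksMax, 3) + " idle"); // 3 active, 1 idle
    }
}
```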