Kafka Connect Source Connectors

With a Kafka Connect Source connector, you can reliably import data from an external system, such as a database, key-value store, search index, or file system, directly into a Hazelcast data pipeline. The data is available for stream processing. No Kafka cluster is required.

Kafka Connect Source connectors are available for over 100 popular platforms, including Neo4j and Couchbase. There is also a Kafka Connect Source connector for integrating JDBC data sources.

Installing Connector Dependencies

Kafka Connect Source connector dependencies are included in the slim and full distributions of Hazelcast.

To use the latest version of the dependencies, add the hazelcast-jet-kafka-connect module to your Java project.

Maven

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-kafka-connect</artifactId>
    <version>5.5.2</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Gradle

implementation group: 'com.hazelcast.jet', name: 'hazelcast-jet-kafka-connect', version: '5.5.2', classifier: 'jar-with-dependencies'

Downloading the Kafka Connect Source Connector

Download the Kafka Connect Source connector for the data source that you want to integrate. Kafka Connect Source connectors are distributed either as a ZIP file that contains the connector JAR and its dependencies, or as a single JAR that contains everything you need.

Every Kafka Connect Source connector comes with documentation that includes the following:

  • Features of the connector

  • Configuration details

Permissions

Enterprise Edition

If security is enabled, your clients may need updated permissions to upload the ZIP or JAR file used by the Kafka Connect Source connector. For details, see Securing Jobs.

Adding the Connector Configuration

To use your Kafka Connect Source connector as a streaming data source, you need to add the connector configuration to a pipeline.

All examples in this section are from a pipeline that uses a Kafka-based Source connector to generate random numbers.

Start by creating a Properties object for your pipeline.

Properties props = new Properties();

props.setProperty("name", "random-source-connector"); (1)
props.setProperty("connector.class", "sasakitoa.kafka.connect.random.RandomSourceConnector");

props.setProperty("generator.class", "sasakitoa.kafka.connect.random.generator.RandomInt"); (2)
props.setProperty("messages.per.second", "1000");
props.setProperty("topic", "test");
props.setProperty("task.summary.enable", "true");
1 Mandatory properties: Only the unique connector name and connector class are required.
2 Connector-specific properties: Each type of connector has a set of properties that you may need to include, such as the connection details for your data source.
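
Because only the connector name and connector class are mandatory, a quick check before building the pipeline can catch a missing property early. The following is a minimal sketch using only the Java standard library; the class ConnectorPropsCheck and the method missingMandatoryKeys are hypothetical helpers for illustration and are not part of the Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Hazelcast: reports which mandatory
// connector properties are absent or blank.
final class ConnectorPropsCheck {
    // The two properties the text above describes as mandatory.
    private static final String[] MANDATORY_KEYS = {"name", "connector.class"};

    static List<String> missingMandatoryKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "random-source-connector");
        // connector.class is not set yet, so it is reported as missing.
        System.out.println("Missing: " + missingMandatoryKeys(props));

        props.setProperty("connector.class",
                "sasakitoa.kafka.connect.random.RandomSourceConnector");
        System.out.println("Missing: " + missingMandatoryKeys(props));
    }
}
```

Connector-specific properties are deliberately not checked here, because the required set differs for each connector.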

Next, create the data source in your pipeline by calling the KafkaConnectSources.connect() method with the Properties object.

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(KafkaConnectSources.connect(props))
        .withoutTimestamps()
        .setLocalParallelism(2) (1)
        .map(record -> Values.convertToString(record.valueSchema(), record.value())) (2)
        .writeTo(AssertionSinks.assertCollectedEventually(60, list -> assertEquals(ITEM_COUNT, list.size())));
1 Scaling connector tasks: To scale, use local parallelism to run multiple connector tasks on a single, randomly chosen cluster member.
2 Data source records: Records are emitted from the Kafka Connect API with the SourceRecord type, ready for you to access the key and value along with the corresponding schemas.

Finally, make the Kafka Connect Source connector available to the pipeline. You can do this in two ways:

  • Upload the ZIP or JAR file of the Kafka Connect Source connector as part of your job configuration. Use this approach if you intend to use the data source for a remote server or a single pipeline.

    Job configuration for a ZIP file:

    JobConfig jobConfig = new JobConfig();
    jobConfig.addJarsInZip("/path/to/random-connector-1.0-SNAPSHOT.zip");

    Job configuration for a JAR file:

    JobConfig jobConfig = new JobConfig();
    // Pass the resource URL directly: URL.getPath() leaves characters such as spaces percent-encoded.
    jobConfig.addJar(Objects.requireNonNull(
            getClass().getClassLoader().getResource("random-connector-1.0-SNAPSHOT.jar")));

  • Add the JAR file of the Kafka Connect Source connector to the classpath of your Hazelcast members. Use this approach for a local cluster.

Starting the Data Source

Submit the pipeline as a job to your Hazelcast cluster. When the job is running, the data source will emit items from the Kafka Connect API with the SourceRecord type. The key and value for each record will be accessible along with their corresponding schemas.

Fault Tolerance

When you deploy a Kafka Connect Source connector to a Hazelcast cluster, the Jet engine stores snapshots of the connector state. Examples of snapshotted connector states include partition offsets and any metadata required for a restart or for recovery. If there is a connector failure, the recovery behavior depends on the type of connector. Refer to the documentation of your Kafka Connect Source connector for more detailed information.

Parallelism and Reconfigurations

Kafka Connect Source uses the tasks.max configuration property to determine parallelism. The tasks.max value takes precedence even if the pipeline’s total parallelism is higher than that value. However, the pipeline’s total parallelism cannot be lower than tasks.max. Total parallelism is calculated as:

total parallelism = edge's local parallelism * number of nodes
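
As a worked example of the formula, the sketch below computes the total parallelism and checks it against tasks.max. It uses only the Java standard library; ParallelismCheck and its methods are hypothetical names for illustration, and the check simply restates the rule above rather than reproducing Hazelcast's own validation.

```java
// Hypothetical helper, not part of Hazelcast: applies the total-parallelism
// formula and the "not lower than tasks.max" rule described above.
final class ParallelismCheck {

    // total parallelism = local parallelism * number of nodes
    static int totalParallelism(int localParallelism, int memberCount) {
        if (localParallelism < 1 || memberCount < 1) {
            throw new IllegalArgumentException("both values must be at least 1");
        }
        return Math.multiplyExact(localParallelism, memberCount);
    }

    // True when the pipeline has enough processors to host tasks.max tasks.
    static boolean satisfiesTasksMax(int tasksMax, int localParallelism, int memberCount) {
        return totalParallelism(localParallelism, memberCount) >= tasksMax;
    }

    public static void main(String[] args) {
        // Local parallelism 2 on a 3-member cluster gives a total parallelism of 6.
        System.out.println(totalParallelism(2, 3));
        // tasks.max = 4 fits within 6 processors; tasks.max = 8 does not.
        System.out.println(satisfiesTasksMax(4, 2, 3));
        System.out.println(satisfiesTasksMax(8, 2, 3));
    }
}
```

With local parallelism 2 on three members, a tasks.max of 4 is acceptable, while a tasks.max of 8 would require a higher local parallelism or more members.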

Hazelcast Jet requests tasks.max task configurations after an instance of SourceConnector has been created. If the SourceConnector returns fewer configurations than specified by the tasks.max value, the remaining processors do nothing.

When SourceConnector requests a reconfiguration, Hazelcast requests the new collection of configuration sets and distributes them across the cluster. Existing processors automatically restart to use the updated configuration. If the number of returned task configuration sets has increased, some of the processors that previously did nothing can start processing entries.
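
The effect of a reconfiguration on idle processors can be illustrated with a small model. This is a simplified sketch using only the Java standard library: TaskAssignmentSketch is a hypothetical class, and the index-based mapping of task configurations to processors is an assumption made for illustration, since the actual distribution order used by Hazelcast is not described here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model, not Hazelcast code: one slot per processor, where an
// empty slot represents a processor that does nothing.
final class TaskAssignmentSketch {

    // Assumption: configuration i goes to processor i; the rest stay idle.
    static List<Optional<Map<String, String>>> assign(
            List<Map<String, String>> taskConfigs, int processorCount) {
        if (taskConfigs.size() > processorCount) {
            throw new IllegalArgumentException("more task configurations than processors");
        }
        List<Optional<Map<String, String>>> slots = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            if (i < taskConfigs.size()) {
                slots.add(Optional.of(taskConfigs.get(i)));
            } else {
                slots.add(Optional.empty());
            }
        }
        return slots;
    }

    static long idleCount(List<Optional<Map<String, String>>> slots) {
        return slots.stream().filter(slot -> !slot.isPresent()).count();
    }

    public static void main(String[] args) {
        Map<String, String> a = Collections.singletonMap("task.id", "0");
        Map<String, String> b = Collections.singletonMap("task.id", "1");
        Map<String, String> c = Collections.singletonMap("task.id", "2");

        // The connector first returns 2 configurations for 4 processors: 2 are idle.
        System.out.println(idleCount(assign(Arrays.asList(a, b), 4)));
        // After a reconfiguration it returns 3: a previously idle processor starts working.
        System.out.println(idleCount(assign(Arrays.asList(a, b, c), 4)));
    }
}
```

In this model the assignment is recomputed from scratch on each reconfiguration, which mirrors the restart of existing processors described above.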