Kafka Connect Source Connectors

With a Kafka Connect Source connector, you can reliably import data from an external system, such as a database, key-value store, search index, or file system, directly into a Hazelcast data pipeline. The data is available for stream processing. No Kafka cluster is required.

Kafka Connect Source connectors are available for over 100 popular platforms, including Neo4j and Couchbase. There is also a Kafka Connect Source connector for integrating JDBC data sources.

Installing Connector Dependencies

Kafka Connect Source connector dependencies are included in the slim and full distributions of Hazelcast.

To use the latest version of the dependencies, add the hazelcast-jet-kafka-connect module to your Java project.

Maven

<dependency>
  <groupId>com.hazelcast.jet</groupId>
  <artifactId>hazelcast-jet-kafka-connect</artifactId>
  <version>5.3.8</version>
</dependency>

Gradle

compile group: 'com.hazelcast.jet', name: 'hazelcast-jet-kafka-connect', version: '5.3.8'

Downloading the Kafka Connect Source Connector

Download the Kafka Connect Source connector that matches the data source you want to integrate. Kafka Connect Source connectors are distributed either as a ZIP file that contains the connector JAR and its dependencies, or as a single JAR that contains everything you need.

Every Kafka Connect Source connector comes with documentation that includes the following:

  • Features of the connector

  • Configuration details

Permissions (Enterprise)

If security is enabled, your clients may need updated permissions to upload the ZIP or JAR file used by the Kafka Connect Source Connector. For details, see Securing Jobs.

Adding the Connector Configuration

To use your Kafka Connect Source connector as a streaming data source, you need to add the connector configuration to a pipeline.

All examples in this section are from a pipeline that uses a Kafka-based Source connector to generate random numbers.

Start by creating a Properties object for your pipeline.

Properties props = new Properties();

props.setProperty("name", "random-source-connector"); // (1)
props.setProperty("connector.class", "sasakitoa.kafka.connect.random.RandomSourceConnector");

props.setProperty("generator.class", "sasakitoa.kafka.connect.random.generator.RandomInt"); // (2)
props.setProperty("messages.per.second", "1000");
props.setProperty("topic", "test");
props.setProperty("task.summary.enable", "true");

(1) Mandatory properties: Only the unique connector name and the connector class are required.
(2) Connector-specific properties: Each type of connector has a set of properties that you may need to include, for example, the connection details for your data source.
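Because only name and connector.class are mandatory, it can be useful to check for them before the job is submitted. The following is a minimal sketch that uses only java.util.Properties. The class ConnectorProps and its methods are hypothetical helper names introduced here for illustration; they are not part of the Hazelcast or Kafka Connect APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Hazelcast or Kafka Connect.
class ConnectorProps {

    // The two keys this section describes as mandatory for every connector.
    static final List<String> MANDATORY_KEYS = List.of("name", "connector.class");

    // Returns the mandatory keys that are absent or blank in props.
    public static List<String> missingMandatory(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Throws if a mandatory key is missing; otherwise returns props unchanged.
    public static Properties requireMandatory(Properties props) {
        List<String> missing = missingMandatory(props);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing connector properties: " + missing);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "random-source-connector");
        props.setProperty("connector.class", "sasakitoa.kafka.connect.random.RandomSourceConnector");
        requireMandatory(props);
        System.out.println("Connector properties look complete: " + props.getProperty("name"));
    }
}
```

A check like this only covers the two mandatory keys. Connector-specific properties still need to be verified against the documentation of the connector you downloaded.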

Next, create the data source in your pipeline by calling the KafkaConnectSources.connect() method with the Properties object.

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(KafkaConnectSources.connect(props))
        .withoutTimestamps()
        .setLocalParallelism(2) // (1)
        .map(record -> Values.convertToString(record.valueSchema(), record.value())) // (2)
        .writeTo(AssertionSinks.assertCollectedEventually(60, list -> assertEquals(ITEM_COUNT, list.size())));

(1) Scaling connector tasks: The connector runs on a single, randomly chosen cluster member. To scale, increase the local parallelism so that multiple connector tasks run on that member.
(2) Data source records: Records are emitted from the Kafka Connect API with the SourceRecord type, so you can access the key and value of each record along with their corresponding schemas.

Finally, make the Kafka Connect Source connector available to the pipeline. You can do this in two ways:

  • Upload the ZIP or JAR file of the Kafka Connect Source connector as part of your job configuration. Use this approach if you intend to use the data source for a remote server or a single pipeline.

    Job configuration for a ZIP file:

    JobConfig jobConfig = new JobConfig();
    jobConfig.addJarsInZip("/path/to/random-connector-1.0-SNAPSHOT.zip");

    Job configuration for a JAR file:

    JobConfig jobConfig = new JobConfig();
    jobConfig.addJar(Objects.requireNonNull(this.getClass()
            .getClassLoader()
            .getResource("random-connector-1.0-SNAPSHOT.jar"))
            .getPath());

  • Add the JAR file of the Kafka Connect Source connector to the classpath of your Hazelcast members. Use this approach for a local cluster.
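One detail of the JAR-file job configuration above is worth noting: URL.getPath() returns the path in URL-encoded form, so a directory name that contains a space shows up as %20 and may not resolve on disk. The sketch below shows one way to decode the location through a URI first. ResourcePaths and toFilePath are hypothetical names introduced here, and the sketch assumes the resource is a plain file on disk (a file: URL), not an entry nested inside another JAR.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Hazelcast.
class ResourcePaths {

    // Converts a file: URL into a decoded file-system path string.
    public static String toFilePath(URL resource) {
        if (resource == null) {
            throw new IllegalArgumentException("Resource not found on the classpath");
        }
        if (!"file".equals(resource.getProtocol())) {
            throw new IllegalArgumentException("Not a plain file resource: " + resource);
        }
        try {
            // Going through a URI decodes escapes such as %20.
            Path path = Paths.get(resource.toURI());
            return path.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed resource URL: " + resource, e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical location with a space in it, used only to show the decoding.
        URL url = Paths.get("my connectors", "random-connector-1.0-SNAPSHOT.jar").toUri().toURL();
        System.out.println(url.getPath());   // URL-encoded form of the path
        System.out.println(toFilePath(url)); // decoded file-system path
    }
}
```

The returned string could then be passed to jobConfig.addJar(...) in place of the getPath() result, under the same assumption that the connector JAR is a regular file.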

Starting the Data Source

Submit the pipeline as a job to your Hazelcast cluster. When the job is running, the data source will emit items from the Kafka Connect API with the SourceRecord type. The key and value for each record will be accessible along with their corresponding schemas.

Fault Tolerance

When a Kafka Connect Source connector is deployed to a Hazelcast cluster, snapshots of the connector state, such as partition offsets and any metadata required for a restart or recovery, are stored in the Jet engine. If the connector fails, the recovery behavior depends on the type of connector. Refer to the documentation of your Kafka Connect Source connector for more detailed information.