Elasticsearch Connector

Elasticsearch is a popular full-text search engine. Hazelcast can use it as both a source and a sink.

Installing the Connector

This connector is included in the full distribution of Hazelcast.

To use this connector in the slim distribution, you must have one of the following modules on your members' classpaths:

  • hazelcast-jet-elasticsearch-5

  • hazelcast-jet-elasticsearch-6

  • hazelcast-jet-elasticsearch-7

Each module includes an Elasticsearch client that’s compatible with the given major version of Elasticsearch. The connector API is the same across versions, apart from a few minor differences where we surface the API of the Elasticsearch client. See the Javadoc for any such differences.

Permissions

If you use Hazelcast Enterprise, your clients may need permissions to use this connector. For details, see Securing Jobs.

Elasticsearch as a Source

The Elasticsearch connector source provides a builder and several convenience factory methods. Most commonly, you need to provide:

  • A client supplier function, which returns a configured instance of RestClientBuilder (see Elasticsearch documentation),

  • A search request supplier specifying a query to Elasticsearch,

  • A mapping function from SearchHit to a desired type.

Example using a factory method:

BatchSource<String> elasticSource = ElasticSources.elasticsearch(
    () -> client("user", "password", "host", 9200),
    () -> new SearchRequest("my-index"),
    hit -> (String) hit.getSourceAsMap().get("name")
);

To access all configuration options, use the builder:

BatchSource<String> elasticSource = new ElasticSourceBuilder<String>()
        .name("elastic-source")
        .clientFn(() -> RestClient.builder(new HttpHost(
                "localhost", 9200
        )))
        .searchRequestFn(() -> new SearchRequest("my-index"))
        .optionsFn(request -> RequestOptions.DEFAULT)
        .mapToItemFn(hit -> hit.getSourceAsString())
        .slicing(true)
        .build();

By default, the connector uses a single scroll to read data from Elasticsearch, so there is only a single reader on a single node in the whole cluster.

Slicing can be used to parallelize reading from an index with multiple shards. The number of slices equals globalParallelism.

If Hazelcast members and Elasticsearch nodes are located on the same machines, the connector uses co-located reading, avoiding the overhead of the physical network.

Failure Scenario Considerations

The connector uses the retry capability of the underlying Elasticsearch client. This allows the connector to handle some transient network issues, but it doesn’t cover all cases.

The source uses Elasticsearch’s Scroll API. The scroll context is stored on a node with the primary shard. If this node crashes, the search context is lost and the job can’t reliably read all documents, so the job fails.

If there is a network issue between Hazelcast and Elasticsearch, the Elasticsearch client retries the request, allowing the job to continue.

However, there is an edge case where the scroll request is processed by the Elasticsearch server and moves the scroll cursor forward, but the response is lost. The client then retries and receives the next page, effectively skipping the previous page. The recommended way to handle this is to check the number of processed documents after the job finishes and restart the job if not all documents were read.
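The edge case above can be illustrated with a small, self-contained simulation. This is only a toy model under stated assumptions, not the Elasticsearch client or the connector's implementation: `FakeScrollServer`, `readAll` and `isComplete` are hypothetical names introduced for illustration. The sketch shows a cursor that advances even when the response is dropped, and a count comparison of the kind recommended above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a scroll cursor; not the Elasticsearch client API. */
final class ScrollSkipSketch {

    /** Server-side scroll state: each request returns the next page and advances the cursor. */
    static final class FakeScrollServer {
        private final List<String> docs;
        private final int pageSize;
        private int cursor;

        FakeScrollServer(List<String> docs, int pageSize) {
            this.docs = docs;
            this.pageSize = pageSize;
        }

        List<String> nextPage() {
            int end = Math.min(cursor + pageSize, docs.size());
            List<String> page = new ArrayList<>(docs.subList(cursor, end));
            cursor = end; // the cursor moves even if the response never reaches the client
            return page;
        }
    }

    /**
     * Reads all pages. The response to request number lostRequest (0-based) is dropped
     * and the client retries. Pass -1 to simulate a run with no lost response.
     */
    static List<String> readAll(FakeScrollServer server, int lostRequest) {
        List<String> received = new ArrayList<>();
        int requestNo = 0;
        while (true) {
            List<String> page = server.nextPage();
            if (requestNo++ == lostRequest) {
                continue; // response lost in transit; the retry fetches the following page
            }
            if (page.isEmpty()) {
                return received;
            }
            received.addAll(page);
        }
    }

    /** Post-job check: compare the processed count with the expected total. */
    static boolean isComplete(long processed, long expectedTotal) {
        return processed == expectedTotal;
    }

    public static void main(String[] args) {
        List<String> docs = List.of("d0", "d1", "d2", "d3", "d4", "d5");
        List<String> clean = readAll(new FakeScrollServer(docs, 2), -1);
        List<String> lossy = readAll(new FakeScrollServer(docs, 2), 1);
        System.out.println("clean run: " + clean.size() + " of " + docs.size());
        System.out.println("lossy run: " + lossy.size() + " of " + docs.size()
                + ", complete=" + isComplete(lossy.size(), docs.size()));
    }
}
```

In the lossy run, the page holding d2 and d3 is never delivered, so only four of the six documents are processed and the count check fails. In a real job, the expected total would have to come from Elasticsearch itself, for example from a count of the documents matching the same query.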

These are known limitations of the Elasticsearch Scroll API. There is ongoing work on the Elasticsearch side to fix these issues.

Elasticsearch as a Sink

The Elasticsearch connector sink provides a builder and several convenience factory methods. Most commonly you need to provide:

  • A client supplier, which returns a configured instance of RestClientBuilder (see Elasticsearch documentation),

  • A mapping function to map items from the pipeline to an instance of one of IndexRequest, UpdateRequest or DeleteRequest.

Suppose the type of the items in the pipeline is Map<String, Object>. The sink can then be created using a factory method:

Sink<Map<String, Object>> elasticSink = ElasticSinks.elasticsearch(
    () -> client("user", "password", "host", 9200),
    item -> new IndexRequest("my-index").source(item)
);

To access all configuration options, use the builder:

Sink<Map<String, Object>> elasticSink = new ElasticSinkBuilder<Map<String, Object>>()
    .clientFn(() -> RestClient.builder(new HttpHost(
            "localhost", 9200
    )))
    .bulkRequestFn(BulkRequest::new)
    .mapToRequestFn((map) -> new IndexRequest("my-index").source(map))
    .optionsFn(request -> RequestOptions.DEFAULT)
    .build();

The Elasticsearch sink doesn’t implement co-located writing. To achieve maximum write throughput, provide all nodes to the RestClient and configure parallelism.

Failure Scenario Considerations

The sink connector can handle transient network failures, failures of nodes in the cluster, and cluster changes, e.g., scaling up.

Transient network failures between Hazelcast and the Elasticsearch cluster are handled by retries in the Elasticsearch client.

The worst-case scenario is when a master node that holds a primary shard fails.

First, you need to set BulkRequest.waitForActiveShards(int) to ensure that a document is replicated to at least some replicas. Also, you can’t use auto-generated IDs; you need to set the document ID manually to avoid duplicate records.
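The reason manual IDs avoid duplicates is presumably that a retried write re-sends documents that may already have been stored. The following self-contained sketch models this with a plain map. It is a toy model, not the Elasticsearch client API: the class, its methods, and the `orderId` field are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy model of an index keyed by document ID; not the Elasticsearch client API. */
final class IdempotentWriteSketch {

    private final Map<String, Map<String, Object>> index = new HashMap<>();

    /** Models auto-generated IDs: every write, including a retry, gets a fresh ID. */
    void writeWithGeneratedId(Map<String, Object> doc) {
        index.put(UUID.randomUUID().toString(), doc);
    }

    /** Models a manually set ID: a retried write replaces the same entry. */
    void writeWithExplicitId(String id, Map<String, Object> doc) {
        index.put(id, doc);
    }

    int documentCount() {
        return index.size();
    }

    public static void main(String[] args) {
        // "orderId" is a hypothetical field that uniquely identifies the item.
        Map<String, Object> doc = Map.of("orderId", "o-42", "amount", 10);

        IdempotentWriteSketch generated = new IdempotentWriteSketch();
        generated.writeWithGeneratedId(doc);
        generated.writeWithGeneratedId(doc); // retry after a lost acknowledgement

        IdempotentWriteSketch explicit = new IdempotentWriteSketch();
        String id = (String) doc.get("orderId"); // ID derived from the item itself
        explicit.writeWithExplicitId(id, doc);
        explicit.writeWithExplicitId(id, doc);   // same retry, same ID

        System.out.println("generated IDs: " + generated.documentCount() + " documents");
        System.out.println("explicit ID:   " + explicit.documentCount() + " document(s)");
    }
}
```

With generated IDs the retry leaves two copies of the same item, while the explicit ID leaves one. The same idea applies to the sink's mapping function when the ID is derived from a stable field of the item.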

Second, you need to make sure that a new master node and primary shard are allocated before the client times out. This involves:

  • configuration of the following properties on the client:

    org.apache.http.client.config.RequestConfig.Builder.setConnectionRequestTimeout
    org.apache.http.client.config.RequestConfig.Builder.setConnectTimeout
    org.apache.http.client.config.RequestConfig.Builder.setSocketTimeout

  • and configuration of the following properties in the Elasticsearch cluster:

    cluster.election.max_timeout
    cluster.fault_detection.follower_check.timeout
    cluster.fault_detection.follower_check.retry_count
    cluster.fault_detection.leader_check.timeout
    cluster.fault_detection.leader_check.retry_count
    cluster.follower_lag.timeout
    transport.connect_timeout
    transport.ping_schedule
    network.tcp.connect_timeout

For details, see the Elasticsearch documentation section on cluster fault detection.
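As a configuration sketch, the client-side settings discussed in this section could be combined in the sink builder roughly as follows. It assumes the connector module and the matching Elasticsearch REST client are on the classpath. The timeout values, the active shard count of 2, the host, and the `id` field used as the document ID are illustrative assumptions, not recommendations, and `ResilientSinkSketch` is a hypothetical class name.

```java
import com.hazelcast.jet.elastic.ElasticSinkBuilder;
import com.hazelcast.jet.pipeline.Sink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;

import java.util.Map;

final class ResilientSinkSketch {

    static Sink<Map<String, Object>> build() {
        return new ElasticSinkBuilder<Map<String, Object>>()
                .clientFn(() -> RestClient.builder(new HttpHost("localhost", 9200))
                        // Client-side timeouts in milliseconds; the values are placeholders.
                        .setRequestConfigCallback(config -> config
                                .setConnectionRequestTimeout(5_000)
                                .setConnectTimeout(5_000)
                                .setSocketTimeout(60_000)))
                // Wait for the given number of active shard copies before each bulk write
                // proceeds (2 is an assumed value).
                .bulkRequestFn(() -> new BulkRequest().waitForActiveShards(2))
                // Set the document ID explicitly; "id" is a hypothetical field of the item.
                .mapToRequestFn((Map<String, Object> item) -> new IndexRequest("my-index")
                        .id(String.valueOf(item.get("id")))
                        .source(item))
                .build();
    }
}
```

The socket timeout in particular would need to be longer than the time the cluster takes to elect a new master and promote a replica, which in turn depends on the cluster properties listed above.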