Elasticsearch Connector
Elasticsearch is a popular full-text search engine. You can use Elasticsearch as both a source and a sink with the Jet API.
Installing the Connector
This connector is included in the full distribution of Hazelcast.
To use this connector in the slim distribution, you must have one of the following modules on your members' classpaths:
- hazelcast-jet-elasticsearch-6
- hazelcast-jet-elasticsearch-7
Each module includes an Elasticsearch client that’s compatible with the given major version of Elasticsearch. The connector API is the same across versions, apart from a few minor differences where the connector surfaces the API of the Elasticsearch client. See the Javadoc for any such differences.
Permissions
Enterprise
If security is enabled, your clients may need permissions to use this connector. For details, see Securing Jobs.
Elasticsearch as a Source
The Elasticsearch connector source provides a builder and several convenience factory methods. Most commonly you need to provide the following:
- A client supplier function, which returns a configured instance of RestClientBuilder (see the Elasticsearch documentation),
- A search request supplier, specifying a query to Elasticsearch,
- A mapping function from SearchHit to a desired type.
BatchSource<String> elasticSource = ElasticSources.elasticsearch(
        () -> client("user", "password", "host", 9200),
        () -> new SearchRequest("my-index"),
        hit -> (String) hit.getSourceAsMap().get("name")
);
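The client(...) call above refers to a helper, not to a connector API. A minimal sketch of such a helper, assuming HTTP basic authentication, might look like this:
// Hypothetical helper: builds a RestClientBuilder that authenticates with basic credentials
static RestClientBuilder client(String user, String password, String host, int port) {
    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(user, password));
    return RestClient.builder(new HttpHost(host, port))
            .setHttpClientConfigCallback(httpClientBuilder ->
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
}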
For all configuration options, use the builder:
BatchSource<String> elasticSource = new ElasticSourceBuilder<String>()
        .name("elastic-source")
        .clientFn(() -> RestClient.builder(new HttpHost(
                "localhost", 9200
        )))
        .searchRequestFn(() -> new SearchRequest("my-index"))
        .optionsFn(request -> RequestOptions.DEFAULT)
        .mapToItemFn(hit -> hit.getSourceAsString()) // emit each document's _source as a JSON string
        .slicing(true)                               // parallelize reading using sliced scrolls
        .build();
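The resulting source behaves like any other batch source; a minimal usage sketch that simply logs the mapped items:
// Sketch: read the mapped documents and log them
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(elasticSource)
        .writeTo(Sinks.logger());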
By default, the connector uses a single scroll to read data from Elasticsearch, so there is only a single reader on a single node in the whole cluster. Slicing can be used to parallelize reading from an index that has multiple shards. The number of slices is equal to the globalParallelism of the job.
If Hazelcast members and Elasticsearch nodes are located on the same machines, the connector uses co-located reading, avoiding the overhead of the physical network.
Failure Scenario Considerations
The connector uses the retry capability of the underlying Elasticsearch client. This allows it to handle some transient network issues, but it doesn’t cover all cases.
The source uses Elasticsearch’s Scroll API. The scroll context is stored on a node with the primary shard. If this node crashes, the search context is lost and the job can’t reliably read all documents, so the job fails.
If there is a network issue between Hazelcast and Elasticsearch, the Elasticsearch client retries the request, allowing the job to continue.
However, there is an edge case where the scroll request is processed by the Elasticsearch server and moves the scroll cursor forward, but the response is lost. The client then retries and receives the next page, effectively skipping the previous page. The recommended way to handle this is to check the number of processed documents after the job finishes and to restart the job if not all documents were read.
These are known limitations of the Elasticsearch Scroll API. There is ongoing work on the Elasticsearch side to fix these issues.
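One way to implement such a check is sketched below; the HazelcastInstance hz and the observable name processed-count are assumptions for illustration:
// Sketch: count the documents the source emits and compare with the index size after the job completes
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(elasticSource)
        .aggregate(AggregateOperations.counting())
        .writeTo(Sinks.observable("processed-count"));

Observable<Long> processedCount = hz.getJet().getObservable("processed-count");
hz.getJet().newJob(pipeline).join();

long processed = processedCount.iterator().next();
// Compare 'processed' with the number of documents in the index
// (for example via the client's count API) and restart the job if they differ.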
Elasticsearch as a Sink
The Elasticsearch connector sink provides a builder and several convenience factory methods. Most commonly you need to provide:
- A client supplier, which returns a configured instance of RestHighLevelClient (see the Elasticsearch documentation),
- A mapping function to map items from the pipeline to an instance of IndexRequest, UpdateRequest, or DeleteRequest.
Supposing the type of the items in the pipeline is Map<String, Object>, the sink can be created using the following:
Sink<Map<String, Object>> elasticSink = ElasticSinks.elasticsearch(
        () -> client("user", "password", "host", 9200),
        item -> new IndexRequest("my-index").source(item)
);
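A minimal usage sketch follows; the test items, their field names, and the use of the pipeline test utilities (TestSources.items) are assumptions for illustration:
// Sketch: index a couple of test documents through the sink
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(TestSources.items(
                Map.<String, Object>of("name", "Alice"),
                Map.<String, Object>of("name", "Bob")))
        .writeTo(elasticSink);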
For all configuration options, use the builder:
Sink<Map<String, Object>> elasticSink = new ElasticSinkBuilder<Map<String, Object>>()
        .clientFn(() -> RestClient.builder(new HttpHost(
                "localhost", 9200
        )))
        .bulkRequestFn(BulkRequest::new)                                 // items are batched into bulk requests
        .mapToRequestFn(map -> new IndexRequest("my-index").source(map)) // index each map as a document
        .optionsFn(request -> RequestOptions.DEFAULT)
        .build();
The Elasticsearch sink doesn’t implement co-located writing. To achieve maximum write throughput, provide all Elasticsearch nodes to the RestClient and configure the parallelism of the sink, as shown in the sketch below.
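For illustration, a sketch of a sink whose client lists several nodes, with a higher local parallelism on the sink stage; the host names, the upstream stage variable, and the parallelism value are assumptions:
// Sketch: give the client every Elasticsearch node so bulk writes spread across the cluster
Sink<Map<String, Object>> multiNodeSink = new ElasticSinkBuilder<Map<String, Object>>()
        .clientFn(() -> RestClient.builder(
                new HttpHost("es-node1", 9200),
                new HttpHost("es-node2", 9200),
                new HttpHost("es-node3", 9200)))
        .bulkRequestFn(BulkRequest::new)
        .mapToRequestFn(map -> new IndexRequest("my-index").source(map))
        .optionsFn(request -> RequestOptions.DEFAULT)
        .build();

// Raise the local parallelism of the sink stage to run more parallel bulk writers
stage.writeTo(multiNodeSink)
        .setLocalParallelism(4); // 'stage' is any upstream stage; 4 is an assumed value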
Failure Scenario Considerations
The sink connector can handle transient network failures, failures of nodes in the cluster, and cluster changes such as scaling up.
Transient network failures between Hazelcast and the Elasticsearch cluster are handled by retries in the Elasticsearch client.
The worst-case scenario is when a master node that holds the primary of a shard fails.
First, you need to set BulkRequest.waitForActiveShards(int) to ensure that a document is replicated to at least some replicas. Also, you can’t use auto-generated IDs; set the document ID manually to avoid duplicate records.
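Expressed with the sink builder, this could look like the following sketch; waiting for 2 active shard copies and taking the document ID from an id field are assumptions:
// Sketch: wait for at least 2 active shard copies per bulk request and set document IDs explicitly
Sink<Map<String, Object>> resilientSink = new ElasticSinkBuilder<Map<String, Object>>()
        .clientFn(() -> RestClient.builder(new HttpHost("localhost", 9200)))
        .bulkRequestFn(() -> new BulkRequest().waitForActiveShards(2))
        .mapToRequestFn(map -> new IndexRequest("my-index")
                .id((String) map.get("id")) // explicit ID keeps retried writes from creating duplicates
                .source(map))
        .optionsFn(request -> RequestOptions.DEFAULT)
        .build();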
Second, you need to make sure that a new master node is elected and the primary shard is reallocated before the client times out. This involves:
- configuration of the following properties on the client (see the sketch below):
  - org.apache.http.client.config.RequestConfig.Builder.setConnectionRequestTimeout
  - org.apache.http.client.config.RequestConfig.Builder.setConnectTimeout
  - org.apache.http.client.config.RequestConfig.Builder.setSocketTimeout
- and configuration of the following properties in the Elasticsearch cluster:
  - cluster.election.max_timeout
  - cluster.fault_detection.follower_check.timeout
  - cluster.fault_detection.follower_check.retry_count
  - cluster.fault_detection.leader_check.timeout
  - cluster.fault_detection.leader_check.retry_count
  - cluster.follower_lag.timeout
  - transport.connect_timeout
  - transport.ping_schedule
  - network.tcp.connect_timeout
For details, see the Elasticsearch documentation section on cluster fault detection.
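On the client side, the timeouts listed above can be set through RestClientBuilder.setRequestConfigCallback, for example inside the clientFn; the millisecond values in this sketch are assumptions, not recommendations:
// Sketch: stretch the client-side timeouts so the client can outlast a master failover
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
        .setRequestConfigCallback(requestConfig -> requestConfig
                .setConnectionRequestTimeout(10_000) // assumed values, in milliseconds;
                .setConnectTimeout(10_000)           // size them to outlast master election
                .setSocketTimeout(120_000));         // and shard reallocation in your cluster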