Elasticsearch is a popular fulltext search engine. Hazelcast can use it both as a source and a sink.
This connector is included in the full distribution of Hazelcast.
To use this connector in the slim distribution, you must have one of the following modules on your members' classpaths:
Each module includes an Elasticsearch client that’s compatible with the given major version of Elasticsearch. The connector API is the same between different versions, apart from a few minor differences where we surface the API of Elasticsearch client. See the Javadoc for any such differences.
If you use Hazelcast Enterprise, your clients may need permissions to use this connector. For details, see Securing Jobs.
The Elasticsearch connector source provides a builder and several convenience factory methods. Most commonly one needs to provide:
A client supplier function, which returns a configured instance of RestClientBuilder (see Elasticsearch documentation),
A search request supplier specifying a query to Elasticsearch,
A mapping function from
SearchHitto a desired type.
Example using a factory method:
BatchSource<String> elasticSource = ElasticSources.elasticsearch( () -> client("user", "password", "host", 9200), () -> new SearchRequest("my-index"), hit -> (String) hit.getSourceAsMap().get("name") );
For all configuration options use the builder:
BatchSource<String> elasticSource = new ElasticSourceBuilder<String>() .name("elastic-source") .clientFn(() -> RestClient.builder(new HttpHost( "localhost", 9200 ))) .searchRequestFn(() -> new SearchRequest("my-index")) .optionsFn(request -> RequestOptions.DEFAULT) .mapToItemFn(hit -> hit.getSourceAsString()) .slicing(true) .build();
By default, the connector uses a single scroll to read data from Elasticsearch - there is only a single reader on a single node in the whole cluster.
Slicing can be used to parallelize reading from an index with more shards. Number of slices equals to globalParallelism.
If Hazelcast members and Elasticsearch nodes are located on the same machines then the connector will use co-located reading, avoiding the overhead of physical network.
The connector uses retry capability of the underlying Elasticsearch client. This allows the connector to handle some transient network issues but it doesn’t cover all cases.
The source uses Elasticsearch’s Scroll API. The scroll context is stored on a node with the primary shard. If this node crashes, the search context is lost and the job can’t reliably read all documents, so the job fails.
If there is a network issue between Hazelcast and Elasticsearch the Elasticsearch client retries the request, allowing the job to continue.
However, there is an edge case where the scroll request is processed by the Elasticsearch server, moves the scroll cursor forward, but the response is lost. The client then retries and receives the next page, effectively skipping the previous page. The recommended way to handle this is to check the number of processed documents after the job finishes, possibly restart the job when not all documents are read.
These are known limitations of Elasticsearch Scroll API. There is an ongoing work on Elasticsearch side to fix these issues.
The Elasticsearch connector sink provides a builder and several convenience factory methods. Most commonly you need to provide:
A client supplier, which returns a configured instance of RestHighLevelClient (see Elasticsearch documentation),
A mapping function to map items from the pipeline to an instance of one of
Suppose type of the items in the pipeline is
Map<String, Object>, the sink can be created using
Sink<Map<String, Object>> elasticSink = ElasticSinks.elasticsearch( () -> client("user", "password", "host", 9200), item -> new IndexRequest("my-index").source(item) );
For all configuration options use the builder:
Sink<Map<String, Object>> elasticSink = new ElasticSinkBuilder<Map<String, Object>>() .clientFn(() -> RestClient.builder(new HttpHost( "localhost", 9200 ))) .bulkRequestFn(BulkRequest::new) .mapToRequestFn((map) -> new IndexRequest("my-index").source(map)) .optionsFn(request -> RequestOptions.DEFAULT) .build();
The Elasticsearch sink doesn’t implement co-located writing. To achieve
maximum write throughput provide all nodes to the
and configure parallelism.
The sink connector is able to handle transient network failures, failures of nodes in the cluster and cluster changes, e.g., scaling up.
Transient network failures between Hazelcast and Elasticsearch cluster are handled by retries in the Elasticsearch client.
The worst case scenario is when a master node containing a primary of a shard fails.
First, you need to set
BulkRequest.waitForActiveShards(int) to ensure
that a document is replicated to at least some replicas. Also, you can’t
use the auto-generated ids and need to set the document id manually to
avoid duplicate records.
Second, you need to make sure new master node and primary shard is allocated before the client times out. This involves:
configuration of the following properties on the client:
org.apache.http.client.config.RequestConfig.Builder.setConnectionRequestTimeout org.apache.http.client.config.RequestConfig.Builder.setConnectTimeout org.apache.http.client.config.RequestConfig.Builder.setSocketTimeout
and configuration of the following properties in the Elasticsearch cluster:
cluster.election.max_timeout cluster.fault_detection.follower_check.timeout cluster.fault_detection.follower_check.retry_count cluster.fault_detection.leader_check.timeout cluster.fault_detection.leader_check.retry_count cluster.follower_lag.timeout transport.connect_timeout transport.ping_schedule network.tcp.connect_timeout
For details see Elasticsearch documentation section on cluster fault detection.