Distribute work across a cluster

Deploying a cluster of Flow instances allows CPU-intensive tasks, particularly projections, to be distributed across multiple nodes.

This is especially well suited to queries that return large numbers of records requiring projection: distributing the work across a cluster allows it to be parallelized, yielding significant performance improvements.

Understand how work is distributed

When running in a multi-node cluster, the node that receives the query becomes the work coordinator for that query.

The work coordinator is responsible for issuing the initial queries that locate the source data. As data arrives, the work coordinator adds it to a queue for other nodes to pick up and project.

As worker nodes pick up projection tasks, they perform the projection work (transforming and enriching data, calling remote services as required) and return the results to the work coordinator, which then sends them back to the query issuer.

Considerations

Worker discovery via multicast

When sharing work across a cluster of Flow nodes, the workers use multicast to discover each other and share work.

If you're using our Docker Compose template, this is configured for you by default.

Flow uses Hazelcast for multicast discovery; see the Hazelcast documentation for more detail on how its multicast discovery works.
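Multicast is the default discovery mechanism, so no additional configuration is normally needed. If you're not using the template and want to pin it explicitly, the setting mirrors the Multicast example later on this page:

flow:
    hazelcast:
        discovery: multicast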

HTTP Caches

Caches are not shared between worker nodes, so remote services may receive a higher number of calls during parallelized enrichment.

Distribute work preferentially to remote nodes

As the cluster size grows, the work can be parallelized across a greater number of nodes. While this improves throughput, the work coordinator incurs a heavier workload serializing and deserializing work packets and responses.

To account for this, the flow.projection.distributionRemoteBias setting tunes the point at which work is preferentially distributed to remote nodes rather than the work coordinator. Once the number of cluster members exceeds this value, the work coordinator performs a lower proportion of the projection work for a query; for example, with the default value of 10, a cluster of more than 10 members biases projection work towards the remote nodes.

Work distribution configuration

For projection work to be distributed across all Flow nodes in a cluster, the following configuration options are provided:

| Config parameter | Description | Default setting |
| --- | --- | --- |
| flow.projection.distributionMode | Defines where projection work will be performed: on the local node (LOCAL) or across the cluster (DISTRIBUTED) | LOCAL |
| flow.projection.distributionPacketSize | Defines the number of records in each work packet distributed to other Flow servers. Applicable to the DISTRIBUTED distributionMode only | 100 |
| flow.projection.distributionRemoteBias | Defines the number of cluster members before work is preferentially distributed to remote nodes | 10 |
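As an illustrative sketch (the values here are examples, not recommendations), a node configured to distribute projection work in smaller packets, preferring remote nodes once the cluster grows beyond five members, would combine these settings:

flow:
    projection:
        distributionMode: DISTRIBUTED
        distributionPacketSize: 50
        distributionRemoteBias: 5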

Advanced cluster configuration

This section may only be required if you’re configuring the container orchestration yourself and not using one of our preconfigured templates.

Cluster discovery types

Flow services can form a cluster by discovering other nodes via one of three mechanisms:

| Mechanism | Description |
| --- | --- |
| multicast | Flow services attempt to use multicast to discover other cluster nodes |
| aws | Flow services attempt to use AWS metadata to discover other cluster nodes |
| eureka | Flow services use Eureka metadata to discover other cluster nodes |

Cluster discovery options

| Config parameter | Description | Default setting |
| --- | --- | --- |
| flow.hazelcast.discovery | Specifies the discovery mechanism Hazelcast will use to find other instances: multicast, aws or eureka | multicast |
| flow.hazelcast.eurekaUri | URI of the coordinating Eureka instance when discovery is set to eureka | http://127.0.0.1:8761/eureka/ |
| flow.hazelcast.memberTag | Denotes which member type this is within the Hazelcast cluster | flow-query-service |
| flow.hazelcast.networkInterface | Should only be specified if the deployment environment requires Hazelcast to bind to a single network interface. The interface should be specified using Hazelcast's network naming convention, e.g. 10.10.*.*, 172.16.*.* | |
| flow.hazelcast.useMetadataForHostAndPort | Specific to Eureka Hazelcast discovery. Specifies whether metadata should be published to and used for Hazelcast host and port discovery | true |
| flow.hazelcast.awsPortScanRange | Specific to AWS Hazelcast discovery. Specifies the port range that will be scanned for running Hazelcast instances on EC2 | true |
| flow.hazelcast.taskPoolSize | Specifies the number of threads dedicated to processing distributed workloads | 2 |
| flow.hazelcast.taskQueueSize | Specifies the size of the task queue on each cluster node. A value of 0 sets an unbounded queue size | 0 |
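As a sketch, a node tuned to dedicate more threads to distributed work while capping its task backlog might look like the following, assuming the same flow.hazelcast nesting used in the examples below (the values are illustrative):

flow:
    hazelcast:
        discovery: multicast
        taskPoolSize: 4
        taskQueueSize: 1000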

Example configurations

Multicast

Enable clustering using discovery via local network multicast, with distributed projections.

flow:
    projection:
        distributionMode: DISTRIBUTED

    hazelcast:
        discovery: multicast

AWS

Enable clustering using AWS discovery with distributed projections.

The Flow query server should be run on EC2 instances where AWS_REGION is specified as an environment variable and the instance is authorized to query EC2 instances via IAM.

AWS_REGION may alternatively be specified as a JVM system property, e.g. -DAWS_REGION=eu-west-2

flow:
    projection:
        distributionMode: DISTRIBUTED

    hazelcast:
        discovery: aws
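If you run the query server under Docker Compose, AWS_REGION can be supplied through the container environment. A minimal sketch, using a hypothetical service name (flow-query-service, borrowed from the memberTag default above):

services:
    flow-query-service:
        environment:
            # Region used for AWS discovery; illustrative, adjust to your deployment
            AWS_REGION: eu-west-2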

Eureka

Enable clustering using Eureka discovery, with distributed projections.

flow:
    projection:
        distributionMode: DISTRIBUTED

    hazelcast:
        discovery: eureka
        eurekaUri: http://eureka-server:8761/eureka/
        useMetadataForHostAndPort: true