A newer version of Hazelcast Platform is available.

View latest

Configuring the Jet Engine

This section describes Hazelcast’s Jet engine specific configuration. See the Configuration Options section to learn more about the configuration files and options.

Enabling the Jet Engine

You can enable Hazelcast’s Jet engine using the following configurations:

  • XML

  • YAML

  • Java

<hazelcast>
    ...
    <jet enabled="true" ...>
    ...
    </jet>
</hazelcast>
hazelcast:
  ...
  jet:
    enabled: true
    ...
...
Config config = new Config();
config.getJetConfig().setEnabled(true);
HazelcastInstance instance = createHazelcastInstance(config);
...

Note that, Hazelcast’s Jet engine is disabled by default when you use Hazelcast within the Java context; that is, hazelcast.jar. When using Hazelcast ZIP/TAR distributions or Docker and Kubernetes distribution, Jet engine is enabled by default. See the Security Defaults section for information about the enabled/disabled features for different Hazelcast distributions.

Enabling Resource Uploads

You can enable resource uploads for Jet engine jobs using the following configurations:

  • XML

  • YAML

  • Java

<hazelcast>
    ...
    <jet enabled="true" resource-upload-enabled="true">
    ...
    </jet>
</hazelcast>
hazelcast:
  ...
  jet:
    enabled: true
    resource-upload-enabled: true
    ...
...
Config config = new Config();
config.getJetConfig().setEnabled(true).setResourceUploadEnabled(true);
HazelcastInstance instance = createHazelcastInstance(config);
...

By default, resource uploading for Jet engine jobs is disabled.

List of Configuration Options

The hazelcast-full-example configuration files (available as YAML or XML) available in your distribution and linked from Declarative Configuration include a description of all configuration options. The configuration options for Jet engine are also listed below:

Option Default Description

cooperative-thread-count

number of cores

The number of threads Jet creates in its cooperative multithreading pool.

flow-control-period-ms

100

The Jet engine uses a flow control mechanism between cluster members to prevent a slower vertex from getting overflowed with data from a faster upstream vertex. Each receiver regularly reports to each sender how much more data it may send over a given DAG edge. This method sets the duration (in milliseconds) of the interval between flow-control packets.

backup-count

1

The number of synchronous backups to configure on the IMap that Jet needs internally to store job metadata and snapshots. The maximum allowed value is 6.

scale-up-delay-millis

10,000

The delay after which the auto-scaled jobs restart if a new member joins the cluster. It has no effect on jobs with auto scaling disabled.

lossless-restart-enabled

false

Specifies whether the Lossless Cluster Restart feature is enabled. With this feature, you can restart the whole cluster without losing the jobs and their state. It is implemented on top of Hazelcast’s Persistence feature, which persists the data to disk. You need to have the Hazelcast Enterprise edition and configure Hazelcast’s Persistence to use this feature. The default value is false, i.e., disabled.

max-processor-accumulated-records

Long.MAX_VALUE

Specifies the maximum number of records that can be accumulated by any single processor instance. Operations like grouping, sorting or joining require certain amount of records to be accumulated before they can proceed. You can set this option to reduce the probability of OutOfMemoryError. This option applies to each processor instance separately, hence the effective limit of records accumulated by each cluster member is influenced by the vertex’s localParallelism and the number of jobs in the cluster. Currently, this configuration option limits the:

  • number of items sorted by the sort operation

  • number of distinct keys accumulated by aggregation operations

  • number of entries in each hash-join lookup table

  • number of entries in stateful transforms

  • number of distinct items in distinct operation.

This limit does not apply to streaming aggregations.

edge-defaults/queue-size

Sets the capacity of processor-to-processor concurrent queues. The value is rounded upwards to the next power of 2.

edge-defaults/packet-size-limit

For a distributed edge, data is sent to a remote member via Hazelcast network packets. Each packet is dedicated to the data of a single edge, but may contain any number of data items. This setting limits the size of the packet in bytes. Packets should be large enough to drown out any fixed overheads, but small enough to allow good interleaving with other packets.

edge-defaults/receive-window-multiplier

Sets the scaling factor used by the adaptive receive window sizing function.

The following is an example declarative configuration:

  • XML

  • YAML

<hazelcast>
    <jet enabled="true" resource-upload-enabled="true">
        <cooperative-thread-count>4</cooperative-thread-count>
        <flow-control-period>100</flow-control-period>
        <backup-count>1</backup-count>
        <scale-up-delay-millis>10000</scale-up-delay-millis>
        <lossless-restart-enabled>false</lossless-restart-enabled>
        <max-processor-accumulated-records>1000000000</max-processor-accumulated-records>
        <edge-defaults>
            <queue-size>1024</queue-size>
            <packet-size-limit>16384</packet-size-limit>
            <receive-window-multiplier>3</receive-window-multiplier>
        </edge-defaults>
    </jet>

</hazelcast>
hazelcast:
  jet:
    enabled: true
    resource-upload-enabled: true
    cooperative-thread-count: 4
    flow-control-period: 100
    backup-count: 1
    scale-up-delay-millis: 10000
    lossless-restart-enabled: false
    max-processor-accumulated-records: 1000000000
    edge-defaults:
      queue-size: 1024
      packet-size-limit: 16384
      receive-window-multiplier: 3

List of Configuration Properties

Configuration properties can either be configured through Java system properties (specified using the standard -Dproperty=value) syntax before application startup or under the properties: inside the yaml file:

hazelcast:
  properties:
    jet.idle.cooperative.min.microseconds: 50
    jet.idle.cooperative.max.microseconds: 500
    jet.idle.noncooperative.min.microseconds: 50
    jet.idle.noncooperative.max.microseconds: 1000

You can also configure the Jet engine before starting as follows:

JAVA_OPTS=-D<property>=<value> bin/hz-start

The full list of Jet-specific properties can be found inside the com.hazelcast.jet.core.JetProperties class and the rest of properties are located inside com.hazelcast.spi.properties.ClusterProperty class. The most important properties are listed here:

Option Default Description

hazelcast.partition.count

271

Total number of partitions in the cluster.

hazelcast.logging.type

jdk

What logger should be used by Jet. Valid options are log4j, log4j2, slf4j and none.

jet.idle.cooperative.min.microseconds

25

The minimum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.

jet.idle.cooperative.max.microseconds

500

The maximum time in microseconds the cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.

jet.idle.noncooperative.min.microseconds

25

The minimum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.

jet.idle.noncooperative.max.microseconds

5000

The maximum time in microseconds the non-cooperative worker threads will sleep if none of the tasklets made any progress. Lower values increase idle CPU usage but may result in decreased latency. Higher values will increase latency and very high values (>10000µs) will also limit throughput.

jet.job.results.max.size

1000

Maximum number of job results to keep in the cluster, the oldest results will be automatically deleted after this size is reached.

jet.job.results.ttl.seconds

604800

Maximum number of time in seconds the job results will be kept in the cluster. They will be automatically deleted after this period is reached.

Job-specific Configuration

Each job has job-specific configuration options. These are covered in detail in Configuring Jobs.

Client Configuration

When using a Hazelcast client to access Jet engine services, the easiest way to start configuring it using the programmatic approach is as follows:

ClientConfig config = new ClientConfig();
config.getNetworkConfig().addAddress("server1", "server2:5702");
HazelcastInstance client = HazelcastClient.newHazelcastClient(config);
JetService jetFromClient = client.getJet();

Alternatively, you can add hazelcast-client.yaml/xml to the classpath or working directory which will be picked up automatically. The location of the file can also be given using the hazelcast.client.config system property; that is, -Dhazelcast.client.config=C:/myhazelcast-client.yaml/xml.

A sample client YAML file is given below:

hazelcast-client:
  # Name of the cluster to connect to. Must match the name configured on the
  # cluster members.
  cluster-name: myjet
  network:
    # List of addresses for the client to try to connect to. All members of
    # a Hazelcast cluster accept client connections.
    cluster-members:
      - server1:5701
      - server2:5701
  connection-strategy:
    connection-retry:
      # how long the client should keep trying connecting to the server
      cluster-connect-timeout-millis: 3000