This is a prerelease version.

Configuring Jobs

Before submitting a job to a cluster, you can configure its name, whether to record metrics, and how it behaves when cluster members fail.

Job Configuration Options

These are the available configuration options for jobs.

| Option | Usage | Allowed Value | Default Value |
|---|---|---|---|
| processingGuarantee | Set the strategy for taking snapshots of running jobs. Snapshots use additional in-memory storage. | NONE, AT_LEAST_ONCE, EXACTLY_ONCE | Depends on the source or sink. |
| snapshotIntervalMillis | Set the interval in milliseconds between each snapshot. This interval is only relevant if the processingGuarantee setting is not NONE. | positive INT | 10000 |
| autoScaling | Enable jobs to scale automatically when new members are added to the cluster or existing members are removed from the cluster. | boolean | true |
| splitBrainProtectionEnabled | In case of a network partition, enable a job to continue running only on the cluster quorum. | boolean | false |
| initialSnapshotName | Name the snapshot from which to start the job. | string | '' |

Setting the Job Name

Each job has a cluster-wide unique ID and an optional name. Only one job with the same name can be running in the cluster at the same time.

  • Java

JobConfig config = new JobConfig();
config.setName("myJob");

If a job with the same name is already running, the newly submitted job fails. You can avoid this by using the JetService.newJobIfAbsent() method.

  • SQL

CREATE JOB myJob

Setting a Processing Guarantee for Streaming Jobs

When you set a processing guarantee other than NONE, Hazelcast takes distributed snapshots of your jobs. The distributed snapshot algorithm sends barriers down the event stream. When a Jet processor receives a barrier, it saves its state as a snapshot. Snapshots are saved in memory in map backups and replicated across the cluster.

For details of the distributed snapshot algorithm, see Distributed Snapshot.

Because a processor can have multiple inputs, it must wait until it has received the barrier from all inputs before taking a snapshot. The difference between AT_LEAST_ONCE and EXACTLY_ONCE lies in how the processor handles its inputs while it waits. With AT_LEAST_ONCE, the processor continues to process items from inputs on which it has already received the barrier. This strategy results in lower latency and higher throughput overall, with the caveat that some items may be processed twice after a restart. With EXACTLY_ONCE, the processor stops consuming from those inputs until the barrier has arrived on every input. The available settings are:

  • NONE: No snapshots will be taken and upon a restart due to cluster change, the job will be restarted as if it was started from scratch.

  • AT_LEAST_ONCE: Enables snapshots. When a job is restarted it will be resumed from the latest available snapshot. Items which have been processed before the snapshot might be processed again after the job is resumed. This option provides better latency than the EXACTLY_ONCE option with weaker guarantees.

  • EXACTLY_ONCE: Enables snapshots. When a job is restarted it will be resumed from the latest available snapshot. Items which have been processed before the snapshot are guaranteed not to be processed again after the job is resumed. This option provides the strongest correctness guarantee. However, latency might increase because of the barrier alignment that this processing mode requires.
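The difference in barrier handling between the two snapshot modes can be illustrated with a small simulation. This is a minimal sketch in plain Java, not Hazelcast code: the class name `BarrierAlignmentSketch`, the `BARRIER` marker, and the round-robin reading order are all assumptions made for illustration. It models a single processor with two inputs and reports which items are already reflected in the processor's state at the moment the snapshot is taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one processor with several inputs; not part of the Hazelcast API.
final class BarrierAlignmentSketch {
    // Marker standing in for a snapshot barrier in an input stream (illustrative only).
    static final String BARRIER = "<barrier>";

    /**
     * Reads the inputs round-robin until the barrier has arrived on every input,
     * and returns the items processed up to that point.
     * align = true models EXACTLY_ONCE (inputs that delivered the barrier are blocked);
     * align = false models AT_LEAST_ONCE (those inputs keep being processed).
     */
    static List<String> itemsInSnapshot(List<List<String>> inputs, boolean align) {
        List<Deque<String>> queues = new ArrayList<>();
        for (List<String> input : inputs) {
            if (!input.contains(BARRIER)) {
                throw new IllegalArgumentException("every input must contain a barrier");
            }
            queues.add(new ArrayDeque<>(input));
        }
        boolean[] barrierSeen = new boolean[queues.size()];
        int barriersReceived = 0;
        List<String> processed = new ArrayList<>();
        while (barriersReceived < queues.size()) {
            for (int i = 0; i < queues.size() && barriersReceived < queues.size(); i++) {
                if (align && barrierSeen[i]) {
                    continue; // aligned mode: do not consume past the barrier
                }
                String next = queues.get(i).poll();
                if (next == null) {
                    continue; // this input is exhausted
                }
                if (next.equals(BARRIER)) {
                    if (!barrierSeen[i]) {
                        barrierSeen[i] = true;
                        barriersReceived++;
                    }
                } else {
                    processed.add(next);
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<List<String>> inputs = List.of(
                List.of("a1", BARRIER, "a2", "a3"),
                List.of("b1", "b2", BARRIER, "b3"));
        System.out.println("at-least-once: " + itemsInSnapshot(inputs, false));
        System.out.println("exactly-once:  " + itemsInSnapshot(inputs, true));
    }
}
```

In this model, with alignment off, `a2` is processed before the snapshot even though it arrived after the barrier on its input. If that input is replayed from the barrier position after a restart, `a2` is processed a second time. With alignment on, only the items that preceded the barrier on each input are included.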

For example, while a cluster is running a job, one or more members may leave the cluster due to an internal error, loss of networking, or deliberate shutdown for maintenance. In this case, Hazelcast must suspend the computation, re-plan it for the smaller cluster, and then resume in such a way that the state of computation remains intact.

  • Java

JobConfig config = new JobConfig();
config.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

  • SQL

CREATE JOB myJob
OPTIONS (
    'processingGuarantee' = 'exactlyOnce'
)

Upholding a processing guarantee requires support from all the participants in the pipeline, including data sources and sinks. For example, to support the at-least-once processing guarantee, all sinks in a pipeline must be idempotent, so that submitting the same data item more than once does not result in duplicate computations. For an in-depth explanation of this topic, see Fault Tolerance. To find out if a source supports processing guarantees, see Ingesting Data from Sources. To find out if a sink supports processing guarantees, see Sending Results to Sinks.
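To make the idempotency requirement concrete, the following minimal sketch compares two toy sinks that receive the same item twice, as can happen after a restart under the at-least-once guarantee. It is plain Java, not Hazelcast sink code: the class `IdempotentSinkSketch`, both sink methods, and the sample order data are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy sinks; not part of the Hazelcast API.
final class IdempotentSinkSketch {

    /** Non-idempotent sink: appends every delivery, so a replayed item is stored twice. */
    static List<String> appendSink(List<Map.Entry<String, Integer>> deliveries) {
        List<String> stored = new ArrayList<>();
        for (Map.Entry<String, Integer> delivery : deliveries) {
            stored.add(delivery.getKey() + "=" + delivery.getValue());
        }
        return stored;
    }

    /** Idempotent sink: writes by key, so a replayed item overwrites its earlier copy. */
    static Map<String, Integer> upsertSink(List<Map.Entry<String, Integer>> deliveries) {
        Map<String, Integer> stored = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> delivery : deliveries) {
            stored.put(delivery.getKey(), delivery.getValue());
        }
        return stored;
    }

    public static void main(String[] args) {
        // "order-2" is delivered twice, modelling a replay after a job restart.
        List<Map.Entry<String, Integer>> deliveries = List.of(
                Map.entry("order-1", 10),
                Map.entry("order-2", 20),
                Map.entry("order-2", 20));
        System.out.println("append sink: " + appendSink(deliveries));
        System.out.println("upsert sink: " + upsertSink(deliveries));
    }
}
```

The append-style sink ends up with three entries for two distinct orders, while the keyed sink holds exactly one entry per order regardless of how many times an item is delivered.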

Auto-Scaling Jobs

By default, jobs scale up and down automatically when you add or remove a cluster node. To rescale a job, Hazelcast must restart it.

For an in-depth explanation of fault tolerance for jobs, see Fault Tolerance.

When auto-scaling is off and you add a new node to a cluster, the job will keep running on the previous nodes but not on the new one. However, if the job restarts for whatever reason, Hazelcast will automatically scale it to the whole cluster.

What happens when a node joins or leaves depends on whether the job is configured with a processing guarantee and whether auto-scaling is enabled. The table below shows the behavior of a job after a cluster change for each combination of these two settings.

| Auto-Scaling | Processing Guarantee | Member Added | Member Removed |
|---|---|---|---|
| enabled (default) | any setting | restart after a configured delay (scale-up-delay-millis) | restart immediately |
| disabled | none | keep job running on old members | fail job |
| disabled | at-least-once or exactly-once | keep job running on old members | suspend job |
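The table above can also be read as a lookup function. The sketch below encodes it in plain Java as an aid to reasoning about a given configuration. It is not Hazelcast code: the class `ClusterChangeBehavior` and its enums are hypothetical names introduced here, and the local `Guarantee` enum only mirrors the three processing guarantee settings.

```java
// Hypothetical encoding of the behavior table; not part of the Hazelcast API.
final class ClusterChangeBehavior {
    enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    enum Change { MEMBER_ADDED, MEMBER_REMOVED }

    enum Action {
        RESTART_AFTER_DELAY,          // delay configured by scale-up-delay-millis
        RESTART_IMMEDIATELY,
        KEEP_RUNNING_ON_OLD_MEMBERS,
        FAIL_JOB,
        SUSPEND_JOB
    }

    /** Returns the table entry for the given settings and cluster change. */
    static Action actionFor(boolean autoScaling, Guarantee guarantee, Change change) {
        if (autoScaling) {
            // With auto-scaling enabled, the processing guarantee does not affect the outcome.
            return change == Change.MEMBER_ADDED
                    ? Action.RESTART_AFTER_DELAY
                    : Action.RESTART_IMMEDIATELY;
        }
        if (change == Change.MEMBER_ADDED) {
            return Action.KEEP_RUNNING_ON_OLD_MEMBERS;
        }
        // Auto-scaling disabled and a member was removed.
        return guarantee == Guarantee.NONE ? Action.FAIL_JOB : Action.SUSPEND_JOB;
    }

    public static void main(String[] args) {
        for (boolean autoScaling : new boolean[] {true, false}) {
            for (Guarantee guarantee : Guarantee.values()) {
                for (Change change : Change.values()) {
                    System.out.println("autoScaling=" + autoScaling + ", " + guarantee
                            + ", " + change + " -> " + actionFor(autoScaling, guarantee, change));
                }
            }
        }
    }
}
```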

Storing Metrics

The following settings allow you to configure whether Hazelcast stores metrics such as the time that a job started and completed.

These metrics are not available in SQL. To access these metrics, you must use the Jet API.

| Option | Description | Allowed Value | Default Value |
|---|---|---|---|
| metricsEnabled | Enable metrics for the job to be collected by the cluster. | boolean | true |
| storeMetricsAfterJobCompletion | Enable metrics to be stored in the cluster even after the job completes. | boolean | false |