This is a prerelease version.

Updating Jobs

A streaming job can run for days or months without disruption. Occasionally, you may need to update the job’s pipeline while preserving its state.

For example, imagine a streaming job that performs a windowed aggregation over a 24-hour window. Due to new business requirements, you need to add a 1-hour window and a filter. While implementing this change, you don’t want to lose the current computational state, because it includes data for the past day and in many cases you cannot fully replay that data.

To update a job, you need to do the following:

  1. Update the pipeline with your new workflow.

  2. Export the state of your existing job into a snapshot.

  3. Start your new job, using the saved state.

Updating the Pipeline

The first step in updating a job is to update its pipeline.

The new pipeline must be able to work with the state saved in the previous snapshot. When the job is started again from that snapshot, the following rules apply:

  • If a transform was not in the previous version and is available in the new version, the transform will be restored with an empty state.

  • If a transform was in the previous version and is not in the new version, then its state will simply be ignored.

  • If the transform existed in the previous version and also exists in the new version, then the state from the previous version will be restored as the state of the new transform.

    Stateful transforms in the original and the updated pipelines must have the same name for the state to be restored successfully.
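The three rules above can be pictured as a lookup by transform name. The following is a minimal sketch of that idea in plain Java, not Hazelcast's actual restore code: the class `SnapshotRestoreModel`, the method `restore`, and the use of a `String` to stand in for a transform's state are all hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of the restore rules; not Hazelcast's actual implementation. */
final class SnapshotRestoreModel {

    /**
     * @param snapshotState state saved per stateful transform name in the old job
     * @param newTransforms names of the stateful transforms in the updated pipeline
     * @return the state that each transform of the updated pipeline starts with
     */
    static Map<String, String> restore(Map<String, String> snapshotState,
                                       Set<String> newTransforms) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String name : newTransforms) {
            // Name exists in both versions: the saved state is restored.
            // Name exists only in the new version: the transform starts empty.
            restored.put(name, snapshotState.getOrDefault(name, ""));
        }
        // Names that exist only in the snapshot are never looked up,
        // so their state is ignored.
        return restored;
    }
}
```

In this model, a renamed stateful transform looks like one removed name plus one new name, so it starts empty. This matches the requirement that names must stay the same for state to be restored.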

What you Can Do

You can make any of the following changes and still keep your pipeline compatible with the saved state:

  • Adding new stateful stages and removing existing ones. This includes adding and removing a source or sink or adding a new aggregation path from existing sources.

  • Adding, removing or changing stateless stages, such as filter, map, flatMap, non-transactional sinks, and more.

  • Adding new branches to existing pipeline stages.

  • Changing the connection parameters of sources and sinks.

  • Enabling and disabling early results for a window.

  • Changing the timeout of a session window.

    When you change the session window timeout, you may lose events that were emitted during the update. Hazelcast does not split already aggregated events into different accumulators. However, existing pre-aggregated values are put into frames based on their end time. Once all results that depend on events processed before the update have been emitted, the results are correct again.

  • Changing the window size or the slide length of sliding windows or tumbling windows.

    Hazelcast purges data used in windows that have already been emitted. As a result, your window may miss some data during the update, because the updated window may need data that has already been purged.

    When you change the window size or slide step, you may lose events that were emitted during the update. Hazelcast does not split already aggregated events into different accumulators. However, existing pre-aggregated values are put into frames based on their end time. Once all results that depend on events processed before the update have been emitted, the results are correct again.

  • Changing the eviction timeout for a stateful map.

  • Changing the parameters of aggregate operations, such as changing the comparator of the AggregateOperation.minBy() method.

  • Changing the aggregate operation.

    The accumulator type must stay the same.

  • Renaming a stateless stage.

  • Updating an associated User Code Namespace.

    All jobs using a specific namespace should be suspended while you update the namespace’s resources. Otherwise, jobs may become inconsistent because different namespace versions are in use. Take care to ensure that the updated resources are compatible with any existing data used by the jobs.
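The note in the list above about changing the window size or slide step says that saved pre-aggregated values are placed into frames by their end time and are never split. A minimal sketch of what that could look like follows. It is a toy model built on assumptions, not Hazelcast's code: it assumes frames are aligned to multiples of the slide step, that a saved frame moves to the first new frame ending at or after its old end time, and that partial results are simple counts. The names `FrameRealignment` and `realign` are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of re-framing saved partial results; not Hazelcast's actual code. */
final class FrameRealignment {

    /**
     * @param savedFrames partial counts keyed by the end timestamp of the old frame
     * @param newStep     slide step of the updated window, in the same time unit
     * @return partial counts keyed by the end timestamp of the new frame
     */
    static TreeMap<Long, Long> realign(Map<Long, Long> savedFrames, long newStep) {
        TreeMap<Long, Long> result = new TreeMap<>();
        for (Map.Entry<Long, Long> frame : savedFrames.entrySet()) {
            long oldEnd = frame.getKey();
            // Assumption: pick the smallest multiple of newStep that is >= oldEnd.
            long newEnd = Math.floorDiv(oldEnd - 1, newStep) * newStep + newStep;
            // The saved value moves as a whole: it is combined with others, never split.
            result.merge(newEnd, frame.getValue(), Long::sum);
        }
        return result;
    }
}
```

For example, with saved frames ending at 30, 60 and 90 that hold counts of 2, 3 and 4, a new step of 60 yields a frame ending at 60 with count 5 and a frame ending at 120 with count 4. The count of 4 is attributed to the whole new frame even though its events arrived before time 90, which is why results can be off until all frames restored from the snapshot have been emitted.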

What you Cannot Do

Making any of these changes will make your pipeline incompatible with the saved state:

  • Changing a sliding window to a session window.

  • Replacing an aggregate operation with another one that uses a different accumulator type.

  • Renaming a stateful stage.
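Two of the incompatible changes above can be checked mechanically before you export a snapshot, provided you keep a description of each stateful stage. The sketch below is one hedged way to do that in plain Java. The `Stage` descriptor, its string fields, and the `check` method are hypothetical helpers written for this example and are not part of the Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for an updated pipeline; not a Hazelcast API. */
final class StateCompatibilityCheck {

    /** Hypothetical descriptor of one stateful stage. */
    static final class Stage {
        final String windowKind;      // for example "sliding", "session" or "none"
        final String accumulatorType; // for example "LongAccumulator"

        Stage(String windowKind, String accumulatorType) {
            this.windowKind = windowKind;
            this.accumulatorType = accumulatorType;
        }
    }

    /** Returns the problems found, keyed by stage name; an empty list means none were detected. */
    static List<String> check(Map<String, Stage> oldStages, Map<String, Stage> newStages) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Stage> entry : newStages.entrySet()) {
            Stage before = oldStages.get(entry.getKey());
            if (before == null) {
                continue; // new or renamed stage: it starts empty, nothing to restore
            }
            Stage after = entry.getValue();
            if (before.windowKind.equals("sliding") && after.windowKind.equals("session")) {
                problems.add(entry.getKey() + ": sliding window changed to session window");
            }
            if (!before.accumulatorType.equals(after.accumulatorType)) {
                problems.add(entry.getKey() + ": accumulator type changed from "
                        + before.accumulatorType + " to " + after.accumulatorType);
            }
        }
        return problems;
    }
}
```

A renamed stateful stage cannot be detected this way, because it is indistinguishable from removing one stage and adding another. The check skips such stages, and their state is not restored.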

Exporting a Snapshot

To export the job’s state, take a snapshot of its current state and cancel the job.

  • CLI

bin/hz-cli save-snapshot -C <job name or id> <snapshot name>

This takes an in-memory snapshot of the job state and cancels the job. The job does not process any data after the snapshot is taken and is shut down cleanly. You can list the snapshots currently exported in the cluster with the following command:

bin/hz-cli list-snapshots

Example result:

TIME                    SIZE (bytes)    JOB NAME                 SNAPSHOT NAME
2020-03-15T14:37:01.011 1,196           hello-world              snapshot-v1

  • SQL

DROP JOB IF EXISTS <job name or id> WITH SNAPSHOT <snapshot name>;

Result:

OK

This takes an in-memory snapshot of the job state and cancels the job. The job does not process any data after the snapshot is taken and is shut down cleanly.

You cannot list the exported snapshots in SQL.

For more details about this statement, see the SQL reference documentation.

Internally, Hazelcast stores these snapshots in maps that are separate from the periodic snapshots that are taken as part of the job execution. Exporting snapshots requires enough available memory in the cluster to store the computation state.

Starting the Updated Job

When submitting a job, you can specify an initial snapshot to use. The job then starts from the processing state restored from that snapshot and, as long as state compatibility is maintained, continues running from there. To submit a job that starts from a specific snapshot, use the following command:

  • CLI

bin/hz-cli submit -s <snapshot name> <jar name>
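If you script the update, the export and the resubmission have to run in that order. The following is a minimal sketch in Java that assembles the two `hz-cli` invocations shown on this page and runs them with `ProcessBuilder`. The class `JobUpdateCommands` and its methods are hypothetical helpers, and the sketch assumes that `hz-cli` returns a non-zero exit code when a command fails.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical helper that scripts the two CLI steps; not part of Hazelcast. */
final class JobUpdateCommands {

    /** Arguments for exporting a snapshot and cancelling the job, as in the save-snapshot command. */
    static List<String> saveSnapshot(String hzCli, String jobNameOrId, String snapshotName) {
        return List.of(hzCli, "save-snapshot", "-C", jobNameOrId, snapshotName);
    }

    /** Arguments for submitting the updated JAR so that the job starts from the snapshot. */
    static List<String> submitFromSnapshot(String hzCli, String snapshotName, String jarName) {
        return List.of(hzCli, "submit", "-s", snapshotName, jarName);
    }

    /** Runs one command, forwards its output, and fails on a non-zero exit code (assumed behavior). */
    static void run(List<String> command) throws IOException, InterruptedException {
        int exit = new ProcessBuilder(command).inheritIO().start().waitFor();
        if (exit != 0) {
            throw new IllegalStateException("Command failed with exit code " + exit + ": " + command);
        }
    }
}
```

A script would call `run(saveSnapshot("bin/hz-cli", "hello-world", "snapshot-v1"))` first and, only if that succeeds, `run(submitFromSnapshot("bin/hz-cli", "snapshot-v1", "my-job-v2.jar"))`, where `my-job-v2.jar` is a placeholder for your updated JAR.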

Saving Snapshots on Disk

Snapshots are saved in memory and are not persisted to disk by default, so if your cluster shuts down for any reason, any saved snapshots are lost. To protect your snapshots from member failures, enable your cluster to persist snapshots. See Persisting Data on Disk.