Updating Jobs
A streaming job can run for days or months without disruption. Occasionally, you may need to update the job’s pipeline while preserving its state.
For example, imagine a streaming job that does a windowed aggregation over a 24-hour window. Due to new business requirements, you need to add a 1-hour window and a filter. While implementing this requirement, you don’t want to lose the current computational state, because it includes data for the past day and in many cases you may not be able to replay this data fully.
To update a job, you need to do the following:
-
Update the pipeline with your new workflow.
-
Export the state of your existing job into a snapshot.
-
Start your new job, using the saved state.
Updating the Pipeline
The first step in updating a job is to update its pipeline.
The new pipeline must be able to work with the previous snapshot state because once the job is started again from a new snapshot, the following rules are applied:
-
If a transform was not in the previous version and is available in the new version, the transform will be restored with an empty state.
-
If a transform was in the previous version and is not in the new version, then its state will simply be ignored.
-
If the transform existed in the previous version and also exists in the new version, then the state from the previous version will be restored as the state of the new transform.
Stateful transforms in the original and the updated pipelines must have the same name for the state to be restored successfully.
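The three rules above can be illustrated with a small, self-contained model. This is only a sketch and not Hazelcast code: the class RestoreRulesSketch, the restore helper, the EMPTY_STATE placeholder, and the transform names are all hypothetical, and state is represented as a plain string for brevity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the restore rules; not part of the Hazelcast API.
class RestoreRulesSketch {

    // Hypothetical placeholder that stands for "no saved state".
    static final String EMPTY_STATE = "<empty>";

    /**
     * Computes the starting state of every stateful transform in the new
     * pipeline. Transforms are matched to the snapshot by name only.
     */
    static Map<String, String> restore(Map<String, String> snapshot,
                                       List<String> newTransformNames) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String name : newTransformNames) {
            // In both versions: the saved state is reused.
            // Only in the new version: the transform starts empty.
            restored.put(name, snapshot.getOrDefault(name, EMPTY_STATE));
        }
        // Names that exist only in the snapshot are never looked up,
        // so their state is ignored.
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = Map.of(
                "daily-window", "24h of accumulated data",
                "dedup", "set of seen ids");
        List<String> newPipeline = List.of("daily-window", "hourly-window");

        System.out.println(restore(snapshot, newPipeline));
        // prints {daily-window=24h of accumulated data, hourly-window=<empty>}
    }
}
```

Because matching is by name, renaming "daily-window" in the new pipeline would make it look like a brand-new transform in this model, and it would start with an empty state.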
What You Can Do
You can make any of the following changes and still keep your pipeline compatible with the saved state:
-
Adding new stateful stages and removing existing ones. This includes adding and removing a source or sink or adding a new aggregation path from existing sources.
-
Adding, removing or changing stateless stages, such as filter, map, flatMap, non-transactional sinks, and more.
-
Adding new branches to existing pipeline stages.
-
Changing the connection parameters of sources and sinks.
-
Enabling and disabling early results for a window.
-
Changing the timeout of a session window.
When you change the session window timeout, you may lose events that were emitted during the update. Hazelcast does not split already aggregated events into different accumulators. However, existing pre-aggregated values will be put into frames based on their end time. After all the data based on events that were processed before the update has been emitted, the results will be correct again.
-
Changing the window size or the slide length of sliding windows or tumbling windows.
Hazelcast purges the data of windows that have already been emitted. As a result, a window may miss some data during the update because it needs data that has already been purged. When you change the window size or slide step, you may lose events that were emitted during the update. Hazelcast does not split already aggregated events into different accumulators. However, existing pre-aggregated values will be put into frames based on their end time. After all the data based on events that were processed before the update has been emitted, the results will be correct again.
-
Changing the eviction timeout for stateful map.
-
Changing the parameters of aggregate operations, such as changing the comparator of the AggregateOperations.minBy() method.
-
Changing the aggregate operation.
The accumulator type must stay the same.
-
Renaming a stateless stage.
-
Updating an associated User Code Namespace.
All jobs using a specific namespace should be suspended while updating the namespace’s resources, otherwise jobs may become inconsistent due to differing namespace versions in use. Take care to ensure that the updated resources are compatible with any existing data used by the jobs.
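The notes above on changing the window size or slide step say that pre-aggregated values are not split, but are placed into frames by their end time. The following sketch models that idea to show why results can be temporarily incorrect. It is a simplified illustration, not Hazelcast's implementation: the class FrameRegroupSketch, its methods, the use of simple event counts, and the assumption of non-negative timestamps are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of frame regrouping; not part of the Hazelcast API.
class FrameRegroupSketch {

    /** End time of the frame containing a non-negative timestamp. */
    static long frameEnd(long timestamp, long frameLength) {
        return (timestamp / frameLength + 1) * frameLength;
    }

    /** Pre-aggregates events into per-frame counts keyed by frame end time. */
    static TreeMap<Long, Long> aggregate(long[] eventTimestamps, long frameLength) {
        TreeMap<Long, Long> frames = new TreeMap<>();
        for (long ts : eventTimestamps) {
            frames.merge(frameEnd(ts, frameLength), 1L, Long::sum);
        }
        return frames;
    }

    /**
     * Moves saved frames onto a new frame grid. Each saved count stays whole
     * and is placed according to the end time of its old frame.
     */
    static TreeMap<Long, Long> regroup(TreeMap<Long, Long> saved, long newFrameLength) {
        TreeMap<Long, Long> frames = new TreeMap<>();
        for (Map.Entry<Long, Long> e : saved.entrySet()) {
            // The last instant covered by the old frame is (end - 1).
            long newEnd = frameEnd(e.getKey() - 1, newFrameLength);
            frames.merge(newEnd, e.getValue(), Long::sum);
        }
        return frames;
    }

    /** Sums the frames whose end time lies in (windowEnd - windowSize, windowEnd]. */
    static long windowCount(TreeMap<Long, Long> frames, long windowEnd, long windowSize) {
        long sum = 0;
        for (long v : frames.subMap(windowEnd - windowSize, false, windowEnd, true).values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] events = {2, 7, 12, 17};
        TreeMap<Long, Long> saved = aggregate(events, 10);   // {10=2, 20=2}
        TreeMap<Long, Long> regrouped = regroup(saved, 5);   // {10=2, 20=2}
        TreeMap<Long, Long> exact = aggregate(events, 5);    // {5=1, 10=1, 15=1, 20=1}

        // Window [10, 15): the event at 12 is missing after regrouping.
        System.out.println(windowCount(regrouped, 15, 5) + " vs " + windowCount(exact, 15, 5)); // prints "0 vs 1"
        // Window [15, 20): the unsplit old frame also carries the event at 12.
        System.out.println(windowCount(regrouped, 20, 5) + " vs " + windowCount(exact, 20, 5)); // prints "2 vs 1"
    }
}
```

In this model, only windows that touch frames saved before the update are affected. Windows built entirely from frames created after the update are counted exactly, which matches the statement that results become correct again once the old data has been emitted.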
Exporting a Snapshot
Before you start the updated job, take a snapshot of the existing job’s current state and cancel the job. To do both in one step with the command-line client, run:
bin/hz-cli save-snapshot -C <job name or id> <snapshot name>
This command takes an in-memory snapshot of the job state and cancels the job. The job does not process any data after the snapshot is taken and shuts down cleanly. To list the snapshots currently exported in the cluster, use the following command:
bin/hz-cli list-snapshots
Example result:
TIME                     SIZE (bytes)  JOB NAME     SNAPSHOT NAME
2020-03-15T14:37:01.011  1,196         hello-world  snapshot-v1
To do the same in SQL, use the following statement:
DROP JOB IF EXISTS <job name or id> WITH SNAPSHOT <snapshot name>;
Result:
OK
This statement takes an in-memory snapshot of the job state and cancels the job. The job does not process any data after the snapshot is taken and shuts down cleanly.
You cannot list the current exported snapshots in SQL.
For more details about this statement, see the SQL reference documentation.
Internally, Hazelcast stores these snapshots in maps that are separate from the periodic snapshots that are taken as part of the job execution. Exporting snapshots requires enough available memory in the cluster to store the computation state.
Starting the Updated Job
When submitting a job, you can specify an initial snapshot to use. The job then starts from the processing state restored from that snapshot and, as long as the new pipeline is compatible with the saved state, continues running from there. To start a job from a specific snapshot, specify the snapshot name when you submit the job. In the Java API, this is the JobConfig.setInitialSnapshotName() setting.
Saving Snapshots on Disk
By default, snapshots are saved in memory and are not persisted to disk, so if your cluster shuts down for any reason, any saved snapshots are lost. To protect your snapshots from member failures, enable your cluster to persist snapshots. See Persisting Data on Disk.