Skip to content

Checkpointing in Structured Streaming

Every micro-batch, Structured Streaming stores its progress through the source and state generated by stateful operators in a durable cloud-storage directory called a checkpoint location. By writing its progress and state to durable cloud storage, failed queries can restart themselves from the last checkpoint as opposed to the very beginning of the source stream.

Checkpoint location contents

The checkpoint location directory stores the following:

  • Stream progress: Each record in a stream typically has an offset number. In the storage location, Structured Streaming records the offsets it is going to process before starting a micro-batch, and marks those offsets are processed when it finishes the micro-batch. Thus, if the stream fails during a micro-batch, the stream recovers by reading from the checkpoint location and resuming at the last unprocessed offset. This aspect of the checkpoint location is known as "progress tracking" or "offset management."
  • Stateful operator state: Stateful operators generate state, such as intermediate aggregation output. This operator state is stored in the checkpoint location so that if the query is restarted, Structured Streaming can load the most recent state without having to rebuild its state by replaying the entire stream over again.

Both progress tracking and state management are central to the fault-tolerance of the Structured Streaming engine, so the default interval at which progress and operator state are written is every micro-batch. Checkpointing cannot be fully disabled, but there are some checkpoint optimizations you can enable to make the effects of checkpoint location operations less expensive.

Select a checkpoint location

Your checkpoint location should be a fixed, per-query directory in cloud-storage (like Amazon S3). For your convenience, it should be somewhat self-describing, so that you know the query to which a given checkpoint location corresponds. For example, if you are in the Data Science division of your company and you are generating a product usage dashboard, you could use s3://data-science/streams/product-usage as the directory location name.

Warning

Your checkpoint locations should always be deterministic: they should be fixed strings, not something like "s3://data-science/{date.today()}/". If you use a non-deterministic string, Structured Streaming will read and write its progress and state to and from non-deterministic locations, which will result in strange issues in your pipeline.

Don't delete your checkpoint location

Finally, once you've set a checkpoint location for a query, you should not manually write to or delete files from that directory. If you were to do this, you could corrupt or delete files needed to resume or recover the stream from failure.

Danger

If you delete your checkpoint location, you'll remove all the progress and state information for the associated streaming query. To recover from such a deletion, your streaming query must reprocess all of your source data. This is time-consuming and costly, and could break the delivery semantics of your query.

However, if you're sure that you no longer need to run a particular query, then you can delete its checkpoint location.

Examples

aggDF \
    .writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
aggDF
    .writeStream
    .outputMode("append")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .format("memory")
    .start()
aggDF
    .writeStream()
    .outputMode("append")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .format("memory")
    .start(); 
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")