
Watermarks in Structured Streaming

In Structured Streaming, out-of-order data is normal and expected. For the streaming engine and its stateful operators to produce complete, correct, and repeatable results, there must be a mechanism for determining when the stream won't receive any more events before a certain time (it needs to be able to say, "I've received everything from before 4pm").

Such a timestamp is called a watermark. The engine computes the watermark at the end of each micro-batch by subtracting a user-provided delay value (called the watermark delay) from the maximum timestamp on the records seen so far. The term "event-time" usually refers to the timestamp embedded within a record itself; "processing-time" refers to the time at which a record is processed. To make sure that all out-of-order records are processed, the watermark delay should be the maximum delay between event-time and processing-time.

Why subtract the watermark delay from the maximum event-time seen so far?

Suppose that we receive a record bob generated at timestamp 60, and the maximum delay that a record can have is 20 seconds.

Hypothetically, consider a record generated at timestamp 39. Even if it were maximally delayed (by 20 seconds), it would have arrived at or before timestamp 59:

So, even before record 60 was generated (let alone made it over the network to the streaming system), the record generated at timestamp 39 must have arrived. More generally, anything with an event-time less than the maximum event-time seen so far minus the maximum delay must have already arrived:

In this picture, the timestamp 40 is called the stream's current watermark. It designates that no more records with event-times before 40 will be received. For the rest of this article, we'll usually depict the stream above as:

Of course, if your watermark delay isn't large enough, you may still receive records with event-times less than your watermark. In that case, Structured Streaming will drop those records.
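
To make the mechanics concrete, here's a minimal sketch in plain Python (not Spark code, and the numbers are only illustrative) of how the watermark is derived and how a late record is checked against it:

WATERMARK_DELAY = 20                                  # maximum expected lateness, in seconds
max_event_time_seen = 60                              # e.g. the record generated at timestamp 60
watermark = max_event_time_seen - WATERMARK_DELAY     # 60 - 20 = 40

def should_drop(event_time):
    # Records with event-times below the watermark are dropped by the engine.
    return event_time < watermark

print(should_drop(39))   # True: 39 < 40, so this record would be dropped
print(should_drop(45))   # False: 45 >= 40, so this record is still accepted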

Let's see how this is useful. Suppose you define an aggregation operator over non-overlapping 5-minute windows with a watermark delay of 1 minute. If the largest event-time processed in a micro-batch is 4:02pm, then we should have received all events before 4:01pm. That becomes our watermark, and it tells the aggregation operator that all data before 4:01pm has arrived. The 3:55pm to 4:00pm window can then never receive new records, so the aggregation operator can safely emit that window's aggregate value downstream.
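
In PySpark, the setup just described might look like the following sketch (the source, sink, and column name are placeholders, not part of the scenario above):

from pyspark.sql import functions as F

# Sketch of the scenario above: non-overlapping 5-minute windows with a
# 1-minute watermark delay. Source, sink, and column names are placeholders.
events = spark.readStream.format("...").load()

counts = (events
    .withWatermark("timestamp", "1 minute")
    .groupBy(F.window("timestamp", "5 minutes"))
    .count())

# In append output mode, a window's count is emitted only after the watermark
# passes the end of that window, i.e. once the window can no longer change.
(counts.writeStream
    .outputMode("append")
    .format("...")
    .start())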

Setting a watermark delay

To tell your stream how to calculate the watermark, you'll use the withWatermark method on DataFrames. The first argument is the name of the timestamp column on your incoming records, and the second argument is the watermark delay, which can be a duration like "1 minute" or "5 seconds" or "2 hours".

If your incoming stream doesn't have a field that is explicitly of TimestampType, you'll have to use conversion functions like timestamp_seconds and timestamp_millis.
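For instance, if your event time arrives as epoch seconds in a long column, the conversion might look like this sketch (the column name epoch_seconds is an assumption):

from pyspark.sql import functions as F

# Sketch: build a TimestampType column from epoch seconds before defining the
# watermark. The incoming column name "epoch_seconds" is assumed.
df = spark.readStream.format("...").load()

df = (df
    .withColumn("timestamp", F.timestamp_seconds(F.col("epoch_seconds")))
    .withWatermark("timestamp", "1 minute")
)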

PySpark Reference

df = spark.readStream.format("...").load()

df = (df
    .withWatermark("timestamp", "1 minute")
    # Use other stateful operators here
)

df.writeStream.format("...").start()

Scala Reference

val df = spark.readStream.format("...").load()

val dfWithWatermark = df
  .withWatermark("timestamp", "1 minute")
  // Use other stateful operators here

dfWithWatermark.writeStream.format("...").start()

Java Reference

Dataset<Row> df = spark.readStream().format("...").load();

Dataset<Row> dfWithWatermark = df
  .withWatermark("timestamp", "1 minute")
  // Use other stateful operators here

dfWithWatermark.writeStream().format("...").start();

The three principles of watermarks

There are three basic principles of watermarks in Structured Streaming. You can use the helpful mnemonic "WET" to remember them.

Principle 1

Watermarks Walk the boundary between event-times the stream won't receive and event-times the stream will receive. For example, a stream's watermark being at 40 seconds tells the engine that it won't have to process any more events with timestamps less than 40 seconds.

Principle 2

Watermarks Evaluate at the end of each micro-batch. This principle has implications for completeness and latency, so we'll explore it more shortly.

Principle 3

Watermarks Trail behind the maximum event-time seen so far by the watermark delay. That is, the watermark is computed by subtracting the watermark delay from the largest event-time seen so far. (See the explanation in the introduction for why this makes sense.)

Conceptual watermark example

Let's assume a watermark delay of 20 seconds. In our first batch, suppose that we receive the following records:

As per Principle 2 (watermarks evaluate at the end of every micro-batch), we now have to compute the watermark. Using Principle 3 (watermarks trail behind the maximum event-time by the watermark delay), we subtract 20 from 55, giving us 35:

Now, suppose that we receive the record (d, 10) in the next micro-batch. As per Principle 1, the record (d, 10) is discarded, since its timestamp is less than the watermark value of 35. The only way this could happen is if the chosen watermark delay of 20 wasn't actually the maximum delay. If the maximum delay were something like 50, the watermark would have been 55 - 50 = 5, and (d, 10) would not have been dropped.

Finally, let's consider what happens when we receive (e, 95) and then (f, 70) in the same micro-batch.

You might think that after processing (e, 95), the watermark updates to 95 - 20 = 75 and that (f, 70) is then dropped. But that's not the case: we must stick to our principles. Principle 2 tells us that the watermark evaluates at the end of every micro-batch. Thus, we process both e and f, and only then do we update the watermark. Using Principle 3, that watermarks trail behind the maximum event-time seen so far by the watermark delay, the watermark becomes 95 - 20 = 75:
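
As a sanity check, here's a small plain-Python simulation of the same three batches (the first batch's individual records are assumptions; the walkthrough only fixes its maximum event-time at 55):

DELAY = 20
watermark = float("-inf")
max_event_time = float("-inf")

def run_batch(records):
    global watermark, max_event_time
    # Principle 1: records below the current watermark are dropped.
    kept = [(k, t) for k, t in records if t >= watermark]
    dropped = [(k, t) for k, t in records if t < watermark]
    max_event_time = max([max_event_time] + [t for _, t in records])
    # Principles 2 and 3: only at the end of the batch does the watermark
    # move up to (maximum event-time seen so far) - delay.
    watermark = max_event_time - DELAY
    print(f"kept={kept} dropped={dropped} watermark={watermark}")

run_batch([("a", 30), ("b", 55)])        # illustrative first batch; watermark becomes 55 - 20 = 35
run_batch([("d", 10)])                   # (d, 10) is dropped: 10 < 35
run_batch([("e", 95), ("f", 70)])        # both kept; watermark becomes 95 - 20 = 75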

So what's the takeaway? You'll rarely have to apply these rules by hand, but it's good to know them, because these principles affect latency and correctness. We'll discuss this tradeoff in the following section.

The tradeoff between completeness and latency

The watermark delay lets you configure the tradeoff between latency and completeness in your stream. Let's understand this claim by considering the three principles:

  1. Principle 1 dictates that watermarks are the boundary between event-times you will and won't receive. So, if you receive records with event-times less than the current watermark, they will be dropped. Dropping records means that your stream will not have a "complete" view of all records.
  2. Principle 2 tells us that watermarks evaluate at the end of each micro-batch. This means that if you don't run any micro-batches, then your watermark won't update. This will mean that stateful operators won't be able to emit results, and your stream will have high latency.
  3. Principle 3 states that watermarks trail behind the maximum event-time seen so far by the watermark delay. This means that if your watermark delay is too small, you might drop records that you shouldn't have. If your watermark delay is too large, you might have to wait longer to emit results.

Practically, what does this mean for you? Usually, you'll have SLAs on how delayed your data can be. If you value completeness over latency, set your watermark delay near the 100th percentile of the delay that your SLA allows. If you want lower latency and are fine with dropping some records, you can set your watermark delay lower.
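
As a rough sketch of that reasoning, you could compute the observed lateness (processing-time minus event-time) for a sample of records from your own metrics and use a percentile of it as a starting point for the delay (the sample values below are made up):

# Sketch (plain Python): derive a candidate watermark delay from observed
# lateness, where lateness = processing-time - event-time, in seconds.
def candidate_delay(lateness_seconds, percentile=1.0):
    ordered = sorted(lateness_seconds)
    idx = min(len(ordered) - 1, int(percentile * (len(ordered) - 1)))
    return ordered[idx]

sample = [2, 5, 8, 30, 45, 58]                  # made-up lateness values, in seconds
print(candidate_delay(sample))                  # 58: favors completeness
print(candidate_delay(sample, percentile=0.9))  # 45: smaller delay, favors latency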

Monitoring and tuning your watermark delay

In the Monitoring the Query Lifecycle guide, we discuss how to register a listener that receives StreamingQueryProgress events. In the streaming query progress, you have access to the StateOperatorProgress, which contains information on the number of dropped rows (numRowsDroppedByWatermark). In your listener, you can monitor the number of dropped rows and stop your stream (or send an alert) if the number of dropped rows exceeds a certain threshold.
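
As a sketch of that idea in PySpark (Spark 3.4+), you could register a listener like the one below; the threshold and the action taken when it's exceeded (alert, stop the query, and so on) are placeholders for your own logic:

from pyspark.sql.streaming import StreamingQueryListener

# Sketch: sum numRowsDroppedByWatermark across all stateful operators in each
# progress update and flag updates that exceed a threshold.
DROP_THRESHOLD = 0

class WatermarkDropListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        dropped = sum(op.numRowsDroppedByWatermark
                      for op in event.progress.stateOperators)
        if dropped > DROP_THRESHOLD:
            # Placeholder action: print; in practice, send an alert or stop the query.
            print(f"Query {event.progress.id}: {dropped} rows dropped by watermark")

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(WatermarkDropListener())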

If you notice that the number of records dropped by the watermark is 0, two things are possible: your watermark delay could be too large, or it could actually be "just right." Unfortunately, it's difficult to automatically assess which situation you're in. You could try to reduce the watermark delay and see if the number of dropped rows increases. However, this isn't quite practical for production jobs, especially if they're dealing with sensitive data that you don't want to drop.

Example of watermarks and aggregations

To see a real, runnable example of a streaming aggregation and its interaction with watermarks, please read the aggregation with watermark example.