
Deduplication in Structured Streaming

Data sources frequently contain duplicate records. Duplication occurs because many data systems offer only at-least-once guarantees, which means that the same record may appear multiple times in the same stream. For example, a web server might send a log record to a database cluster; if the server's retry mechanism isn't idempotent, the same record can be produced and written multiple times.

Deduplication methods

Spark has these two deduplication methods:

  • dropDuplicatesWithinWatermark: The dropDuplicatesWithinWatermark method holds onto records for at least as long as the watermark duration of your streaming job. Deduplication using a watermark is recommended: records older than the watermark delay are removed from state, which keeps memory utilization bounded.

  • dropDuplicates: The dropDuplicates method can remove duplicates across your entire stream. This method is used when you want global deduplication. Global deduplication is unbounded by time and requires an unbounded amount of memory.

Warning

Do not use the dropDuplicates method unless you are sure that the columns on which you are deduplicating have low cardinality. Otherwise, you may encounter out-of-memory errors.

Example

Python:

spark = SparkSession. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
    .withWatermark("eventTime", "10 hours") \
    .dropDuplicatesWithinWatermark(["guid"])

Scala:

val spark: SparkSession = ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
    .withWatermark("eventTime", "10 hours")
    .dropDuplicatesWithinWatermark("guid")

Java:

SparkSession spark = ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
    .withWatermark("eventTime", "10 hours")
    .dropDuplicatesWithinWatermark("guid");

R: Not available in R.

Local deduplication with dropDuplicatesWithinWatermark

The dropDuplicatesWithinWatermark method should be your first choice for deduplication when you know the interval of time within which you might receive duplicates. If you know that you'll have duplicates within an x minute interval, you can instruct the deduplication operator to hold onto records for at least x minutes. After a record has been around for at least x minutes, the deduplication operator will remove it from state, which reduces memory consumption.

For example, suppose you know that duplicates arrive within 5 minutes of each other. If you receive a record with ID = foo and event-time 6, you might receive duplicates up to event-time 6 + 5 = 11. Watermarks tell Structured Streaming when it's done receiving events before a certain time (in this case, time 11). Once the deduplication operator knows that it will no longer receive event-times before time 11, it can purge the record for ID = foo from state. That's where the name dropDuplicatesWithinWatermark comes from: Structured Streaming deduplicates records that arrive within your watermark delay.
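
To make this concrete, here's a minimal end-to-end sketch in Python. The rate source, column names, and checkpoint path are illustrative assumptions standing in for a real source such as Kafka (the rate source doesn't actually emit duplicates; it just gives the pipeline something to run against):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("dedup-sketch").getOrCreate()

# Hypothetical source: emits `timestamp` and `value` columns, renamed
# here to match the example's eventTime and guid columns.
events = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .withColumnRenamed("timestamp", "eventTime")
    .withColumn("guid", col("value").cast("string"))
)

# Duplicates are expected within 5 minutes of the first arrival, so
# per-guid state can be purged once the watermark passes.
# Requires Spark 3.5.0+ for dropDuplicatesWithinWatermark.
deduped = (
    events
    .withWatermark("eventTime", "5 minutes")
    .dropDuplicatesWithinWatermark(["guid"])
)

query = (
    deduped.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/dedup-checkpoint")  # illustrative path
    .start()
)
query.awaitTermination()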

Note

When deduplicating with a watermark, you might have duplicates that arrive after your watermark's maximum delay. In our example, another record with ID = foo that arrives after the watermark has passed event-time 11 is not deduplicated, because its state has already been purged. If you have strict deduplication requirements, you have two options:

  • Keep state for longer via a larger watermark delay (a larger delay means more late-arriving duplicates fall within the watermark); see the sketch after this list.
  • Keep state forever, using dropDuplicates.
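
As a sketch of the first option, assuming the hypothetical streamingDf, eventTime, and guid columns from the examples above, only the watermark delay changes:

# Catches duplicates that arrive up to 10 hours apart.
streamingDf \
    .withWatermark("eventTime", "10 hours") \
    .dropDuplicatesWithinWatermark(["guid"])

# Stricter: a larger delay keeps per-guid state around longer, catching
# later duplicates at the cost of more memory.
streamingDf \
    .withWatermark("eventTime", "48 hours") \
    .dropDuplicatesWithinWatermark(["guid"])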

Global deduplication with dropDuplicates

The dropDuplicates method allows you to deduplicate over all records of the stream. Since streams are unbounded, dropDuplicates can deduplicate over an unbounded number of records by keeping every record in state. This behavior has pros and cons:

  • Pro: Perfect, total deduplication.
  • Con: Potential out-of-memory errors. With enough records, you'll eventually run out of space to store state and crash a machine.

Historical note

Before dropDuplicatesWithinWatermark was introduced in Spark 3.5.0, the recommendation was to perform deduplication with state removal by passing an event-time column to dropDuplicates. This still works, but we highly recommend using dropDuplicatesWithinWatermark instead; it's less error-prone. dropDuplicates also has an unintuitive and usually undesirable subtlety, which you can read about in this Stack Overflow answer.

Example

Python:

spark = SparkSession. ...

# Without watermark using guid column
streamingDf.dropDuplicates(["guid"])

# With watermark using guid and eventTime columns
streamingDf \
    .withWatermark("eventTime", "10 seconds") \
    .dropDuplicates(["guid", "eventTime"])

Scala:

val spark: SparkSession = ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
    .withWatermark("eventTime", "10 seconds")
    .dropDuplicates("guid", "eventTime")

Java:

SparkSession spark = ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
    .withWatermark("eventTime", "10 seconds")
    .dropDuplicates("guid", "eventTime");

R:

sparkR.session(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

API reference