
Watermarking in Spark Structured Streaming

What is a Watermark?

Watermarking is a useful method that helps a Stream Processing Engine deal with late data. Essentially, a watermark is a threshold that specifies how long the system waits for late events. If an arriving event lies within the watermark, it is used to update a query. If it is older than the watermark, it is dropped and not processed further by the Streaming Engine.


But Why Should I Care?

In distributed and networked systems, there is always a chance for disruption: nodes going down, sensors losing connection, and so on. Because of that, it is not guaranteed that data will arrive at a Stream Processing Engine in the order in which it was created. For the sake of fault tolerance, it is therefore necessary to handle such out-of-order data.

To deal with this problem, the state of an aggregate must be preserved. When a late event occurs, the query can then be reprocessed. But this means the state of all aggregates has to be kept indefinitely, which causes the memory usage to grow indefinitely too. That is not practical in a real-world scenario, unless the system has unlimited resources (and an unlimited budget). Watermarking is therefore a useful concept to constrain the system by design and prevent it from exploding at runtime.

How to use it?

Watermarking was introduced into the Structured Streaming API in Spark 2.1. You can enable it by simply adding the withWatermark operator to a query:

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
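
To make this concrete, here is a minimal sketch of a query that combines withWatermark with a windowed aggregation. The socket source, the column names, and the 10-minute/5-minute values are illustrative assumptions, not prescribed by the API:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder()
  .appName("WatermarkingExample")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Illustrative source: a socket emitting lines like "2021-01-01 12:07:42,sensor-1"
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .map { line =>
    val Array(ts, id) = line.split(",")
    (Timestamp.valueOf(ts), id)
  }
  .toDF("eventTime", "sensorId")

// Accept events that are at most 10 minutes late, then count per 5-minute window
val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("sensorId"))
  .count()

val query = counts.writeStream
  .outputMode("update") // late rows within the watermark update existing result rows
  .format("console")
  .start()

query.awaitTermination()

In update mode, windows that are still within the watermark are re-emitted when late data arrives for them; once the watermark passes a window, its state is dropped and further events for it are ignored.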

It takes two parameters: a) an event time column (which must be the same column the aggregate works on) and b) a threshold specifying how long late data should be accepted (in event-time units). Spark will then maintain the state of an aggregate until max eventTime - delayThreshold > T, where max eventTime is the latest event time seen by the engine and T is the end time of a window. If late data falls within this threshold, the query eventually gets updated (right image in the figure below). Otherwise the data is dropped and no reprocessing is triggered (left image in the figure below).
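
As a worked example (the numbers are purely illustrative): with a delayThreshold of 10 minutes and 5-minute windows, once the engine has seen an event with eventTime 12:33, the watermark becomes 12:23. The state of the window 12:15–12:20 can then be dropped, since 12:33 - 10 minutes > 12:20, and any event for that window arriving afterwards is ignored. The window 12:20–12:25, on the other hand, is still within the watermark, so a late event for it would still update the result.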