Looking at these mechanics will help us motivate, understand, and apply the concepts around watermarks. We discuss how watermarks are created at the point of data ingress, how they propagate through a data processing pipeline, and how they affect output timestamps.
Definition
We wish to solve the general problem of when it is safe to call an event-time window closed. We require a more robust measure of progress. To arrive there, we make one fundamental assumption about our streaming data: each message has an associated logical event timestamp. In most cases, we can take the time of the original event’s occurrence as its logical event timestamp.
Each message is either “in-flight”, meaning that it has been received but not yet completed, or “completed”, meaning that no more processing on behalf of this message is required.
We use the leftmost edge of the in-flight distribution to define the watermark:
The watermark is a monotonically increasing timestamp of the oldest work not yet completed.
Two fundamental properties make it useful:
- Completeness: If the watermark has advanced past some timestamp T, its monotonicity guarantees that no more nonlate data at or before T will be processed, so we can correctly emit any aggregations at or before T.
- Visibility: If a message is stuck in our pipeline for any reason, the watermark cannot advance.
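The definition and its two properties can be illustrated with a minimal sketch. The class and method names here (`WatermarkTracker`, `receive`, `complete`) are hypothetical, and timestamps are assumed to be plain numbers; this is not how any particular system implements watermarks.

```python
class WatermarkTracker:
    """Tracks in-flight messages and reports a monotonic watermark."""

    def __init__(self):
        self._in_flight = {}               # message id -> event timestamp
        self._watermark = float("-inf")    # last reported watermark

    def receive(self, msg_id, event_time):
        # The message is now in-flight: received but not yet completed.
        self._in_flight[msg_id] = event_time

    def complete(self, msg_id):
        # No more processing is required on behalf of this message.
        self._in_flight.pop(msg_id, None)

    def watermark(self):
        # Candidate: the oldest event time still in flight (the leftmost
        # edge of the in-flight distribution). Taking max() with the
        # previous value keeps the watermark from ever moving backward,
        # so a late message cannot pull it down.
        if self._in_flight:
            oldest = min(self._in_flight.values())
            self._watermark = max(self._watermark, oldest)
        return self._watermark
```

In this sketch a message that is never completed pins the watermark at its event time (visibility), and a message that arrives with an event time behind the watermark is simply late.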
Source Watermark Creation
- Perfect watermark creation: Requires perfect knowledge of the input.
  - Ingress timestamping: A source that assigns ingress times (the time at which the system first observes the data) as the event times for data entering the system can create a perfect watermark.
  - Static sets of time-ordered logs: A statically sized input source of time-ordered logs (e.g., a Kafka topic with a static set of partitions, where each partition of the source contains monotonically increasing event times) also allows a perfect watermark.
- Heuristic watermark creation: Provides only an estimate, so pipelines need to deal with late data.
  - Dynamic sets of time-ordered logs: Consider a dynamic set of structured log files (each individual file containing records with monotonically increasing event times relative to other records in the same file but with no fixed relationship of event times between files).
  - By tracking the minimum event times of unprocessed data in the existing set of log files, monitoring growth rates, and utilizing external information like network lag, you can create a watermark.
The more that is known about the source, the better the heuristic, and the fewer late data items will be seen.
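As a rough sketch of the log-file case, the function below takes the minimum unprocessed event time across the known files and subtracts a safety margin. The function name, the dictionary input, and the `expected_lag` parameter are assumptions made for illustration; a real heuristic would also use growth rates and other external signals.

```python
def heuristic_source_watermark(oldest_unprocessed, expected_lag=0):
    """Estimate a source watermark from per-file progress.

    oldest_unprocessed maps a log file name to the event time of its
    oldest unprocessed record. expected_lag is a hypothetical safety
    margin for files that have not been discovered yet (for example,
    an estimate of network lag).
    """
    if not oldest_unprocessed:
        return None  # nothing known about the source yet
    return min(oldest_unprocessed.values()) - expected_lag
```

With a static set of time-ordered partitions and `expected_lag=0`, the same minimum corresponds to the perfect-watermark case, because no unseen input can carry an older timestamp.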
Watermark Propagation
Most real-world pipelines consist of multiple stages. For pipelines comprising multiple distinct stages, each stage likely tracks its own watermark, whose value is a function of all the inputs and stages that come before it.
- The input watermark: Captures the progress of everything upstream of the stage. For nonsource stages, the input watermark is defined as the minimum of the output watermarks of all shards/partitions/instances of all of its upstream sources and stages.
- The output watermark: Captures the progress of the stage itself. It is essentially defined as the minimum of the stage’s input watermark and the event times of all nonlate active messages within the stage.
We can calculate the event-time latency within a stage as the difference between its input watermark and its output watermark.
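These definitions translate almost directly into code. The sketch below assumes numeric timestamps and uses hypothetical function names; it shows only the arithmetic, not how a real engine collects the inputs.

```python
def input_watermark(upstream_output_watermarks):
    # Minimum over the output watermarks of every upstream shard,
    # partition, or instance.
    return min(upstream_output_watermarks)


def output_watermark(input_wm, active_nonlate_event_times):
    # Minimum of the stage's input watermark and the event times of
    # all nonlate messages still active inside the stage.
    return min([input_wm, *active_nonlate_event_times])


def event_time_latency(input_wm, output_wm):
    # How far the stage's output lags behind its input in event time.
    return input_wm - output_wm
```

For example, with upstream output watermarks 12, 15, and 13 and active messages at event times 9 and 11, the input watermark is 12, the output watermark is 9, and the stage's event-time latency is 3.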
Understanding Watermark Propagation
- The output watermark for each of the stages is at least as old as the corresponding input watermark of each, and in reality a little bit older. This is because in a real system computing answers takes time, and we don’t allow the output watermark to advance until processing for a given input has completed.
- The input watermark for the Average Session Lengths stage is the minimum of the output watermarks for the two stages directly upstream.
Watermarks are never allowed to move backward. Within that constraint, there are a few choices for output timestamps:
- End of the window: if you want the output timestamp to be representative of the window bounds.
- Timestamp of first nonlate element: When you want to keep your watermarks as conservative as possible.
- Timestamp of a specific element: when the time of some particular element has business meaning, such as the time at which a user clicked.
Example: Timestamp of the end of the window
Example: Timestamp of the first nonlate element in the window
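Two differences between these examples are worth noting:
- Watermark delay: Using the timestamp of the first nonlate element holds the output watermark back at that timestamp until the input for the window is complete, so the watermark advances more slowly than with end-of-window timestamps.
- Semantic differences: None of the three output timestamp choices is right or wrong; they are simply different. Make the right choice for your use case.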
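The first two output timestamp choices can be sketched as a small function. The function name and the policy strings are hypothetical, and “first nonlate element” is assumed here to mean the nonlate element with the earliest event time.

```python
def output_timestamp(window_end, nonlate_event_times, policy="end_of_window"):
    """Pick the timestamp attached to a window's output."""
    if policy == "end_of_window":
        # Representative of the window bounds; lets the watermark
        # advance as far as the window allows.
        return window_end
    if policy == "first_nonlate":
        # Most conservative choice: the output is stamped with the
        # earliest nonlate element seen in the window.
        return min(nonlate_event_times)
    raise ValueError(f"unknown policy: {policy}")
```

For a window ending at 120 that contains nonlate elements at 65, 70, and 110, the end-of-window policy yields 120 and the first-nonlate policy yields 65, which is why the second choice keeps downstream watermarks further back.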
Percentile Watermarks
We could consider the entire distribution of event timestamps for active messages and make use of it to create finer-grained triggering conditions.
If “mostly” correct is sufficient for the business logic, percentile watermarks provide a mechanism by which the watermark can advance more quickly and more smoothly.
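One way to picture a percentile watermark is to ignore the oldest tail of the active-message distribution. The sketch below is an assumption-laden simplification: it looks only at the event times of currently active messages, treats the 100th percentile as the classic minimum-based watermark, and uses a hypothetical function name.

```python
def percentile_watermark(active_event_times, percentile):
    """Watermark that ignores the oldest (100 - percentile)% of active messages."""
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be in (0, 100]")
    if not active_event_times:
        return None
    ordered = sorted(active_event_times)
    # Number of oldest messages allowed to be outstanding.
    skip = int(len(ordered) * (100 - percentile) / 100)
    # Always leave at least one message to define the watermark.
    skip = min(skip, len(ordered) - 1)
    return ordered[skip]
```

With one straggler at event time 1 and nine other active messages at 50 through 58, the 100th percentile watermark is stuck at 1, while the 90th percentile watermark is already at 50.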
Processing-time Watermarks
By only examining the event-time watermark as we have defined it up until now, we cannot distinguish between a system that is processing data from an hour ago quickly and without delay, and a system that is attempting to process real-time data and has been delayed for an hour while doing so.
We define the processing-time watermark in the exact same way as we have defined the event-time watermark, except instead of using the event-time timestamp of the oldest work not yet completed, we use the processing-time timestamp of the oldest operation not yet completed.
If the processing-time watermark delay is small but the event-time watermark delay is large, the data is being buffered while the system itself keeps working.
If both the processing-time watermark delay and the event-time watermark delay are large, an operation is blocked. The event-time watermark alone cannot provide enough information to tell these two cases apart.
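The two cases can be expressed as a small decision function. The function name, the single `threshold` parameter, and the returned labels are hypothetical; all values are assumed to be numeric timestamps in the same unit.

```python
def diagnose(now, event_time_wm, processing_time_wm, threshold):
    """Classify pipeline state from the two watermark delays."""
    event_delay = now - event_time_wm
    processing_delay = now - processing_time_wm

    if event_delay <= threshold:
        # The event-time watermark is close to real time.
        return "on_time"
    if processing_delay <= threshold:
        # Old data, but every pending operation is recent: the data is
        # being buffered, not stuck.
        return "buffered"
    # Old data and an operation that has been pending for a long time.
    return "blocked"
```

For instance, `diagnose(now=1000, event_time_wm=400, processing_time_wm=995, threshold=60)` reports buffering, while the same call with `processing_time_wm=400` reports a blocked operation.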
Case Study
Google Cloud Dataflow performs aggregation via a centralized aggregator agent. The watermark update protocol must take state ownership lease validation into account.
Flink performs watermark tracking and aggregation in-band. Each source also generates watermark “checkpoints” that are sent synchronously in-band with the data stream. This means that when a watermark checkpoint from source A for timestamp “53” is emitted, it guarantees that no nonlate data messages will be emitted from source A with a timestamp earlier than “53”.
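To make the in-band idea concrete, the sketch below shows an operator that receives watermark checkpoints on several input channels and forwards the minimum downstream. It is an illustration only: the class and method names are hypothetical and this is not Flink's API.

```python
class InBandOperator:
    """Merges watermark checkpoints arriving in-band on several inputs."""

    def __init__(self, input_channels):
        # Last watermark seen on each input channel.
        self._channel_wm = {ch: float("-inf") for ch in input_channels}
        self._output_wm = float("-inf")

    def on_watermark(self, channel, timestamp):
        # Per-channel watermarks never move backward.
        self._channel_wm[channel] = max(self._channel_wm[channel], timestamp)
        # The operator can only promise the minimum across its inputs.
        merged = min(self._channel_wm.values())
        if merged > self._output_wm:
            self._output_wm = merged
            return merged  # value to forward downstream
        return None        # nothing new to announce
```

With inputs A and B, a checkpoint of 53 from A alone produces no output, because B has not reported yet. A checkpoint of 48 from B then advances the operator's watermark to 48, and a later checkpoint of 60 from B advances it to 53.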
Advantages of in-band watermarks:
- Reduced watermark propagation latency, and very low-latency watermarks (no central aggregation delay)
- No single point of failure for watermark aggregation
- Inherent scalability
Advantages of out-of-band watermarks:
- Single source of “truth”
- Source watermark creation: Some source watermarks require global information. For example, when a source’s watermark stops advancing because no data is arriving, some agent must still produce a watermark for downstream stages.
The procedure for building a real-world streaming pipeline:
- Test the system to establish a bounded amount of out-of-order timestamps on the source data (the estimation band referred to below).
- Create two subscriptions to the topic containing the input messages: a base subscription that the pipeline will actually use to read the data to be processed, and a tracking subscription, which is used only for tracking the oldest unacknowledged processing time, to perform the watermark estimation.
The watermark estimate can be trusted as long as two conditions hold:
- The tracking subscription is sufficiently ahead of the base subscription. Sufficiently ahead means that the tracking subscription is ahead by at least the estimation band. This ensures that any bounded reorder within the estimation band is taken into account.
- The tracking subscription is sufficiently close to real time. In other words, there is no backlog on the tracking subscription.
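The two conditions on the tracking subscription can be written as a simple check. This is a sketch under several assumptions: the function and parameter names are hypothetical, each subscription's position is represented by the timestamp of its oldest unacknowledged message, and `realtime_tolerance` is an invented threshold for what counts as “no backlog”.

```python
def estimate_is_reliable(base_oldest_unacked, tracking_oldest_unacked,
                         now, estimation_band, realtime_tolerance):
    """Check whether the tracking subscription supports a trustworthy estimate."""
    # Condition 1: the tracking subscription is ahead of the base
    # subscription by at least the estimation band.
    sufficiently_ahead = (
        tracking_oldest_unacked - base_oldest_unacked >= estimation_band
    )
    # Condition 2: the tracking subscription is close to real time,
    # i.e. it has no meaningful backlog.
    no_backlog = now - tracking_oldest_unacked <= realtime_tolerance
    return sufficiently_ahead and no_backlog
```

The check covers only the two conditions listed above; it does not compute the watermark estimate itself.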