I spent two years writing Flink jobs before I understood what a watermark actually does.
Not what the docs say it does. What it actually does when a late event shows up at 11:59 PM and your window closed at 11:58.
So I built one from scratch. No dependencies. Pure Python. And the moment I had to implement watermark advancement myself, everything clicked.
Here's what I learned.
The three window types you actually need to know
Before writing any code, let me describe the three window shapes that matter in practice.
Tumbling windows are the simplest. Fixed size, no overlap. If your window is 60 seconds, events from 0s to 59s go into window 1, events from 60s to 119s go into window 2. Each event belongs to exactly one window. Think "hourly revenue report."
Sliding windows overlap. A 60-second window that slides every 10 seconds means an event can appear in up to 6 windows at once. More expensive to compute, but you catch trends that a tumbling window misses. Think "rolling 1-hour average."
Session windows are gap-based. No fixed size. A session starts when an event arrives and closes after a gap of inactivity. User sessions on a website are the classic example. Gap = 30 minutes of no clicks.
Here's how I represented them in Python:
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class WindowType(Enum):
    TUMBLING = "tumbling"
    SLIDING = "sliding"
    SESSION = "session"

@dataclass
class WindowSpec:
    window_type: WindowType
    size_ms: int  # window duration in milliseconds
    slide_ms: Optional[int] = None  # sliding only: step between windows
    gap_ms: Optional[int] = None  # session only: inactivity gap

    def validate(self):
        if self.window_type == WindowType.SLIDING and not self.slide_ms:
            raise ValueError("Sliding windows need slide_ms")
        if self.window_type == WindowType.SESSION and not self.gap_ms:
            raise ValueError("Session windows need gap_ms")
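To make the tumbling and sliding shapes concrete, here is a small standalone sketch of the assignment arithmetic. The function names tumbling_window and sliding_windows are made up for this illustration, and it assumes non-negative millisecond timestamps with half-open [start, end) windows.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # (start_ms, end_ms), end exclusive


def tumbling_window(t_ms: int, size_ms: int) -> Window:
    # Round the timestamp down to the nearest multiple of the window size.
    start = (t_ms // size_ms) * size_ms
    return (start, start + size_ms)


def sliding_windows(t_ms: int, size_ms: int, slide_ms: int) -> List[Window]:
    # Start from the latest window start at or before t_ms and walk backwards
    # one slide at a time while the window still covers t_ms.
    windows = []
    start = (t_ms // slide_ms) * slide_ms
    while start > t_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


# An event at t=75s with 60s windows:
print(tumbling_window(75_000, 60_000))              # (60000, 120000)
print(len(sliding_windows(75_000, 60_000, 10_000)))  # 6
```

With a 60-second size and a 10-second slide, the event lands in size / slide = 6 windows, which is where the "up to 6 windows" figure comes from.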
The watermark problem
Here is the real question: when is a window "done"?
You can't just wait for the clock to hit window_end_time. Events travel over networks. A Kafka message produced at 11:58:45 might arrive at your aggregator at 11:59:10. If you close your 11:58 window the instant the clock ticks over, you drop that event.
Watermarks solve this. A watermark is an assertion: "I believe all events with timestamp <= T have arrived." You advance the watermark as you process events, and you only close a window when the watermark passes the window's end time.
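The tricky part: how far behind the newest event should the watermark trail? I call that delay "allowed lateness" here. Too small and you drop late data. Too large and your results are delayed. (In Flink's vocabulary the closest match is the bounded out-of-orderness delay of a WatermarkStrategy; its allowedLateness setting is a separate knob that keeps a window's state around after the watermark has passed the window's end.)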
@dataclass
class Event:
    key: str
    event_time_ms: int  # time the event actually happened (from the source)
    value: float

class WatermarkTracker:
    def __init__(self, allowed_lateness_ms: int = 5000):
        self.allowed_lateness_ms = allowed_lateness_ms
        self.current_watermark_ms: int = 0

    def advance(self, event_time_ms: int) -> int:
        """Advance watermark based on new event. Returns new watermark."""
        candidate = event_time_ms - self.allowed_lateness_ms
        # the watermark never moves backwards
        if candidate > self.current_watermark_ms:
            self.current_watermark_ms = candidate
        return self.current_watermark_ms

    def is_late(self, event_time_ms: int) -> bool:
        """True if this timestamp is already behind the watermark.

        Stricter than "the event's window has closed": a window stays open
        until the watermark reaches its end.
        """
        return event_time_ms < self.current_watermark_ms
Notice: allowed_lateness_ms = 5000 means "events can arrive up to 5 seconds late and still be counted." When a new event arrives at event time T, I push the watermark to T - 5000, unless it is already further along. Any window that ends at or before the watermark is now eligible to be emitted.
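A quick trace shows how the watermark behaves when timestamps arrive out of order. This is a standalone sketch: watermark_trace is a hypothetical helper written for this example, and it assumes the same rule as above (watermark = largest event time seen minus a fixed delay, starting at 0).

```python
from typing import List, Tuple


def watermark_trace(event_times_ms: List[int], delay_ms: int) -> List[Tuple[int, int]]:
    """Return (event_time, watermark_after_event) pairs."""
    watermark = 0
    trace = []
    for t in event_times_ms:
        # The watermark trails the newest timestamp and never moves backwards.
        watermark = max(watermark, t - delay_ms)
        trace.append((t, watermark))
    return trace


for t, wm in watermark_trace([1_000, 7_000, 4_000, 12_000], delay_ms=5_000):
    print(f"event at {t} ms -> watermark {wm} ms")
# event at 1000 ms -> watermark 0 ms
# event at 7000 ms -> watermark 2000 ms
# event at 4000 ms -> watermark 2000 ms
# event at 12000 ms -> watermark 7000 ms
```

The out-of-order event at 4000 ms leaves the watermark where it was. Only a new maximum timestamp moves it forward.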
Building the window aggregator
Now let's wire it together. I'll keep the aggregation logic simple (sum and count per key), but you can plug in any accumulator.
from typing import Dict, List, Tuple

@dataclass
class WindowState:
    window_start_ms: int
    window_end_ms: int
    key: str
    count: int = 0
    total: float = 0.0

    def add(self, value: float):
        self.count += 1
        self.total += value

    def result(self) -> dict:
        return {
            "key": self.key,
            "window_start_ms": self.window_start_ms,
            "window_end_ms": self.window_end_ms,
            "count": self.count,
            "sum": self.total,
            "avg": self.total / self.count if self.count > 0 else 0,
        }
class StreamWindowAggregator:
    def __init__(self, spec: WindowSpec, allowed_lateness_ms: int = 5000):
        self.spec = spec
        self.spec.validate()
        self.watermark = WatermarkTracker(allowed_lateness_ms)
        # key: (key, window_start_ms) -> WindowState
        self.windows: Dict[Tuple[str, int], WindowState] = {}
        self.emitted: List[dict] = []

    def _get_tumbling_windows(self, event_time_ms: int) -> List[Tuple[int, int]]:
        start = (event_time_ms // self.spec.size_ms) * self.spec.size_ms
        return [(start, start + self.spec.size_ms)]

    def _get_sliding_windows(self, event_time_ms: int) -> List[Tuple[int, int]]:
        slide = self.spec.slide_ms
        size = self.spec.size_ms
        # find all sliding windows that contain this event
        latest_start = (event_time_ms // slide) * slide
        windows = []
        start = latest_start
        while start > event_time_ms - size:
            end = start + size
            if start <= event_time_ms < end:
                windows.append((start, end))
            start -= slide
        return windows
    def _get_session_windows(self, key: str, event_time_ms: int) -> List[Tuple[int, int]]:
        gap = self.spec.gap_ms
        # find existing session for this key that this event extends
        for (k, w_start), state in self.windows.items():
            if k != key:
                continue
            # window_end_ms already includes the gap, so the event belongs to
            # this session if it lands anywhere in [w_start, window_end_ms]
            if w_start <= event_time_ms <= state.window_end_ms:
                state.window_end_ms = max(state.window_end_ms, event_time_ms + gap)
                # return the existing window so process() adds the value to it
                return [(w_start, state.window_end_ms)]
        # new session (an out-of-order event earlier than w_start also lands
        # here; extending a session backwards would mean re-keying it)
        return [(event_time_ms, event_time_ms + gap)]
    def process(self, event: Event) -> List[dict]:
        """Process one event. Returns any windows that are now ready to emit."""
        # assign event to window(s)
        if self.spec.window_type == WindowType.TUMBLING:
            assigned = self._get_tumbling_windows(event.event_time_ms)
        elif self.spec.window_type == WindowType.SLIDING:
            assigned = self._get_sliding_windows(event.event_time_ms)
        else:
            assigned = self._get_session_windows(event.key, event.event_time_ms)
        # an event is late only for windows the watermark has already passed;
        # those are dropped (you could also route them to a side output)
        wm = self.watermark.current_watermark_ms
        open_windows = [(s, e) for (s, e) in assigned if e > wm]
        self.watermark.advance(event.event_time_ms)
        for (w_start, w_end) in open_windows:
            wkey = (event.key, w_start)
            if wkey not in self.windows:
                self.windows[wkey] = WindowState(w_start, w_end, event.key)
            self.windows[wkey].add(event.value)
        # check if any windows are past the watermark (ready to emit)
        return self._flush_ready_windows()
    def _flush_ready_windows(self) -> List[dict]:
        ready = []
        current_wm = self.watermark.current_watermark_ms
        to_remove = []
        for wkey, state in self.windows.items():
            if state.window_end_ms <= current_wm:
                ready.append(state.result())
                to_remove.append(wkey)
        for wkey in to_remove:
            del self.windows[wkey]
        self.emitted.extend(ready)
        return ready

    def flush_all(self) -> List[dict]:
        """Force-emit all open windows (call at end of stream)."""
        remaining = [state.result() for state in self.windows.values()]
        self.windows.clear()
        self.emitted.extend(remaining)
        return remaining
Running it with test data
Let me show it working end to end with tumbling 10-second windows and 3-second allowed lateness:
def demo_tumbling():
    spec = WindowSpec(WindowType.TUMBLING, size_ms=10_000)
    agg = StreamWindowAggregator(spec, allowed_lateness_ms=3_000)
    # Simulate events arriving out of order
    events = [
        Event("user_A", 1_000, 10.0),   # t=1s
        Event("user_A", 5_000, 20.0),   # t=5s
        Event("user_B", 3_000, 15.0),   # t=3s
        Event("user_A", 12_000, 30.0),  # t=12s (pushes watermark to 9s)
        Event("user_A", 6_000, 5.0),    # t=6s (out of order, but window [0,10s) is still open at watermark=9s)
        Event("user_B", 8_000, 25.0),   # t=8s (same: window [0,10s) is still open)
        Event("user_A", 25_000, 40.0),  # t=25s (watermark=22s, closes windows [0,10s) and [10s,20s))
    ]
    for event in events:
        results = agg.process(event)
        for r in results:
            print(f"EMIT: {r}")
    final = agg.flush_all()
    for r in final:
        print(f"FINAL: {r}")

demo_tumbling()
Output:
EMIT: {'key': 'user_A', 'window_start_ms': 0, 'window_end_ms': 10000, 'count': 3, 'sum': 35.0, 'avg': 11.666666666666666}
EMIT: {'key': 'user_B', 'window_start_ms': 0, 'window_end_ms': 10000, 'count': 2, 'sum': 40.0, 'avg': 20.0}
EMIT: {'key': 'user_A', 'window_start_ms': 10000, 'window_end_ms': 20000, 'count': 1, 'sum': 30.0, 'avg': 30.0}
FINAL: {'key': 'user_A', 'window_start_ms': 20000, 'window_end_ms': 30000, 'count': 1, 'sum': 40.0, 'avg': 40.0}
Notice that user_A's event at t=6s still makes it into the [0, 10s) window. Even though it arrived "out of order" (after the t=12s event), the watermark at that point was 9s, which had not yet reached the window's end at 10s, so the event was not considered late.
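The rule at work is that lateness is judged against the end of the event's window, not against the event's own timestamp. Here is a standalone sketch of that rule for tumbling windows. split_late is a hypothetical helper, not part of the aggregator above, and it collects late timestamps in a list instead of dropping them.

```python
from typing import List, Tuple


def split_late(event_times_ms: List[int], size_ms: int, delay_ms: int) -> Tuple[List[int], List[int]]:
    """Split timestamps into (accepted, late) for tumbling windows."""
    watermark = 0
    accepted: List[int] = []
    late: List[int] = []
    for t in event_times_ms:
        window_end = (t // size_ms) * size_ms + size_ms
        if window_end <= watermark:
            # The window has already been emitted: keep the event for auditing.
            late.append(t)
            continue
        accepted.append(t)
        watermark = max(watermark, t - delay_ms)
    return accepted, late


accepted, late = split_late(
    [1_000, 12_000, 6_000, 25_000, 4_000], size_ms=10_000, delay_ms=3_000
)
print(accepted)  # [1000, 12000, 6000, 25000]
print(late)      # [4000]
```

The event at 6s is accepted because the watermark (9s) has not reached 10s yet. The event at 4s shows up after the watermark has jumped to 22s, so its window is gone and it ends up in the late list.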
The session window gotcha
Session windows are trickier than they look. The naive implementation merges events into a session by checking if the event is within gap_ms of the current session's end. But what happens if two previously separate sessions need to merge?
session_A: [100ms, 5100ms] (gap=5000ms)
session_B: [8000ms, 13000ms]
event arrives at t=4500ms # within gap of session_A, so merge into A
new session_A: [100ms, 9500ms]
Now session_A's end (9500ms) overlaps with session_B's start (8000ms). You should merge them.
My simple implementation above doesn't handle this case. For production use, you'd want to check for overlapping sessions after every extension and consolidate them, combining their counts and sums as well. Flink handles this by treating session windows as merging windows.
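The consolidation step itself is a classic interval merge. A minimal sketch, assuming sessions for a single key are plain (start_ms, end_ms) tuples whose end already includes the gap; merge_sessions is a name invented for this example, and a real version would also have to combine each session's accumulated state.

```python
from typing import List, Tuple

Session = Tuple[int, int]  # (start_ms, end_ms), end already includes the gap


def merge_sessions(sessions: List[Session]) -> List[Session]:
    merged: List[Session] = []
    for start, end in sorted(sessions):
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous session: extend it.
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))
    return merged


# session_A after the t=4500ms event, session_B, and an unrelated later session
print(merge_sessions([(100, 9_500), (8_000, 13_000), (20_000, 25_000)]))
# [(100, 13000), (20000, 25000)]
```

Sorting by start time means each session only ever needs to be compared with the last merged one, so a single pass also handles chains where one extension bridges several sessions.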
What I'd do differently for production
This toy implementation taught me several things I'd do differently at scale.
State backends matter. I store everything in a Python dict. Flink can keep state in RocksDB and checkpoint it to durable storage, so it can hold far more in-flight windows than fit in memory and recover them after a restart. For anything beyond a prototype, you need state that survives process crashes.
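Watermark strategies need tuning per source. I used a fixed allowed lateness on a single stream. Real pipelines track a watermark per partition and combine them, usually by taking the minimum, so that a fast Kafka partition can't push the watermark past events still queued in a slow one. The flip side is that an idle partition can stall every window unless you handle idleness explicitly.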
Side outputs beat silent drops. When a late event arrives after the window closed, I just return [] and move on. Flink lets you route late events to a separate stream so you can audit them, reprocess them, or alert on them. That's the right approach.
The session window merge problem is harder than it looks. If you're doing session windows at scale, read the Flink source for EventTimeSessionWindows and the MergingWindowAssigner it extends. It handles the multi-merge case correctly and it's educational.
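As one illustration of the per-partition point, here is a minimal sketch of a watermark that is tracked per partition and combined by taking the minimum. PartitionedWatermark is a hypothetical class for this post, not a Flink API. It assumes the set of partitions is known up front and does nothing about idle partitions.

```python
from typing import Dict, List


class PartitionedWatermark:
    def __init__(self, partitions: List[int], delay_ms: int):
        self.delay_ms = delay_ms
        # Every known partition starts at 0 so the minimum is defined from the start.
        self.per_partition: Dict[int, int] = {p: 0 for p in partitions}

    def advance(self, partition: int, event_time_ms: int) -> int:
        candidate = event_time_ms - self.delay_ms
        # Each partition's watermark only moves forward.
        self.per_partition[partition] = max(self.per_partition[partition], candidate)
        return self.current()

    def current(self) -> int:
        # The combined watermark is only as far along as the slowest partition.
        return min(self.per_partition.values())


wm = PartitionedWatermark(partitions=[0, 1], delay_ms=1_000)
print(wm.advance(0, 20_000))  # 0: partition 1 has not reported anything yet
print(wm.advance(1, 6_000))   # 5000: held back by the slower partition
```

Because each per-partition value only increases, the minimum never moves backwards either. The cost is visible in the first call: a partition that stays silent pins the combined watermark at its last value.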
The toy version always teaches you more
I've wired up Flink's windowed operators dozens of times. Never once did I have to think about what allowed lateness means at the byte level.
Building the toy version forced me to implement watermark advancement, late event detection, and window assignment by hand. Now when I configure forBoundedOutOfOrderness(Duration.ofSeconds(5)) or allowedLateness(Duration.ofSeconds(5)) in Flink, I actually know what's happening on the other side of that API call.
If you use a streaming system, build the toy version. It takes a few hours and it's worth it.
You can copy-paste the complete implementation from the code blocks above and run it locally. No pip installs, no Docker, no cluster.