Most fraud detection systems get the big events right. A single transaction over $10,000 triggers an alert. One login from an unusual country raises a flag. These are threshold rules: simple, stateless, and effective for obvious anomalies.
But some of the most damaging patterns aren't loud. They're fast.
Ten clicks per user in ten seconds. A hundred API calls within half a minute. Purchase values that individually look unremarkable but collectively breach a limit in sixty seconds. These are velocity patterns, and catching them requires the stream processor to remember what just happened, aggregate it on the fly, and decide whether this moment, combined with the last few seconds of history, crosses a line.
That's a fundamentally different problem. It's stateful, windowed, and time-sensitive.
In Part 1, we built the rules engine foundation: threshold evaluation and hot-reloadable rules via Flink's Broadcast State. In this post, we go deeper into the velocity pattern: how we define it, how we implement it using PyFlink's KeyedProcessFunction and ListState, and how we route velocity-triggered events into a correlation pipeline without relying on side outputs (a deliberate tradeoff for AWS compatibility).
By the end, you'll have a working pattern for:
- Sliding window aggregation per key (count, sum, avg, min, max, distinct_count)
- Optional global aggregation when no group_by is needed
- Stream-based routing to a downstream correlation stage via synthetic topic tagging
Flink Concepts Used In This Post
| Concept | Details | Official Reference |
|---|---|---|
| key_by(key_selector) | Partitions the stream by a key. Every record with the same key is processed by the same parallel task. Required for keyed state and KeyedProcessFunction. | |
| KeyedProcessFunction | A process function that runs per key. You implement | |
| ListState | Keyed state that holds a list of elements. You can add, iterate, and clear. We use it to store the sliding window of (timestamp, value, event_json) per key. | |
| ValueState | Keyed state that holds a single value per key. We use it for “last emitted aggregation” (to avoid duplicate emits) and for cleanup timers in event_time mode. | |
| ListStateDescriptor / ValueStateDescriptor | Describe the type and name of state. Created once; you get the actual state in | Same as above |
| Timer service | In a KeyedProcessFunction you can register a timer with | |
| filter / map (Stream Splitting) | Instead of side outputs, we use a single output stream where each record carries routing metadata. We then use | |
1. Velocity Rule Schema
Velocity rules share the common rule fields (rule_id, version, rule_type, source_topic, etc.). The following fields are required:
| Field | Type | Description |
|---|---|---|
| | number | Size of the time window (e.g. |
| | string | One of |
| aggregation_type | string | How to aggregate: |
| | number | Emit when the aggregation result meets this (e.g. |
Optional fields (commonly used):
| Field | Default | Description |
|---|---|---|
| group_by | — | Field to group by (e.g. |
| aggregation_field | — | Required for |
| conditions | | Conditions to filter events before they enter the window (same format as threshold). |
| emit_mode | last_event | |
| time_mode | processing_time | |
| timestamp_field | — | Event field for timestamp when |
| emit_to_sink | true | If false, the event bypasses the main sink and is tagged with a routing marker to feed the correlation pipeline. |
Aggregation types:
| aggregation_type | Requires aggregation_field | Example |
|---|---|---|
| count | No | Number of events in window. |
| sum | Yes | Sum of a value field in window. |
| avg | Yes | Average of a numeric field. |
| min / max | Yes | Min or max value in window. |
| distinct_count | Yes | Count of distinct values of a field. |
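The aggregation types listed here map naturally onto a small dispatch function. The following is a minimal plain-Python sketch rather than the engine's actual code; the function name aggregate, the convention of passing already-extracted values, and the behavior on an empty window are assumptions made for illustration.

```python
from typing import Any, Sequence


def aggregate(aggregation_type: str, values: Sequence[Any]) -> float:
    """Aggregate the values currently held in one window.

    `values` is assumed to hold one entry per event in the window: the
    extracted aggregation_field for most types, or any placeholder for
    count, which only looks at how many entries there are.
    """
    if aggregation_type == "count":
        return len(values)
    if aggregation_type == "distinct_count":
        return len(set(values))
    if not values:
        # Assumption: an empty window aggregates to 0, so it never fires.
        return 0
    if aggregation_type == "sum":
        return sum(values)
    if aggregation_type == "avg":
        return sum(values) / len(values)
    if aggregation_type == "min":
        return min(values)
    if aggregation_type == "max":
        return max(values)
    raise ValueError(f"unknown aggregation_type: {aggregation_type}")
```

Keeping the aggregation separate from the window bookkeeping means a new aggregation type only touches this one function.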
2. Example Velocity Rules
Example 1: Count Per User (With group_by)
"Emit when a user has 10 or more clicks in 10 seconds."
- Key: (rule_id, user_id). Each user has their own independent window.
- State: A list of (timestamp, value, event_json) for the last 10 seconds. Older entries are evicted, and when count ≥ 10, the last event is emitted (if emit_mode: last_event).
Example 2: No group_by (Global Window)
"Emit when 100 or more events occur in 10 seconds across all users."
- Key: Just rule_id. All events for this rule are routed to the same key, producing a single sliding window for the entire stream.
Example 3: Sum Aggregation
"Emit when a user's total purchase value in 60 seconds reaches $1000."
- Only events with type == "purchase" (via conditions) enter the window. The value field is summed across a 60-second per-user window; when sum ≥ 1000, the rule fires.
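For illustration, the three example rules could be written as Python dictionaries along these lines. This is a sketch under stated assumptions: the key names window_size and threshold, the rule_type value "velocity", and the rule IDs global_burst and purchase_sum are hypothetical, and the conditions filter for the purchase rule is left out because its format is not shown here. The small validator only encodes the one constraint described above, namely that every aggregation type except count needs an aggregation_field.

```python
from typing import List

AGGREGATION_TYPES = {"count", "sum", "avg", "min", "max", "distinct_count"}

# NOTE: "window_size" and "threshold" are assumed key names, not confirmed ones.
RAPID_CLICKS = {
    "rule_id": "rapid_clicks",
    "rule_type": "velocity",
    "group_by": "user_id",
    "aggregation_type": "count",
    "window_size": 10,  # seconds
    "threshold": 10,
    "emit_mode": "last_event",
}

GLOBAL_BURST = {  # no group_by: one shared window for the whole stream
    "rule_id": "global_burst",
    "rule_type": "velocity",
    "aggregation_type": "count",
    "window_size": 10,
    "threshold": 100,
}

PURCHASE_SUM = {
    "rule_id": "purchase_sum",
    "rule_type": "velocity",
    "group_by": "user_id",
    "aggregation_type": "sum",
    "aggregation_field": "value",
    "window_size": 60,
    "threshold": 1000,
}


def validate_velocity_rule(rule: dict) -> List[str]:
    """Return a list of problems; an empty list means the rule looks usable."""
    problems = []
    agg = rule.get("aggregation_type")
    if agg not in AGGREGATION_TYPES:
        problems.append(f"unknown aggregation_type: {agg!r}")
    elif agg != "count" and not rule.get("aggregation_field"):
        problems.append(f"{agg} requires aggregation_field")
    return problems
```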
3. Watermarks (event_time And Late Data)
When time_mode is event_time, the engine uses the event's timestamp (from timestamp_field) to determine window membership and cleanup timing. Out-of-order or slightly late events are handled using Flink's watermarks.
What a watermark is: A watermark is a signal in the stream that "no event with timestamp earlier than T will arrive." Flink uses it to advance event time and to fire event-time timers. When we register a cleanup timer for a window boundary, that timer fires when the watermark passes that time, allowing safe eviction of old entries from state.
How the job configures it: The job assigns event timestamps using a TimestampAssigner that reads the event payload (e.g. event_ts or timestamp) and converts to milliseconds. It then uses WatermarkStrategy.for_bounded_out_of_orderness(delay) so that events up to delay seconds late (by event time) are still processed.
At startup, the job reads all rules and takes the maximum watermark_delay across every velocity rule with time_mode: "event_time" and every correlation rule. That maximum is used as the bounded-out-of-orderness delay. For example, if one velocity rule has watermark_delay: 3 and a correlation rule has watermark_delay: 5, the job uses 5 seconds. The strategy is fixed at job startup, so changing watermark_delay in rules at runtime does not take effect until the job is restarted.
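The startup calculation can be sketched in a few lines of plain Python. The function name max_watermark_delay and the rule_type values "velocity" and "correlation" are assumptions for illustration; only the selection logic (event-time velocity rules plus all correlation rules) comes from the description above.

```python
from typing import Iterable, Mapping


def max_watermark_delay(rules: Iterable[Mapping]) -> int:
    """Largest watermark_delay (in seconds) among rules that rely on event time."""
    delays = [
        rule.get("watermark_delay", 0)
        for rule in rules
        if rule.get("rule_type") == "correlation"
        or (
            rule.get("rule_type") == "velocity"
            and rule.get("time_mode") == "event_time"
        )
    ]
    # default=0 covers the case where no rule uses event time at all.
    return max(delays, default=0)
```

A velocity rule in processing_time mode is ignored even if it carries a watermark_delay, since it never consults the watermark.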
In the velocity processor: For event_time, we use ctx.timer_service().register_event_time_timer(timestamp) to schedule window cleanup. When the watermark passes that timestamp, Flink calls on_timer and we evict entries older than the window. processing_time does not use watermarks; we evict on each event using wall-clock time.
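To make the interaction between watermarks and cleanup timers concrete, here is a small plain-Python simulation, not Flink code. It assumes the usual bounded-out-of-orderness behavior, where the watermark trails the highest timestamp seen by the delay (minus one millisecond), and it recomputes the watermark on every event for simplicity, whereas Flink emits watermarks periodically. The function name fired_timers is hypothetical.

```python
from typing import List, Optional, Tuple


def fired_timers(
    event_timestamps_ms: List[int], timers_ms: List[int], delay_ms: int
) -> List[Tuple[int, int]]:
    """Replay events in arrival order and return (event_ts, timer_ts) pairs,
    showing which arrival advanced the watermark far enough to fire a timer."""
    pending = sorted(timers_ms)
    max_ts: Optional[int] = None
    fired = []
    for ts in event_timestamps_ms:
        # Late or out-of-order events never move the maximum backwards.
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - delay_ms - 1
        # An event-time timer fires once the watermark reaches its timestamp.
        while pending and pending[0] <= watermark:
            fired.append((ts, pending.pop(0)))
    return fired
```

With a 5-second delay, a cleanup timer registered for t = 2000 ms only fires once an event with a timestamp above 7000 ms has arrived, which is why a larger watermark_delay keeps window state around for longer.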
4. How We Built It: Flink's Keyed Stream And State
We use Flink's key_by so that all events for the same (rule_id, group_by_value), or just rule_id when there is no group_by, go to the same task. We then use a KeyedProcessFunction with ListState (for the sliding window of (timestamp, value, event_json)) and ValueState (for last emitted aggregation and, in event_time mode, cleanup timers).
Routing Events To Velocity Keys
A BroadcastProcessFunction (VelocityBroadcastFunction) reads rules from broadcast state and, for each event, emits (key, rule_json, event_json) for every matching velocity rule. The key is rule_id or rule_id|group_by_value, so that key_by(lambda x: x[0]) sends all events for that key to the same task.
Window And Aggregation In A KeyedProcessFunction
VelocityKeyedProcessor uses get_list_state / get_state to hold the window and "last emitted" value. On each event it: adds to the list, evicts entries older than the window, computes the aggregation (count / sum / avg / etc.), and emits when the result ≥ threshold. ValueState prevents duplicate emissions while the window stays above threshold, and in event_time mode the timer service runs cleanup when the watermark advances.
Synthetic Routing Markers
A thin VelocityKeyedFunction receives (key, rule_json, event_json), gets or creates a VelocityKeyedProcessor per rule (with hot-reload on version change), and delegates to it. When emit_to_sink: false, the function emits a tuple with the synthetic topic _VELOCITY_TO_CORRELATION_ROUTE. Downstream, this single stream is split into "sink-bound" and "correlation-bound" branches using standard Flink .filter() calls, with no side outputs required.
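The split itself is just two complementary predicates over the first tuple element. The sketch below uses Python's built-in filter and map on a list to stand in for the stream; the helper name split_by_route and the sample topic alerts are hypothetical.

```python
from typing import List, Tuple

VELOCITY_TO_CORRELATION_ROUTE = "_VELOCITY_TO_CORRELATION_ROUTE"

Record = Tuple[str, str]  # (topic_or_marker, payload_json)


def split_by_route(records: List[Record]) -> Tuple[List[Record], List[str]]:
    """Split one tagged output into sink-bound and correlation-bound branches."""
    # Sink-bound: everything that does not carry the synthetic marker.
    sink_bound = list(
        filter(lambda r: r[0] != VELOCITY_TO_CORRELATION_ROUTE, records)
    )
    # Correlation-bound: marked records, with the marker stripped off.
    correlation_bound = list(
        map(
            lambda r: r[1],
            filter(lambda r: r[0] == VELOCITY_TO_CORRELATION_ROUTE, records),
        )
    )
    return sink_bound, correlation_bound
```

On a real DataStream the same two lambdas would be passed to the stream's own filter and map operators, so each record ends up in exactly one branch.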
5. Code Snippets
Routing Events To Velocity Keys (VelocityBroadcastFunction)
For each event, we iterate velocity rules in broadcast state, compute the key, and yield (key, rule_json, event_json).
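A plain-Python sketch of that routing step, without the Flink broadcast machinery, might look like the following. The function names velocity_key and route_event are hypothetical, rules are passed in as a simple list instead of being read from broadcast state, rule matching by source topic or conditions is omitted, and skipping events that lack the group_by field is an assumption.

```python
import json
from typing import Iterator, List, Optional, Tuple


def velocity_key(rule: dict, event: dict) -> Optional[str]:
    """Build rule_id or rule_id|group_by_value for one rule and one event."""
    group_by = rule.get("group_by")
    if not group_by:
        return rule["rule_id"]  # global window: one key per rule
    if group_by not in event:
        return None  # assumption: events without the group_by field are skipped
    return f"{rule['rule_id']}|{event[group_by]}"


def route_event(
    event_json: str, velocity_rules: List[dict]
) -> Iterator[Tuple[str, str, str]]:
    """Yield (key, rule_json, event_json) for every rule the event is routed to."""
    event = json.loads(event_json)
    for rule in velocity_rules:
        key = velocity_key(rule, event)
        if key is not None:
            yield key, json.dumps(rule), event_json
```

Because the key is the first tuple element, a later key_by(lambda x: x[0]) is enough to bring all events for one rule and group together.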
Keying And Processing
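The guarantee that keying provides is that every record with the same key lands on the same parallel task, which is what makes per-key state possible. The simulation below illustrates that property in plain Python; it uses a CRC32 hash purely as a stand-in (Flink's real key-group assignment works differently), and the names assign_task and partition are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Routed = Tuple[str, str, str]  # (key, rule_json, event_json)


def assign_task(key: str, parallelism: int) -> int:
    """Deterministically map a key to one of `parallelism` tasks."""
    return zlib.crc32(key.encode("utf-8")) % parallelism


def partition(records: List[Routed], parallelism: int) -> Dict[int, List[Routed]]:
    """Group records by the task that would own their key (record[0])."""
    tasks: Dict[int, List[Routed]] = defaultdict(list)
    for record in records:
        tasks[assign_task(record[0], parallelism)].append(record)
    return dict(tasks)
```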
State Declaration In VelocityKeyedProcessor.open()
The processor declares one ListState for the window and one ValueState for the last emitted aggregation.
process_element (conceptually): Add the current event to the list → evict entries older than (current_time - window_size) → compute aggregation → if result ≥ threshold (and optional conditions match), emit the event with the appropriate routing metadata (sink topic or _VELOCITY_TO_CORRELATION_ROUTE) and update last_emitted_agg.
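The conceptual steps can be sketched in plain Python, with an ordinary list standing in for ListState and a single attribute standing in for the ValueState. This is a simplified illustration for a count rule only, not the actual VelocityKeyedProcessor: the class name SlidingCountWindow is hypothetical, and both the inclusive window boundary and the exact re-arm logic for last_emitted_agg are assumptions.

```python
from typing import Any, List, Optional, Tuple


class SlidingCountWindow:
    """Per-key window state for a count rule."""

    def __init__(self, window_seconds: float, threshold: int) -> None:
        self.window_seconds = window_seconds
        self.threshold = threshold
        self.entries: List[Tuple[float, Any]] = []  # stands in for ListState
        self.last_emitted_agg: Optional[int] = None  # stands in for ValueState

    def process_element(self, timestamp: float, event: Any) -> Optional[Any]:
        """Return the event to emit, or None when nothing should be emitted."""
        # 1. Add the current event to the window.
        self.entries.append((timestamp, event))
        # 2. Evict entries older than (current_time - window_size).
        cutoff = timestamp - self.window_seconds
        self.entries = [(ts, ev) for ts, ev in self.entries if ts >= cutoff]
        # 3. Compute the aggregation (count only in this sketch).
        count = len(self.entries)
        # 4. Below the threshold: re-arm so the next crossing can emit again.
        if count < self.threshold:
            self.last_emitted_agg = None
            return None
        # 5. At or above the threshold: emit once, then suppress duplicates.
        if self.last_emitted_agg is not None:
            return None
        self.last_emitted_agg = count
        return event
```

Feeding twelve events for one key within ten seconds through this class yields a single emission, on the tenth event.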
End-to-End Flow For Velocity
- Events and rules enter the same way as in Part 1: Kafka events stream, rules from Kafka, broadcast state.
- A VelocityBroadcastFunction (separate from the threshold one) runs on events_stream.connect(broadcast_rules). For each event it reads velocity rules from broadcast state and emits (key, rule_json, event_json) for each matching velocity rule, with optional pre-filtering by conditions.
- The stream is keyed by the first element: velocity_keyed = velocity_keyed_stream.key_by(lambda x: x[0]). All events for the same (rule_id, user_id) or (rule_id) go to the same task.
- VelocityKeyedFunction runs on the keyed stream. It parses (key, rule_json, event_json), gets or creates a VelocityKeyedProcessor for that rule, and calls processor.process_element(event_json, ctx, collector). The processor updates its ListState (window), evicts old entries, aggregates, and when the threshold is met, yields a routing tuple.
- If emit_to_sink: true, yields (sink_topic, enriched_json). If false, yields (_VELOCITY_TO_CORRELATION_ROUTE, event_json), allowing downstream operators to route it specifically to the correlation pipeline.
- velocity_processed is unioned with threshold and correlation results; the emit_to_sink: false branch feeds into the correlation pipeline (e.g. AstroTurf).

Example: Matching Behavior
Rule: rapid_clicks, count ≥ 10 per user in 10 seconds, emit_mode: last_event.
Input: 12 click events from the same user_id, all within 10 seconds.
- As each event arrives, it is added to that user's window; entries beyond the 10-second boundary are evicted.
- After the 10th event, aggregation (count) = 10 ≥ threshold. One record is emitted (the 10th/last event), and last_emitted_agg is set to 10.
- The 11th and 12th events push count to 11 and 12, but no further emissions occur; last_emitted_agg prevents duplicates until the window drops below the threshold again.
Without group_by: All 12 events would route to the same key (rule_id only), producing one shared window, count = 12, one emit.
AWS MSF Compatibility Warning
While Apache Flink supports Side Outputs natively, the current PyFlink implementation on AWS Managed Service for Apache Flink (MSF) does not support collect_side() or OutputTag. Using them will cause operators to fail silently or starve downstream tasks.
Solution: Always favor Synthetic Routing. Emit all records to a single main output and use standard .filter() and .map() operations to achieve the same result as side outputs. This ensures your code remains portable across local Flink clusters and AWS MSF.
Putting It All Together
Velocity detection is where stream processing earns its complexity budget. Unlike threshold rules, which evaluate each event in isolation, velocity rules require the engine to hold state, reason across time, and make decisions based on what has accumulated, not just what has arrived.
The pattern we've built here handles that in three coordinated layers. The rule schema gives you expressive control over the window, aggregation, and grouping without writing new code for each use case. The KeyedProcessFunction with ListState keeps that window per key, evicts stale entries, and fires precisely when the threshold is crossed, no more, no less. And synthetic routing via _VELOCITY_TO_CORRELATION_ROUTE keeps the pipeline portable, avoiding the side output limitations that would otherwise break on AWS MSF.
Together, these three pieces let you go from "a user clicked ten times in ten seconds" to a routed, enriched, correlation-ready signal, entirely within the stream, with no external lookups and no batch lag.
In the next post, we take that signal further: the correlation pattern connects a primary event stream to a context stream, applies a lookback window, and emits matches when conditions align across both. If velocity is about how fast, correlation is about what happened together.
Building real-time detection pipelines at this layer of complexity requires more than the right framework; it requires the right architecture decisions from the start. If your team is navigating streaming infrastructure, AWS MSF constraints, or PyFlink at scale, talk to our engineering team. We've been there.
Qais Qadri, Senior Software Engineer
Qais enjoys exploring places, going on long drives, and hanging out with close ones. He likes to read books about life and self-improvement, as well as blogs about technology. He also likes to play around with code. Qais lives by the values of Gratitude, Patience, and Positivity.