Since version 1.5.0, Apache FlinkⓇ features a new type of state which is called Broadcast State. In this post, we explain what Broadcast State is, and show an example of how it can be applied to an application that evaluates dynamic patterns on an event stream. We walk you through the processing steps and the source code to implement this application in practice.
The Broadcast State can be used to combine and jointly process two streams of events in a specific way. The events of the first stream are broadcasted to all parallel instances of an operator, which maintains them as state. The events of the other stream are not broadcasted but sent to individual instances of the same operator and processed together with the events of the broadcasted stream.
The new broadcast state is a natural fit for applications that need to join a low-throughput and a high-throughput stream or need to dynamically update their processing logic. We will use a concrete example of the latter use case to explain the broadcast state and show its API in more detail in the remainder of this post.
Imagine an e-commerce website that captures the interactions of all users as a stream of user actions. The company that operates the website is interested in analyzing the interactions to increase revenue, improve the user experience, and detect and prevent malicious behavior.
The website implements a streaming application that detects a pattern on the stream of user events. However, the company wants to avoid modifying and redeploying the application every time the pattern changes. Instead, the application ingests a second stream of patterns and updates its active pattern when it receives a new pattern from the pattern stream. In the following, we discuss this application step-by-step and show how it leverages the broadcast state feature in Apache Flink.
The second stream provides action patterns that the application will evaluate. A pattern consists of two consecutive actions. In the figure above, the pattern stream contains the following two:
Pattern #1: A user logs in and immediately logs out without browsing additional pages on the e-commerce website.
Such patterns help a business in better analyzing user behavior, detecting malicious actions, and improving the website experience. For example, in the case of items being added to a shopping cart with no follow up purchase, the website team can take appropriate actions to understand better the reasons why users don’t complete a purchase and initiate specific programs to improve the website conversion (such as providing discount codes, limited free shipping offers etc.)
On the right-hand side, the figure shows three parallel tasks of an operator that ingest the pattern and user action streams, evaluate the patterns on the action stream, and emit pattern matches downstream. For the sake of simplicity, the operator in our example only evaluates a single pattern with exactly two subsequent actions. The currently active pattern is replaced when a new pattern is received from the pattern stream. In principle, the operator could also be implemented to evaluate more complex patterns or multiple patterns concurrently which could be individually added or removed.
We will describe how the pattern matching application processes the user action and pattern streams.
When a task receives a new user action, it evaluates the currently active pattern by looking at the user’s latest and previous actions. For each user, the operator stores the previous action in the keyed state. Since the tasks in the figure above only received a single action for each user so far (we just started the application), the pattern does not need to be evaluated. Finally, the previous action in the user’s keyed state is updated to the latest action, to be able to look it up when the next action of the same user arrives.
Until now, we conceptually discussed the application and explained how it uses broadcast state to evaluate dynamic patterns over event streams. Next, we’ll show how to implement the example application with Flink’s DataStream API and the broadcast state feature.
Let’s start with the input data of the application. We have two data streams, actions, and patterns. At this point, we don’t really care where the streams come from. The streams could be ingested from Apache Kafka or Kinesis or any other system. Action and Pattern are Pojos with two fields each:
DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???
Action and Pattern are Pojos with two fields each:
Action: Long userId, String action
Pattern: String firstAction, String secondAction
As a first step, we key the action stream on the userId attribute.
KeyedStream<Action, Long> actionsByUser = actions
.keyBy((KeySelector<Action, Long>) action -> action.userId);
Next, we prepare the broadcast state. Broadcast state is always represented as MapState, the most versatile state primitive that Flink provides.
MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>(
"patterns", Types.VOID, Types.POJO(Pattern.class));
Since our application only evaluates and stores a single Pattern at a time, we configure the broadcast state as a MapState with key type Void and value type Pattern. The Pattern is always stored in the MapState with null as key.
BroadcastStream bcedPatterns =
patterns.broadcast(bcStateDescriptor);
Using the MapStateDescriptor for the broadcast state, we apply the broadcast() transformation on the patterns stream and receive a BroadcastStream bcedPatterns.
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
.connect(bcedPatterns)
.process(new PatternEvaluator());
After we obtained the keyed actionsByUser stream and the broadcasted bcedPatterns stream, we connect() both streams and apply a PatternEvaluator on the connected streams. PatternEvaluator is a custom function that implements the KeyedBroadcastProcessFunction interface. It applies the pattern matching logic that we discussed before and emits Tuple2<Long, Pattern> records which contain the user id and the matched pattern.
public static class PatternEvaluator
extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
// handle for keyed state (per user)
ValueState<String> prevActionState;
@Override
public void open(Configuration conf) {
// initialize keyed state
prevActionState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastAction", Types.STRING));</code
}
/**
* Called for each user action.
* Evaluates the current pattern against the previous and
* current action of the user.
*/
@Override
public void processElement(
Action action,
ReadOnlyContext ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx
.getBroadcastState(
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)))
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
/**
* Called for each new pattern.
* Overwrites the current pattern with the new pattern.
*/
@Override
public void processBroadcastElement(
Pattern pattern,
Context ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState =
ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
}
>
The KeyedBroadcastProcessFunction interface provides three methods to process records and emit results.
processBroadcastElement() is called for each record of the broadcasted stream. In our PatternEvaluator function, we simply put the received Pattern record in to the broadcast state using the null key (remember, we only store a single pattern in the MapState).
processElement() is called for each record of the keyed stream. It provides read-only access to the broadcast state to prevent modification that result in different broadcast states across the parallel instances of the function. The processElement() method of the PatternEvaluator retrieves the current pattern from the broadcast state and the previous action of the user from the keyed state. If both are present, it checks whether the previous and current action match with the pattern and emits a pattern match record if that is the case. Finally, it updates the keyed state to the current user action.
onTimer() is called when a previously registered timer fires. Timers can be registered in the processElement method and are used to perform computations or to clean up state in the future. We did not implement this method in our example to keep the code concise. However, it could be used to remove the last action of a user when the user was not active for a certain period of time to avoid growing state due to inactive users.
You might have noticed the context objects of the KeyedBroadcastProcessFunction’s processing method. The context objects give access to additional functionality such as
The broadcast state (read-write or read-only, depending on the method),
A TimerService, which gives access to the record’s timestamp, the current watermark, and which can register timers,
The current key (only available in processElement()), and
A method to apply a function the keyed state of each registered key (only available in processBroadcastElement())
The KeyedBroadcastProcessFunction has full access to Flink state and time features just like any other ProcessFunction and hence can be used to implement sophisticated application logic. Broadcast state was designed to be a versatile feature that adapts to different scenarios and use cases. Although we only discussed a fairly simple and restricted application, you can use broadcast state in many ways to implement the requirements of your application.
In this blog post, we walked you through an example application to explain what Apache Flink’s broadcast state is and how it can be used to evaluate dynamic patterns on event streams. We’ve also discussed the API and showed the source code of our example application.
We invite you to check the documentation on the Apache Flink website and provide feedback or suggestions for further improvements through the Apache Flink mailing list.