This blog post is the third part of our series on how Apache Flink™ enables new stream processing applications. The first post introduced event time and out-of-order processing, the second post covered versioning application state, and this post introduces improved support for time-based session windows.
When processing streams of user interaction events, a common first step is to aggregate the events into "sessions": periods of activity separated by gaps of inactivity. While users have already been doing sessionization of streams with Flink, the upcoming release (1.1.0) will extend the support for windowing elements into time-based sessions with two new features: (1) A simple API to define session windows. Prior to Flink 1.1.0, users had to define session windows using custom window assigners and triggers. (2) Extensions to the windowing machinery that make it possible to handle sessions when elements are heavily out of order. In this blog post, we will introduce the new API and discuss the additions to Flink's windowing mechanism in detail. The code is available for you to try out in 1.1-SNAPSHOT preview releases right now.
To motivate the use of session windows, let’s start with an example. Assume we have an online video portal, such as Netflix or YouTube, and we want to analyze the behaviour of our users in order to make their experience more satisfactory. The system will log all actions and activity of users along with the timestamp at which they occurred. The logged activities could be: “watched video”, “gave rating”, “followed channel” and so on. Since we are doing real-time processing of streaming data, we have to divide our infinite stream of events into windows if we want to do aggregations on them. A sample of events that we might get could look like this:
Elements can be divided into sessions using the new SessionWindows window assigner. It can be used in the same way as other pre-existing window assigners such as SlidingEventTimeWindows, TumblingProcessingTimeWindows, etc.:
DataStream result = input
    .keyBy(<key selector>)
    .window(SessionWindows.withGap(Time.seconds(<seconds>)))
    .apply(<window function>); // or reduce() or fold()
Now, elements will be put into session windows based on their timestamps. If the time difference between two elements is smaller than the specified session gap, the elements will be in the same session. Elements with a larger difference in timestamps will be put into different sessions, unless there are elements in between that bridge the gap and pull them into the same session:
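The gap rule can be illustrated without Flink at all. The following is a minimal sketch in plain Java, not Flink's implementation: the class name SessionSketch and the method sessionize are hypothetical, and the treatment of the boundary case (a difference exactly equal to the gap starts a new session) is an assumption of the sketch that follows the "smaller than" wording above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Flink.
class SessionSketch {
    // Groups timestamps (in milliseconds) into sessions. A new session starts
    // whenever the distance to the previous timestamp is not smaller than the gap.
    static List<List<Long>> sessionize(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // the sketch handles out-of-order input by sorting first

        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        for (long ts : sorted) {
            boolean startsNewSession =
                current == null || ts - current.get(current.size() - 1) >= gapMillis;
            if (startsNewSession) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
        }
        return sessions;
    }
}
```

Note that two elements further apart than the gap can still share a session: with a gap of 3000 ms, the timestamps 1000 and 5000 end up together as soon as an element at 3000 arrives between them.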
To make the new SessionWindows assigner possible, we had to extend the windowing mechanism in Flink to support merging of windows. To understand what this means, we will first look at the status quo of windows and then motivate why merging is necessary. In Flink, a WindowAssigner is responsible for assigning elements to windows based on their timestamp, while a Trigger is responsible for determining when windows should be processed. For tumbling, i.e. non-overlapping, time windows it looks like this:
Please note that the windows all have the same size (the window size) and that their start and end points don’t depend on the elements but always fall on fixed time boundaries relative to some start timestamp (the epoch, in most practical cases). The WindowAssigner looks at the timestamp of an element and then puts it in the appropriate window. The windows are fixed and don’t interact, and once an element is assigned to a window, this assignment never changes.
For session windows we needed a more dynamic approach. The basic idea looks like this: the SessionWindows assigner assigns each incoming element to a window that has the timestamp of the element as its start point and (start point + session gap) as its end point. Let’s look at a running example to visualize this. Initially we get two elements, which are assigned to their own “private” windows, each the size of the session gap:
A third element arrives that will also be put into its own window. The situation now looks like this:
With the added support for merging windows, a WindowAssigner is now given the chance to merge windows. It gets a list of all in-flight windows and can tell the system which of these should be merged together to form new windows. Flink will then take care of merging the underlying state of the windows (such as the elements in the windows, or the running aggregate in the case of a reducing window) as well as the Trigger states of the merged windows. The result will be that the three elements are now together in one window:
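The merge decision described above can be sketched in a few lines of plain Java. This is an illustration of the idea, not Flink's code: the class MergeSketch and its methods are hypothetical, windows are represented as {start, end} arrays with an exclusive end, and the merging of window contents and Trigger state that Flink performs is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Flink.
class MergeSketch {
    // The "private" window of one element: [timestamp, timestamp + gap).
    static long[] assign(long timestamp, long gap) {
        return new long[]{timestamp, timestamp + gap};
    }

    // Merges all overlapping windows; each window is {start, end}, end exclusive.
    static List<long[]> mergeWindows(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                // Overlaps the previous merged window: extend that window.
                last[1] = Math.max(last[1], w[1]);
            } else {
                // No overlap: start a new merged window (copied, so the input is untouched).
                merged.add(new long[]{w[0], w[1]});
            }
        }
        return merged;
    }
}
```

With a gap of 5, elements at timestamps 0 and 8 get the separate windows [0, 5) and [8, 13). Once an element at timestamp 4 arrives, its window [4, 9) overlaps both, and the merge produces the single window [0, 13).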
For our initial example the internal situation looks like this. First, every element will be put into its own window:
But then they are merged together into one window. Before triggering a window computation, the system will always check whether any windows should be merged, so you won’t get wrong results due to partial windows that should have been merged:
This should be enough for a rough overview of the changes required to make session windows a reality. The above is a description of the model underlying merging windows. There are, however, some optimizations that make it perform well in practice. For example, if new elements are always added such that their window would immediately overlap with a pre-existing window, then no new window is created only to be merged into the other window right away. Instead, the element is put directly into the pre-existing window, while that window is extended to make it look as if it had been merged with the (transient) window of the new element.
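The following plain-Java sketch shows what such a fast path could look like. It is an assumption-laden illustration, not Flink's internal data structure: the class ExtendSketch, its fields, and the counter windowsCreated are hypothetical names, and windows are again {start, end} arrays with an exclusive end.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, for illustration only; not part of Flink.
class ExtendSketch {
    final long gap;
    final List<long[]> windows = new ArrayList<>(); // disjoint {start, end} windows
    int windowsCreated = 0;                         // counts windows that were really created

    ExtendSketch(long gap) {
        this.gap = gap;
    }

    void add(long timestamp) {
        long start = timestamp;
        long end = timestamp + gap; // transient window of the new element
        long[] target = null;

        Iterator<long[]> it = windows.iterator();
        while (it.hasNext()) {
            long[] w = it.next();
            if (start < w[1] && w[0] < end) {
                if (target == null) {
                    // Fast path: extend the existing window in place.
                    w[0] = Math.min(w[0], start);
                    w[1] = Math.max(w[1], end);
                    target = w;
                } else {
                    // The element bridges two windows: fold this one into the target.
                    target[0] = Math.min(target[0], w[0]);
                    target[1] = Math.max(target[1], w[1]);
                    it.remove();
                }
            }
        }
        if (target == null) {
            // Nothing overlaps: only now is a new window created.
            windows.add(new long[]{start, end});
            windowsCreated++;
        }
    }
}
```

For elements that arrive close together, for example at timestamps 0, 3 and 6 with a gap of 5, only the first call creates a window. The other two calls extend it, so the sketch ends with one window [0, 11) and never materializes the transient windows.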