
How BetterCloud Built an Alerting System with Apache Flink®

Written by David Hardwick | 29 June 2017

Many Apache Flink® users are building applications for alerting or anomaly detection, and ING and Mux are two such examples from the most recent Flink Forward conference.

Today, we’ll highlight the work of BetterCloud, who learned that a dynamic alerting tool would only be truly useful to their customers if newly-created alerts applied to historical events as well as future events.

In this guest post, we’ll talk more about how BetterCloud uses Apache Flink to power its alerting and how they delivered on the challenge of applying newly-created alerts to historical event data in an efficient manner.


The post is an adaptation of BetterCloud’s session from Flink Forward San Francisco 2017. You can find a recording and slides from their presentation here.

BetterCloud is a multi-SaaS management platform that simplifies the job of IT professionals operating today’s modern workplaces. This is a world that has become much more complex in recent years as companies of all sizes rely on a wide range of SaaS applications to run their businesses.

The platform serves as “mission control” for IT: a single console for managing many different SaaS applications used within a company, such as Slack, G Suite, Salesforce, and Dropbox. BetterCloud allows its users to create alerts and execute automated policies based on user activity in different SaaS products. For example, if a user starts forwarding their work email to their personal Gmail account (a potential security issue), IT or security teams can create alerts in BetterCloud, automatically prevent further forwarding, and send a notification to that individual reminding them of company policy. BetterCloud can also automate complex processes across applications, such as onboarding or offboarding employee accounts (for G Suite alone, the average offboarding process consists of 28 unique steps).

BetterCloud provides a library of “global” alerts that are available to all of its customers, and it also offers the ability for customers to configure custom alerts. When a customer creates a BetterCloud alert, it applies to future events as well as historical events. Enabling this custom alerting functionality presented a non-trivial technical problem for the BetterCloud engineering team, and in this post, we’ll detail how we built our alerting system with Apache Flink to handle both future and historical events while also:

  • Dealing with massive scale: BetterCloud consumes hundreds of millions of events each day.

  • Getting alerts in front of customers quickly: An alert is most valuable to a customer immediately after the triggering event occurs, meaning low latency is critical.

  • Deploying on a regular basis: We practice continuous delivery and need to validate new features with customers as soon as possible.

Foundational Learnings: An Event Stream Model and a Rules Engine
Adding Rules Via Control Stream
Simple Flink Job Walkthrough
Handling Historical Data
The Not So Simple Job Walkthrough
Wrapping Up

Foundational Learnings: An Event Stream Model and a Rules Engine

When starting on the alerting project, our team quickly learned that an event stream processing model was the best fit for our use case, and specifically, that it suited our alerting application much better than an ingest & query model.

Ingest & query is relatively slow and inefficient, with lots of redundant processing. We already mentioned that low latency is important for an alerting system like ours, and we weren’t going to get that from an ingest & query model. Lastly, ingest & query focuses more on state than on state transitions, and state transitions are what matter most to us.

The hundreds of millions of events per day processed by the BetterCloud Platform were already streaming to us via Apache Kafka (our data ingestion team did a great job of setting us up) and so the event stream processing model was simple for us to implement.

When choosing a stream processor, BetterCloud didn’t just pick Flink out of thin air or flip a coin. We evaluated four different stream processing solutions against a set of criteria, and Flink was our choice because of its vibrant community, the availability of commercial support, and the ability to achieve exactly-once processing.

A second key design decision: early in the process, we decided that a rules engine would provide much more value than hard-coded rules. Hard-coded rules are difficult to maintain, inflexible, and not much fun for an engineering team to work on. Plus, our customers needed to be able to customize alerts based on the events they care about.

Our team had already standardized on Avro or JSON as our serialization format, and so we chose Jayway’s JsonPath for the rules engine. JsonPath executes queries against discrete JSON documents, and it also provides a really simple way to wrap a non-technical user interface around a query. It’s also possible for our users to test that a new query works as expected before actually running it in production.

JsonPath parses a document into a tree and uses a familiar dot notation to walk through the tree. It also offers indexing and a subset of JavaScript operators (e.g. the length of an array). The queries it supports are fairly complex. The result is that our end users can create their own alerts inside the BetterCloud Platform, triggering based on certain combinations of custom events.
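
As a small, hedged illustration of what these queries look like (the payload, field names, and the JsonPathExample object below are made up for this example rather than taken from our production events):

import com.jayway.jsonpath.JsonPath

object JsonPathExample extends App {
  // An illustrative event payload; not a real BetterCloud event.
  val payload =
    """{
      |  "eventType": "EMAIL_FORWARD_ADDED",
      |  "actor": { "email": "jane@example.com" },
      |  "recipients": ["personal@gmail.com"]
      |}""".stripMargin

  // Dot notation walks the parsed tree ...
  val eventType: String = JsonPath.read(payload, "$.eventType")
  // ... indexing reaches into arrays ...
  val firstRecipient: String = JsonPath.read(payload, "$.recipients[0]")
  // ... and a subset of JavaScript-style operators is available, e.g. array length.
  val recipientCount: Int = JsonPath.read(payload, "$.recipients.length()")

  println(s"$eventType forwarded to $firstRecipient ($recipientCount recipient(s))")
}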

Adding Rules Via Control Stream

Once a user has added a new alerting rule, it’s submitted as a control event. Our team uses control events pretty liberally so that our Flink jobs behave dynamically (essentially saying: “Hey, here’s a new thing that we want you to look at”), and as new events come through the live event stream, we’re able to evaluate them against the newly-added rules.

In most of our Flink jobs, the first function is a variation of a CoFlatMap. Putting it all together, we end up with two Kafka sources: one for control events and one for live data. Both flow into a filtering function that maintains the alert configurations sent in by the control events. We check whether the customer has configured any alerts for the incoming event type, and then we combine the event with all potentially-matching alert configurations.

Next, events are forwarded to a qualifier function, which is responsible for doing the JsonPath evaluation for alert configurations. And if an event matches an alert configuration, it’s forwarded to a counter function which then sends the event to an output stream.

Simple Flink Job Walkthrough

Now, let’s walk through a simplified version of one of our Flink jobs. If you’d like to take a closer look at the code we refer to in this post, it’s available on GitHub.

First, here’s what each of our event types looks like. Customer events (our live event stream) have a customer ID and a payload that’s a JSON string. A control event has a customer ID, an alert ID, and a number of other fields, including a bootstrap customer ID, which we’ll touch on later; it plays a role in applying newly-created rules to historical data.


case class CustomerEvent(customerId: UUID, payload: String)
case class ControlEvent(customerId: UUID, alertId: UUID, alertName: String, alertDescription: String, threshold: Int, jsonPath: String, bootstrapCustomerId: UUID)

As mentioned earlier, we have an event stream source that is keyed by customer ID to ensure that all counts for a single customer are maintained on a single Flink task manager. You’ll see that we filter out events that don’t match the schema.


// Live events from Kafka; drop anything that fails deserialization, then key by customer ID.
val eventStream = env.addSource(new FlinkKafkaConsumer09("events", new CustomerEventSchema(), properties))
  .filter(x => x.isDefined)
  .map(x => x.get)
  .keyBy((ce: CustomerEvent) => { ce.customerId })

And also a control stream source:


// Control events from Kafka, split into "global" rules (available to every customer)
// and customer-"specific" rules.
val controlStream = env.addSource(new FlinkKafkaConsumer09("controls", new ControlEventSchema(), properties))
  .filter(x => x.isDefined)
  .map(x => x.get)
  .name("Control Source")
  .split((ce: ControlEvent) => {
    ce.customerId match {
      case Constants.GLOBAL_CUSTOMER_ID => List("global")
      case _ => List("specific")
    }
  })

Some of the rules in BetterCloud are global rules, meaning they’re available to all of our customers. Those, we broadcast to all task managers like so:


val globalControlStream = controlStream.select("global").broadcast

And other rules are customer-specific and therefore are keyed using the same customer ID that we expect on the live event stream.


val specificControlStream = controlStream.select("specific")
 .keyBy((ce: ControlEvent) => { ce.customerId })

We then connect the event stream to a union of the global and specific control streams:


val filterStream = globalControlStream.union(specificControlStream)
 .connect(
   eventStream
 )

Next, events go into a CoFlatMap function. flatMap1 adds control events to a list in local state; if there’s an update to an existing rule, we’re able to change it here, too. flatMap2 takes in live customer events and checks whether there are any rule configurations matching that customer ID. If there’s a match, the event is output as a single filtered event along with all of the matching control events.


class FilterFunction() extends RichCoFlatMapFunction[ControlEvent, CustomerEvent, FilteredEvent] {
  // Alert configurations received so far from the control stream.
  var configs = new mutable.ListBuffer[ControlEvent]()

  override def flatMap1(value: ControlEvent, out: Collector[FilteredEvent]): Unit = {
    // Drop any existing configuration for this (customer, alert) pair, then add the new version.
    configs = configs.filter(x => (x.customerId != value.customerId) || (x.alertId != value.alertId))
    configs.append(value)
  }

  override def flatMap2(value: CustomerEvent, out: Collector[FilteredEvent]): Unit = {
    // Gather the configurations for this customer, plus the global ones.
    val eventConfigs = configs.filter(x => (x.customerId == value.customerId) || (x.customerId == Constants.GLOBAL_CUSTOMER_ID))
    if (eventConfigs.nonEmpty) {
      out.collect(FilteredEvent(value, eventConfigs.toList))
    }
  }
}

Next is a qualifier function, which does three things (a simplified sketch follows the list):

  • Loops through each control event in the filtered event

  • Evaluates the JsonPath on the control event against the live event

  • Emits 0 to n qualified events containing the live event and the matching control event to a counter function
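
To make those steps concrete, here is a hedged sketch of such a qualifier function. QualifiedEvent is an illustrative type, the FilteredEvent field names (event and configs) are assumed from the constructor call shown above, and the JsonPath handling is simplified compared to our production job.

import com.jayway.jsonpath.JsonPath
import net.minidev.json.JSONArray
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

// Illustrative output type: one matching alert configuration per qualified event.
case class QualifiedEvent(event: CustomerEvent, config: ControlEvent)

class QualifierFunction extends FlatMapFunction[FilteredEvent, QualifiedEvent] {
  override def flatMap(value: FilteredEvent, out: Collector[QualifiedEvent]): Unit = {
    // Loop through each control event attached to the filtered event.
    value.configs.foreach { config =>
      // Evaluate the alert's JsonPath query against the live event's JSON payload.
      val matches = try {
        JsonPath.read[AnyRef](value.event.payload, config.jsonPath) match {
          case arr: JSONArray => !arr.isEmpty   // indefinite paths return an array of matches
          case other          => other != null  // definite paths return the value itself
        }
      } catch {
        case _: Exception => false              // a path that doesn't resolve simply doesn't match
      }
      // Emit 0..n qualified events, one per matching configuration.
      if (matches) out.collect(QualifiedEvent(value.event, config))
    }
  }
}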

The counter function increments a count in a map keyed by customer ID plus the alert ID from the control event. If the key doesn’t exist yet, it sets the count to 1.
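
A minimal sketch of that counting step, reusing the QualifiedEvent type from the previous sketch; AlertCount is likewise illustrative, and the real job keeps richer, fault-tolerant state rather than a plain in-memory map.

import java.util.UUID
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.mutable

// Illustrative output type carrying the running count for a (customer, alert) pair.
case class AlertCount(customerId: UUID, alertId: UUID, count: Int)

class CounterFunction extends RichFlatMapFunction[QualifiedEvent, AlertCount] {
  // Counts keyed by customer ID plus the alert ID from the control event.
  private val counts = mutable.Map[(UUID, UUID), Int]()

  override def flatMap(value: QualifiedEvent, out: Collector[AlertCount]): Unit = {
    val key = (value.event.customerId, value.config.alertId)
    // Increment the existing count, or start at 1 if this key hasn't been seen yet.
    val updated = counts.getOrElse(key, 0) + 1
    counts(key) = updated
    out.collect(AlertCount(value.event.customerId, value.config.alertId, updated))
  }
}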

Handling Historical Data

If we only needed to trigger alerts on future event data, then the job we’ve outlined so far would be sufficient. But as we mentioned earlier, our customers must be notified if newly-created rules trigger alerts on historical events (events that have already gone through the system), too.

Internally, we refer to our solution to the historical events problem as “bootstrapping”.

In the case described above where a key does not yet exist, we add an extra step: the counter function outputs a control event to a new Kafka topic, and a new “bootstrap” function with a Kafka source that listens to that topic is added to the chain.

When a bootstrap request control event is received in the bootstrap function, the function retrieves the historical events from a file and outputs them to a stream. That stream is unified with the live stream and goes into the qualifier function just as before, so the count now reflects both the live events and the historical events.
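
Here is a hedged sketch of that bootstrap step. It assumes the historical events live in a local JSON-lines file and that CustomerEventSchema.deserialize returns an Option[CustomerEvent] (which is what the filter(isDefined) calls above suggest); the file path and the wiring are illustrative rather than our production code.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class BootstrapFunction(path: String) extends RichFlatMapFunction[ControlEvent, CustomerEvent] {
  @transient private var schema: CustomerEventSchema = _

  override def open(parameters: Configuration): Unit = {
    schema = new CustomerEventSchema()
  }

  override def flatMap(request: ControlEvent, out: Collector[CustomerEvent]): Unit = {
    // Replay every stored event belonging to the customer named in the bootstrap request.
    scala.io.Source.fromFile(path).getLines()
      .flatMap(line => schema.deserialize(line.getBytes("UTF-8")))   // Option[CustomerEvent]
      .filter(event => event.customerId == request.bootstrapCustomerId)
      .foreach(out.collect)
  }
}

The emitted historical events can then be unioned with the live event stream ahead of the qualifier function, exactly as described above.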

The Not So Simple Job Walkthrough

The simplified Flink job we just walked through is actually four Flink jobs and a couple of databases smashed together, so let’s talk a bit about what else goes into our production deployment.

Obviously, we can’t store all of our historical production data in a text file, so we use Apache Hadoop and Apache Hive to manage long-term storage of data.

There is a separate Flink job called “ingest” that persists batches of data to Hive and maintains a list of the last timestamp seen for each tenant. Another Flink job called “query” waits for requests; when one comes in, it asks the ingest job to confirm that data up to the timestamp of the query request has been persisted to Hive, then executes the query against the Hive server and sends the results back to the requester.

Those two Flink jobs handle the storage and querying of all of our historical data, but we also have two other jobs that handle the actual processing of the data.

The two jobs are almost identical, except that one handles only historical data while the other handles live data. We split them because processing historical events can be fairly expensive and time-consuming, and we don’t want it to slow down or otherwise affect the performance of our live alerts.

When the live-data job processes an event for an alert that it does not yet have a count for, it sends a request to the historical-data job to process the historical data for that tenant against the alert. When that’s done, the historical-data job sends the results of that request to the live-data job.
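
A hedged sketch of the messages the two jobs might exchange for this request/response handshake; the names and fields are illustrative, not our exact production types.

import java.util.UUID

// Sent by the live-data job when it sees an alert it has no count for yet.
case class BootstrapRequest(customerId: UUID, alertId: UUID, requestedAt: Long)

// Sent back by the historical-data job once the backfill for that alert is complete.
case class BootstrapResult(customerId: UUID, alertId: UUID, historicalCount: Int, completedAt: Long)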

Because the order of events matters for our alerts, we “block” live events until the historical events have been processed, but this block is artificial and doesn’t actually halt the stream: when a live event comes in for an alert that is still being bootstrapped, we store that event in an ever-expanding buffer. This obviously carries a memory risk, so we use a buffering system with MySQL as the backend to avoid running out of memory. Once the historical-data job has finished processing the alert, the live-data job processes the buffered events and is then ready to handle live events from that point forward.
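
To make the buffering idea concrete, here is a minimal sketch that reuses the QualifiedEvent, BootstrapRequest, and BootstrapResult types from the sketches above. A plain in-memory map stands in for our MySQL-backed buffer, and sendRequest is a placeholder for however the request reaches the historical-data job.

import java.util.UUID
import scala.collection.mutable

class BufferingCounter(sendRequest: BootstrapRequest => Unit) {
  private val counts           = mutable.Map[(UUID, UUID), Int]()
  private val pendingBootstrap = mutable.Set[(UUID, UUID)]()
  private val buffered         = mutable.Map[(UUID, UUID), mutable.ListBuffer[QualifiedEvent]]()

  def onQualifiedEvent(e: QualifiedEvent): Unit = {
    val key = (e.event.customerId, e.config.alertId)
    if (pendingBootstrap(key)) {
      // Bootstrap in flight: park the live event until the historical count arrives.
      buffered.getOrElseUpdate(key, mutable.ListBuffer()) += e
    } else if (!counts.contains(key)) {
      // First event for this alert: ask for a backfill and buffer the event meanwhile.
      sendRequest(BootstrapRequest(e.event.customerId, e.config.alertId, System.currentTimeMillis()))
      pendingBootstrap += key
      buffered.getOrElseUpdate(key, mutable.ListBuffer()) += e
    } else {
      counts(key) = counts(key) + 1
    }
  }

  def onBootstrapResult(result: BootstrapResult): Unit = {
    val key = (result.customerId, result.alertId)
    // Seed the count with the historical total, then drain the buffered live events in order.
    val drained = buffered.remove(key).map(_.size).getOrElse(0)
    counts(key) = result.historicalCount + drained
    pendingBootstrap -= key
  }
}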

Wrapping Up

Thanks for following along, and we hope you found this overview of our use case to be helpful. Again, if you’d like to learn more, you can see our Flink Forward presentation here and get your hands on the sample code from the post here.

Looking ahead, the BetterCloud team is excited about taking full advantage of Flink’s RocksDB integration so that we can improve our state management strategies. We’re also excited about the performance improvements we’ll gain from incremental checkpointing, a new feature in Flink 1.3.0.

To learn more about BetterCloud, visit bettercloud.com. If you’d like to take on a new challenge, we’re hiring. Just visit bettercloud.com/careers to see if there’s a fit for you!