Monitoring Large-Scale Apache Flink Applications, Part 2
The previous article in this series focused on continuous application monitoring and presented the most useful metrics from our point of view. However, Flink’s metrics system offers a lot more, and we would like to highlight a couple of useful metrics that help you specifically while troubleshooting applications. All of the metrics presented in the previous article are useful entry points for troubleshooting and point you in the right direction. The metrics we would like to focus on here extend on these and provide more insights so that you can identify resource bottlenecks and sources of errors.
At the end of this blog post, we will also describe the Grafana dashboard we are using and share it with you so you can get started with your own application monitoring.
JVM metrics (2): Troubleshooting
Let’s have a second look at JVM-level metrics. Before we continue, though, we first have to distinguish between the two types of processes a Flink application is running on: the JobManager (JM) and the TaskManager (TM).
JobManager
There is usually not much that can go wrong with the JobManager, except for underprovisioning its resources in one of the following situations. If the number of TaskManagers to maintain is high, the JM needs more memory to maintain internal data structures but, most importantly, needs more CPU to process the various keep-alive messages as well as the checkpoint messages and data orchestrated by the checkpoint coordinator, which runs on the JobManager. Similarly, if you have many jobs to maintain (in a session cluster) or have high checkpoint frequencies, you may also want to increase the available (peak) CPU time. Speaking of checkpointing, depending on your job’s size and the configuration of state.storage.fs.memory-threshold, your JM may need more resources to build and write the inlined checkpoint data into the _metadata file. If you deploy in application mode, the JM will also execute user code and may need additional CPU, memory, and other resources for that, depending on your business logic.
For all of these, you can inspect metrics such as system-level metrics, e.g., from Kubernetes, or Flink metrics like Status.JVM.CPU.*, Status.JVM.Memory.*, or Status.JVM.GarbageCollector.* in a dashboard like the following. They will help you identify situations like the ones mentioned above and then tune accordingly.
Flink troubleshooting: JVM metrics on a JobManager
TaskManager
On the TaskManager, the same system-level metrics can be used to identify problems with the actual data processing, e.g., load imbalances, memory leaks, problematic TMs, etc. While you may be tempted to set up alerts on the TMs’ CPU use, this may be too narrow. Monitoring your application's throughput is a much better indicator of bottlenecks since it reflects all resources, e.g., disk, network, etc. While troubleshooting, however, you may want to identify the concrete resource bottleneck, and this is where Status.JVM.CPU.Load and similar metrics become useful again. Also recall that load measurements like this can be misleading: a value of 0.021, for example, may already mean 100% load for a TM container with 1 CPU on a 48-core machine.
Flink troubleshooting: CPU and memory overview on all TaskManagers
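The 48-core example above can be reproduced with a little arithmetic. The following is a minimal sketch, assuming (as in that example) that the reported load is relative to all cores of the host; the function name and parameters are ours and purely illustrative.

```python
def container_cpu_utilization(jvm_cpu_load: float, host_cores: int, container_cpus: float) -> float:
    """Rescale a host-relative CPU load to the fraction of the container's CPUs in use.

    Assumption: jvm_cpu_load is measured against all host cores, as in the
    article's example; this is not verified against the JVM here.
    """
    if host_cores <= 0 or container_cpus <= 0:
        raise ValueError("host_cores and container_cpus must be positive")
    return jvm_cpu_load * host_cores / container_cpus


# The article's example: load 0.021 on a 48-core machine, container limited to 1 CPU.
utilization = container_cpu_utilization(0.021, host_cores=48, container_cpus=1)
print(f"container utilization: {utilization:.3f}")  # close to 1.0, i.e. fully loaded
```

A value close to 1.0 means the container is using its full CPU allowance even though the raw load figure looks tiny.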
Depending on your state backend, you may need to focus on different metrics.
For Heap-based state backends, for example, the most important part is to monitor each TM’s Status.JVM.Memory.Heap.Used which is an indicator of the state size on that TM. It should not exceed the limits you set and you should scale before you hit them (setting up an alert for this may be helpful!). Growing heap/state sizes could either come from legitimate increases of the work your job is doing, e.g., more entities to process which lead to an increasing number of keys, or from Flink buffering more data for you due to an increasing event-time skew between different streams or a failure to clean up state (in your code!). Additional metrics on each of the tasks involved, e.g., numRecordsIn/Out will help you estimate the load characteristics of your job.
Since heap memory is involved, garbage collection (GC) may also become a problem. There are a couple of JVM-level statistics to help you track it, e.g., Status.JVM.GarbageCollector.<GarbageCollector>.[Count|Time], and more details are available in the JVM’s GC logs, which need to be enabled separately. Since this is not Flink-specific, we defer to the common literature in this regard.
Flink troubleshooting: TaskManager memory and GC details
Even though Flink’s RocksDB state backend is operating off-heap, you should still keep an eye on memory and GC. This is due to the unfortunate fact that even with the RocksDB upgrade of Flink 1.14 (to RocksDB 6.20.3), while doing its best, Flink is not able to fully control how RocksDB is using its memory. There may be situations where RocksDB wants to use more memory than allocated and may fail. In order to get notified early, i.e., before TMs get killed after hitting their assigned memory limit, you should set up alerts on used vs. available memory, ideally based on system-level metrics such as container_memory_working_set_bytes and container_spec_memory_limit_bytes from Kubernetes, which include all types of memory your job acquires, including those that the JVM cannot track. If you see your job approaching the limit, for RocksDB, you can adjust the framework/task off-heap memory or the JVM overhead portion of the TaskManager memory layout.
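The used-vs-available comparison behind such an alert can be sketched as follows. This is only an illustration: the 0.9 threshold is a hypothetical value rather than a recommendation from this article, and treating a limit of 0 as "no limit configured" is an assumption about how the container metric is reported.

```python
GIB = 1024 ** 3


def memory_usage_ratio(working_set_bytes: int, limit_bytes: int) -> float:
    """Fraction of the container memory limit currently in use."""
    if limit_bytes <= 0:
        # Assumption: a non-positive limit means no limit is configured.
        raise ValueError("container has no memory limit to compare against")
    return working_set_bytes / limit_bytes


def should_alert(working_set_bytes: int, limit_bytes: int, threshold: float = 0.9) -> bool:
    """True when usage has reached the (hypothetical) alert threshold."""
    return memory_usage_ratio(working_set_bytes, limit_bytes) >= threshold


# A TM using 3.7 GiB of a 4 GiB limit is above the 90% threshold.
print(should_alert(int(3.7 * GIB), 4 * GIB))
# A TM using 2 GiB of a 4 GiB limit is not.
print(should_alert(2 * GIB, 4 * GIB))
```

In practice, you would express the same ratio directly in your alerting system; the point is to alert on the container-level numbers rather than on JVM-tracked memory only.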
Even without a heap-based state backend, you should keep an eye on garbage collection since high garbage collection pressure will lead to other problems! With RocksDB, this should only originate from Flink itself or your user code.
RocksDB
RocksDB collects a vast number of low-level metrics that you can look at and make sense of after reading through its internals and studying its tuning guide. We'll try to give you the TL;DR version for a couple of key metrics. You can find the full set of RocksDB native metrics in the Flink docs which explain how to enable them and indicate metrics that may have a negative effect on performance. Once enabled, metrics are exposed under the <operator>.<state-name>.rocksdb scope which we omit in the metric identifiers below.
- RocksDB is a log-structured merge tree that uses immutable files on disk. Deleted data will be marked as deleted in subsequent files; similarly, updated data will be written to subsequent files to shadow any previous versions. estimate-live-data-size helps you identify the actual state size without stale data. If this size is much smaller than the occupied size on disk (space amplification), you may want to think about letting RocksDB compact more often to clean up stale data. There is also the total-sst-files-size metric, but that may slow down queries if there are too many files. Alternatively, you can also use the previously mentioned checkpoint size metrics as an estimate but they will also include operator state (usually small) and anything else from memory, e.g., on-heap timers or user-managed state.
- The background-errors metric indicates low-level failures inside RocksDB. If you see those, you may want to inspect RocksDB’s log file.
- RocksDB write operations are first added to an in-memory table which, when full, will be queued for flushing (and re-organizing) to disk. This queue has a limited size, and hence if flushing does not complete fast enough, it will back-pressure inside RocksDB. The actual-delayed-write-rate metric shows these write stalls which may be caused by slow disks. In these cases, it is often helpful to tune the number of threads for background jobs (flush and compaction) per stateful operator by changing the config value of state.backend.rocksdb.thread.num, which will add more concurrency here.
Flink troubleshooting: RocksDB data size, errors, and write stalls
- Compaction is the process of removing stale data from disk by merging files together and keeping only the most-recent version of each data item. This is essential for read performance and can be inspected by looking at metrics such as estimate-pending-compaction-bytes, num-running-compactions, compaction-pending which indicate bottlenecks in the compaction processes. These could come from slow disks or low concurrency not saturating the available disks. Similarly to stalled write operations, tuning state.backend.rocksdb.thread.num may help to increase concurrency.
Flink troubleshooting: RocksDB compaction
- Similar to the delayed write rate from above, you can further troubleshoot the process of flushing in-memory tables to disk by looking at is-write-stopped, num-running-flushes, and mem-table-flush-pending. These metrics indicate write-heavy jobs and, together with system-level I/O statistics, allow you to fine-tune disk performance, e.g., by increasing concurrency via state.backend.rocksdb.thread.num as above.
Flink troubleshooting: RocksDB flushes
All the metrics above help you interpret what is going on inside RocksDB and help you identify knobs around the RocksDB tuning triangle of Space, Write, and Read Amplification. Identifying the last one is a bit of a challenge, but if you see that your job is not write-heavy (or at least not stalled on write operations) and is still performing slowly while spending most of its time inside RocksDB read operations, e.g., through state access latency tracking or with the help of a profiler or Flink’s own flame graphs, that’s a good indicator of read amplification (and/or a disk bottleneck). Increasing compaction efforts may help, but this is usually a case for enabling bloom filters to reduce the amount of data to scan through.
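The space amplification check from the first bullet above boils down to comparing two of the metrics. Here is a minimal sketch, assuming you have read total-sst-files-size and estimate-live-data-size for the same state; the function name is hypothetical, and what counts as "too high" depends on your workload.

```python
from typing import Optional


def space_amplification(total_sst_files_size: int, estimate_live_data_size: int) -> Optional[float]:
    """Ratio of bytes occupied on disk to bytes of live (non-stale) data.

    Returns None when there is no live data yet, since the ratio is undefined.
    """
    if estimate_live_data_size <= 0:
        return None
    return total_sst_files_size / estimate_live_data_size


# Example: 30 GB of SST files holding an estimated 10 GB of live data.
ratio = space_amplification(30_000_000_000, 10_000_000_000)
print(ratio)  # 3.0: two thirds of the occupied space is stale data
```

A ratio that keeps growing over time suggests that compaction is not keeping up with the rate at which data becomes stale.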
Getting started with a custom Grafana dashboard
We have baked our recommendations from the previous blog post and the metrics above into a Grafana dashboard for a Prometheus data source. Getting Flink metrics into Prometheus is relatively simple via the Prometheus metric reporter (either the pull or push version). In the simplest case, you just provide the following Flink configuration and let Prometheus scrape the metrics from your JobManager and TaskManagers.
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
You can download our dashboard from ververica-platform-playground/grafana-dashboard.json and use it as a starting point for your own monitoring dashboards. In the following sections, we will explain a few more details of this dashboard and present a simple playground to test-drive it.
At the top of the dashboard, variables allow you to select the deployment to look at, a specific task to show details for (in various graphs of the following rows), RocksDB states to inspect (for low-level RocksDB statistics), and the TaskManager(s) to show detailed JVM statistics for. All provide a generic overview, but as soon as your application is scaling to a high number of resources, the graphs get too crowded, at least for troubleshooting purposes. You can, of course, also use Grafana’s abilities to filter out individual graphs.
If you inspect the panels, you will find a few details that are taken care of for you. For example, the checkpoint statistics are wrapped in increase functions because they are reset after a job restart, and quantiles that are reported per subtask use max to show a single value across all of them. Usually, there is one panel with a generic overview of the whole job or all subtasks and a second panel with more details per subtask for the task selected at the top of the dashboard. This serves two purposes: (1) grasping information quickly for continuous monitoring and a high-level overview, and (2) looking at details to continue troubleshooting.
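To see why a reset-aware function matters for statistics that start over after a job restart, consider this simplified sketch. It mimics the basic idea of treating a drop in value as a restart from zero; it is not Prometheus' actual implementation, which additionally extrapolates to the window boundaries.

```python
from typing import Sequence


def reset_aware_increase(samples: Sequence[float]) -> float:
    """Total growth of a counter across samples, tolerating resets to zero."""
    total = 0.0
    for previous, current in zip(samples, samples[1:]):
        if current >= previous:
            total += current - previous
        else:
            # The value dropped: assume the counter restarted from zero,
            # so everything counted since the restart is new growth.
            total += current
    return total


# A counter that reached 10, was reset by a job restart, and then counted to 4.
samples = [5, 8, 10, 2, 4]
print(reset_aware_increase(samples))  # 9.0
print(samples[-1] - samples[0])       # -1: the naive difference is misleading
```

Without the reset handling, a job restart would show up as a negative jump in the graph instead of continued progress.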
Please note that the Latency row’s first panels are visualizing a custom eventTimeLag metric that you may or may not have in your jobs. For our use cases, this has proven quite useful (for more information, refer to the first part of this series). We also provide two variants of a more generic event time lag visualization using the built-in currentOutputWatermark metric, once in tabular form and once by plotting the difference between the latest watermark and the time this value was reported. As mentioned earlier, this is less accurate because Prometheus' granularity is at the level of seconds here and there may also be a delay between capturing the value inside Flink and capturing this timestamp. However, as a best-effort graph that works with every Flink application, it is usually good enough except for low-latency use cases. For these, we encourage you to provide your own eventTimeLag metric as described.
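The watermark-based lag is simply the difference between the time a sample was reported and the watermark it carries. The following sketch illustrates this, assuming the sample timestamp is in seconds and the watermark in epoch milliseconds; the function name is ours. Before the first watermark arrives, Flink's watermark holds Long.MIN_VALUE, which the sketch filters out.

```python
from typing import Optional

# Long.MIN_VALUE: the watermark value before any watermark has been emitted.
NO_WATERMARK = -(2 ** 63)


def watermark_lag_ms(sample_timestamp_s: float, watermark_ms: float) -> Optional[float]:
    """Best-effort event time lag in milliseconds, or None without a watermark."""
    if watermark_ms <= NO_WATERMARK:
        return None
    return sample_timestamp_s * 1000.0 - watermark_ms


# A sample reported at second 1_700_000_010 with a watermark 5.5 seconds behind.
print(watermark_lag_ms(1_700_000_010.0, 1_700_000_004_500))  # 5500.0
print(watermark_lag_ms(1_700_000_010.0, NO_WATERMARK))       # None
```

Because the report timestamp only has second-level granularity and is taken after the value was captured, treat the result as an approximation.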
In case you are wondering: throughput metrics are not showing any values for sinks because not all sinks support reporting output records or bytes yet. Also, the RocksDB row has a couple more metrics than the ones introduced above. We decided to put all of the metrics available in Flink 1.14 into the graphs in case you have enabled them and need them. They may be useful in specific situations but are less important in the regular case where it is enough to look at the ones we introduced.
A final word on some of the low-level Kubernetes metrics that we plot: metrics like container_cpu_usage_seconds_total or container_memory_working_set_bytes are reported per container. If only pods are annotated with the job/deployment IDs, as is the case here, we need to join these metrics with one that carries the pod's labels. We did so with the JVM status metrics and hence came up with Prometheus queries like the following one, which uses the JVM metric purely to connect the pod to the deployment or TM without otherwise affecting the result:
sum(
  0*max({
    __name__="flink_taskmanager_Status_JVM_CPU_Load",
    deploymentId="$deploymentId"}) by (pod,tm_id)
  +
  on(pod)
  group_left()
  (
    rate(
      container_cpu_usage_seconds_total{
        pod=~"job-.*-taskmanager-.*",
        container=~"flink-taskmanager"}[5m]) * 1024 * 100
    /
    on(pod,id)
    container_spec_cpu_shares{
      pod=~"job-.*-taskmanager-.*",
      container=~"flink-taskmanager"}
  )
) by (deploymentId,tm_id)
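The arithmetic in the inner part of this query can be spelled out in a few lines of Python. This is a minimal sketch that assumes the common Kubernetes convention of 1024 CPU shares per requested core; the function name is ours and not part of any library.

```python
# Assumption: Kubernetes maps one requested CPU core to 1024 CPU shares.
CPU_SHARES_PER_CORE = 1024


def cpu_usage_percent(cpu_seconds_per_second: float, cpu_shares: int) -> float:
    """CPU usage as a percentage of the cores the container requested.

    cpu_seconds_per_second corresponds to rate(container_cpu_usage_seconds_total),
    cpu_shares to container_spec_cpu_shares.
    """
    if cpu_shares <= 0:
        raise ValueError("cpu_shares must be positive")
    requested_cores = cpu_shares / CPU_SHARES_PER_CORE
    return cpu_seconds_per_second / requested_cores * 100


# A container that requested 2 cores (2048 shares) and uses 1.5 cores on average.
print(cpu_usage_percent(1.5, 2048))  # 75.0
```

The added 0*max(...) term contributes nothing to this number; it only attaches the tm_id label via the pod.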
Feel free to use this dashboard and extend it to your needs, e.g., by adding your own application-specific metrics that help you identify how your application is behaving.
Test-driving the dashboard
If you want to try out the dashboard yourself, you can set up our Ververica Platform playground as described in our Getting Started guide. You can run this in any Kubernetes environment, including locally in Minikube as shown there. Once it is up, you can upload your application’s jar file and set up a new deployment quickly. As soon as this is running, the “Metrics” link will become available and direct you to our Grafana dashboard showing all the metrics for your application.
Go ahead and try it out in various applications and different workloads.
Summary
In this second part of our two-part series on large-scale Apache Flink application monitoring, we focused on metrics that primarily help you troubleshoot application failures and performance issues. If you would like to refresh your knowledge on continuous application monitoring, please have a look at the first part of this series. We also introduced our Grafana monitoring and troubleshooting dashboard and made it available to you so that you can start quickly in your own efforts with a production-ready Flink application. We’re curious to hear how the dashboard is working for you and are looking forward to any feedback or suggestions of what you would like to see added.