Stream Processing & Apache Flink - News and Best Practices

Apache Flink® Master Branch Monthly: New in Flink in January 2018

Written by Timo Walther | 09 February 2018

Flink’s major version releases occur every few months, and there’s a constant stream of activity as new features are merged to the Flink master branch in between releases. Keeping an eye on what’s going into Flink’s master is one of the best ways to stay up-to-date on new work that hasn’t yet made it into an official release.

So we’re going to try something new this year: a “Flink Master Monthly” blog post where we highlight a selection of features that have been merged into Flink’s master branch during the past month. If you’re interested in trying these features out once they’re in master, you can certainly do so, keeping in mind that they won’t have been fully tested until they go through the official release process.

If you’d like to see a full list of newly-merged features from a given time period, Git is your friend. You can simply run the following:

git shortlog -e --since="01 Jan 2018" --before="01 Feb 2018"

The raw list offers quite a lot to sift through, so here’s our brief summary.

Improvements to Flink's deployment and process model (FLIP-6): FLIP-6, a major rework of Flink’s deployment and process model in order to improve integration with YARN, Mesos, and container managers (e.g. Docker & Kubernetes), is nearing completion. This FLIP will clear the way for oft-requested features such as dynamic scaling, among other things. The following issues were merged in January during the home stretch for FLIP-6.

  • [FLINK-7903] [tests] Add flip6 build profile

  • [FLINK-7904] Enable Flip6 build profile on Travis

  • [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to Dispatcher

  • [FLINK-8299] [flip6] Retrieve JobExecutionResult after job submission

Groundwork for task recovery from local state, which speeds up failure recovery: There’s work underway to introduce task recovery from local state. This means that Flink will store a second copy of the most recent checkpoint on the local disk of each task manager. In case of failover, the scheduler will try to reschedule tasks to their previous task manager (in other words, to the same machine) if possible. The task can then recover from the locally-stored state, making it possible to avoid reading all state from the distributed file system, which is remote over the network. FLINK-7720 is core to this feature; a sketch of the expected configuration switch follows the issue below.

  • [FLINK-7720] [checkpoints] Centralize creation of backends and state related resources
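
Since this work is still in progress, the following is only a sketch: we expect task-local recovery to be switchable via a flink-conf.yaml entry along these lines. The exact key name is an assumption until the feature lands in an official release.

# flink-conf.yaml -- hypothetical sketch; the key name is an assumption
# until task-local recovery ships in an official release
state.backend.local-recovery: true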

Improved state backend abstraction: FLINK-5820 addresses a limitation of Flink’s state backend abstraction: each piece of state is only meaningful in the context of its state handle, and there is no way to get a view of “all state associated with checkpoint X”. This can cause a few different issues, including state not being cleaned up after failures and state cleanup often being more expensive than necessary. Two of the last missing pieces for this improvement were merged in January.

  • [FLINK-5823] Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

  • [FLINK-8531] Support separation of "Exclusive", "Shared" and "Task owned" state

Network stack changes to improve performance: Planned for Flink 1.5 is a rework of Flink’s network stack to achieve lower latency, and three outstanding issues related to these improvements went into master last month.

  • [FLINK-7520][network] let our Buffer class extend from netty's buffer class

  • [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol

  • [FLINK-8375][network] Remove unnecessary synchronization

Application-level flow control for improved control of checkpointing behavior: Work is being done to add credit-based flow control to the network stack: receivers grant senders “credits” based on their available buffers, so data is only put on the wire when the receiving side can actually process it. Because checkpoint barriers no longer sit behind data buffered in transit, checkpoints can complete faster, resulting in fewer checkpoint timeouts.

  • [FLINK-7406][network] Implement Netty receiver incoming pipeline for credit-based

  • [FLINK-7416][network] Implement Netty receiver outgoing pipeline for credit-based

  • [FLINK-7468][network] Implement sender backlog logic for credit-based

Improved Mesos integration with Docker: This work originated from a StackOverflow question posed by a user, and it enables passing custom parameters to docker run when launching task managers (see the configuration sketch after the issue below).

  • [FLINK-8490] Allow custom Docker parameters for Docker tasks on Mesos
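
For a rough idea of how this looks in practice, here’s an illustrative flink-conf.yaml snippet; the parameter values are made up for the example:

# flink-conf.yaml -- illustrative values
mesos.resourcemanager.tasks.container.type: docker
# comma-separated key=value pairs passed along to `docker run`
mesos.resourcemanager.tasks.container.docker.parameters: log-driver=syslog,label=team=streaming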

Table API / Streaming SQL: There are a handful of different features to highlight in the Table API / Streaming SQL. 

While windowed inner joins were added in the last release, Flink now also supports all types of windowed outer equi-joins [FLINK-7797]. Queries like the one shown below allow for joining two tables within a bounded time range, in both event time and processing time.

SELECT Table1.uid, Table2.event
FROM Table1
LEFT OUTER JOIN Table2
ON Table1.uid = Table2.uid AND
   Table1.rowtime BETWEEN Table2.rowtime - INTERVAL '10' SECOND AND
                          Table2.rowtime + INTERVAL '1' HOUR

For cases where two streams should be joined without a bounded time interval, we also merged non-windowed inner joins [FLINK-6094]. They allow for full-history matching, as is common in many standard SQL statements.

SELECT Table1.uid, Table2.name
FROM Table1
JOIN Table2
ON Table1.uid = Table2.uid

In the past, it was sometimes cumbersome to convert a DataStream or DataSet into a Table. We improved this by allowing the schema definition to be based either on field name or on field index [FLINK-8203].
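
Here’s a brief sketch of the two styles; the class, stream contents, and target field names are illustrative, not from the original issue:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SchemaDefinitionExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<Long, String>> stream =
            env.fromElements(Tuple2.of(1L, "login"), Tuple2.of(2L, "logout"));

        // Index-based mapping: fields are taken in order (f0 -> uid, f1 -> event).
        Table byIndex = tableEnv.fromDataStream(stream, "uid, event");

        // Name-based mapping: refer to the original field names, which also
        // allows renaming, reordering, or projecting a subset of the fields.
        Table byName = tableEnv.fromDataStream(stream, "f1 as event, f0 as uid");
    }
}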

Flink also now supports more built-in SQL functions such as MD5, SHA1, SHA256, BIN, LPAD, and RPAD [FLINK-6810].
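
For example, a query exercising a few of the new functions might look like the following (the table and column names are illustrative):

SELECT MD5(name) AS name_hash,
       SHA256(email) AS email_hash,
       LPAD(CAST(uid AS VARCHAR), 10, '0') AS padded_uid
FROM Users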

For upcoming features such as a SQL Client that enables executing Flink jobs in a non-programmatic way, we made efforts to define table sources in a consistent, string-property-based way [FLINK-8240]. These efforts are not done yet, but they will make it easier to retrieve tables from external systems and will make table source discovery more modular.
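
As a purely hypothetical sketch (the property names below are illustrative, since the format was still being finalized at the time of writing), such a table source definition could look something like this:

connector.type: kafka
connector.topic: user-events
format.type: json
schema.0.name: uid
schema.0.type: LONG
schema.1.name: event
schema.1.type: VARCHAR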

Ecosystem integrations: OpenStack provides software for creating public and private clouds on pools of resources. A newly-merged filesystem connector, motivated by a user who runs Flink on OpenStack, makes it possible to use OpenStack’s S3-like object store, Swift, for checkpoint and savepoint storage, and to do so without Hadoop dependencies.

Integrate generated config tables into documentation: Previously, documentation of Flink configuration settings was maintained by hand, which made it easy for the docs to fall out of sync with the actually available settings. With this change, the configuration documentation is generated automatically from the parts of the code that define the configuration settings, ensuring that it is always up to date. A sketch of such a definition follows the issue below.

  • [FLINK-6590][docs] Integrate generated tables into documentation
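
The generator works off of Flink’s ConfigOption definitions. Here is a minimal sketch of such a definition; the key, default value, and description are made up for illustration:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ExampleOptions {

    // The docs generator picks up the key, the default value, and the
    // description below and renders them into the configuration tables.
    public static final ConfigOption<Integer> RETRY_ATTEMPTS =
        ConfigOptions.key("example.retry-attempts")
            .defaultValue(3)
            .withDescription("Number of times a failed operation is retried.");
}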

Thanks for reading, and we'll be back soon!