Stream Processing & Apache Flink - News and Best Practices

Flink SQL for powerful querying of data streams and data at rest

Written by Aljoscha Krettek | 06 November 2018

While Flink SQL was initially released with Flink 1.1.0 back in August 2016, recent Flink releases have added quite a bit of functionality that makes Flink SQL easier to use by eliminating the need to write Java/Scala code. In this post, we want to (re-)introduce Flink SQL from a new angle that is made possible by those changes while at the same time provide some extra learnings for well-seasoned users.

The newly added SQL command-line (SQL CLI) makes it easy to quickly explore data in streams or data at rest (for example, in a database or HDFS). It can also be used for building powerful data transformation pipelines or analysis pipelines. In this post, we want to explore what features are currently available while follow-up posts will cover specific features in more detail and introduce exciting new features that are coming up with Apache Flink 1.7, such as complex event processing using the MATCH_RECOGNIZE extension and an improved time-based enrichment join.
Before we dive into some hands-on examples, we list some of the highlights of Flink SQL:

  • Flink SQL is a unified API for batch and stream processing: this allows using the same queries for processing historic data and real-time data

  • Support for both processing time and event time semantics

  • Support for working with nested Avro and JSON data

  • User-defined scalar, aggregation, and table-valued functions

  • No-coding-required SQL command-line (that is, no Java/Scala coding)

  • Support for various types of stream joins (keep your eyes peeled for follow-up posts)

  • Support for aggregations, both with windows and without windows

Talking to the Outside World, a.k.a. Sources and Sinks

The first thing we have to do when working with the command line client of Flink SQL is to define our sources and sinks. Otherwise, we wouldn’t be able to read or write any data. Sources and sinks are defined in a YAML configuration file, along with other configuration settings. The source and sink configuration in the YAML file is analogue to SQL DDL statements (support for SQL DDL is currently under discussion in the Flink community). For our ongoing example, let’s assume we have a Kafka topic that stores information about taxi rides that we want to further process and analyze. The configuration for it looks like this:


tables:
- name: TaxiRides
type: source
update-mode: append
schema:
- name: rideId
type: LONG
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: taxiId
type: LONG
- name: driverId
type: LONG
- name: psgCnt
type: INT
connector:
property-version: 1
type: kafka
version: 0.11
topic: TaxiRides
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: zookeeper:2181
- key: bootstrap.servers
value: kafka:9092
- key: group.id
value: testGroup
format:
property-version: 1
type: json
schema: "ROW(rideId LONG, isStart BOOLEAN, rideTime TIMESTAMP, lon FLOAT, lat FLOAT, psgCnt INT, taxiId LONG, driverId LONG)"

In Flink SQL, sources, sinks, and everything in between is called a table. Here we define an initial table based on a Kafka topic that contains events in a JSON format. We define the Kafka configuration settings, the format and how we want to map that to a schema and also how we want watermarks to be derived from the data. In addition to JSON, Flink SQL comes with built-in support for Avro formats and it’s also possible to extend this with custom formats. One interesting fact is that Flink SQL has always supported dealing with nested data in JSON and Avro schemas.
Now that we discussed the configuration and format of our source table, let’s bring up the Docker containers of the SQL training.


git clone https://github.com/aljoscha/sql-demo
cd sql-demo
docker-compose up -d
docker-compose exec sql-client ./sql-client.sh

Windows users, please have a look at the more detailed instructions of the training.From the Flink SQL command-line client we can list our defined tables:


Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro

And we can also inspect the schema of any Table:


Flink SQL> DESCRIBE TaxiRides;
root
|-- rideId: Long
|-- rowTime: TimeIndicatorTypeInfo(rowtime)
|-- isStart: Boolean
|-- lon: Float
|-- lat: Float
|-- taxiId: Long
|-- driverId: Long
|-- psgCnt: Integer

With this out of the way, let’s see what we can do with our tables.Refer to the documentation for more information about configuring Flink SQL and about defining sources, sinks, and their formats.

Massaging Data

One of the easiest things you might want to do is to get your data into the right format for further processing. This might include:

  • Converting between schemas, for example transforming a stream of JSON events to Avro encoding

  • Removing fields, or projecting them, in SQL parlance

  • Filtering out whole events that we’re not interested in

Let’s see how we would do any of those, starting with schema conversion. When we want to read data from Kafka, convert data to a different format, and write data back to a different Kafka topic for downstream processing all we have to do is to define our source table (as we have done above) and then to define a table as a sink that has a different format:


tables:
- name: TaxiRides_Avro
type: sink
update-mode: append
schema:
- name: rideId
type: LONG
- name: rowTime
type: TIMESTAMP
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: taxiId
type: LONG
- name: driverId
type: LONG
- name: psgCnt
type: INT
connector:
property-version: 1
type: kafka
version: 0.11
topic: TaxiRides_Avro
properties:
- key: zookeeper.connect
value: zookeeper:2181
- key: bootstrap.servers
value: kafka:9092
- key: group.id
value: trainingGroup
format:
property-version: 1
type: avro
avro-schema: >
{
"type": "record",
"name": "test",
"fields" : [
{"name": "rideId", "type": "long"},
{"name": "rowTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "isStart", "type": "boolean"},
{"name": "lon", "type": "float"},
{"name": "lat", "type": "float"},
{"name": "taxiId", "type": "long"},
{"name": "driverId", "type": "long"},
{"name": "psgCnt", "type": "int"}
]
}

With both our source and sink defined converting the data becomes as easy as:


Flink SQL> INSERT INTO TaxiRides_Avro SELECT * FROM TaxiRides;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: ffa9109b9cad077ec83137f55ec6d1c5
Web interface: http://jobmanager:8081

Our query is submitted to the Flink cluster as a standing query. You can monitor and control the query from Flink’s WebUI by accessing http://localhost:8081. We can build on this simple pattern by also introducing projection and filtering. If we only want to have certain fields in our result, we can specify that in the `SELECT` query. For example:


Flink SQL> INSERT INTO TaxiRides_Avro SELECT rideIdId, taxiId, driverId FROM TaxiRides;

This would only give us the IDs in the events. (Keep in mind that the format of the sink needs to be adapted for this query to work.)

Another simple thing we can do based on this is filtering out entire events. Consider the case that we are only interested in taxi rides that happen in New York. The events have `lon` and `lat` fields that give the longitude and latitude, respectively, at which the event happened. We can use those to determine whether the event happened in New York or not:


Flink SQL> SELECT * FROM TaxiRides WHERE isInNYC(lon, lat);

You will notice one interesting thing here, and that is `isInNYC()`. This is a user-defined function, or UDF, that we defined in our SQL client configuration. We can see which user functions we have available via:


Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId

Just like everything else they are configured in the Flink SQL client configuration file:


functions:
- name: timeDiff
from: class
class: com.dataartisans.udfs.TimeDiff
- name: isInNYC
from: class
class: com.dataartisans.udfs.IsInNYC
- name: toAreaId
from: class
class: com.dataartisans.udfs.ToAreaId
- name: toCoords
from: class
class: com.dataartisans.udfs.ToCoords

UDFs are Java classes that implement a specific interface and are registered with the client. There are different types of user functions: scalar functions, table functions, and aggregation functions. Stay tuned for a follow-up blog post that goes into a bit more detail about user-defined functions but you can also check out the UDF documentation right now.

Structuring Queries using Views in Flink SQL

Once we have sufficiently complex SQL queries, they can become a bit hard to understand. We can mitigate that by defining views in Flink SQL. This is similar to how you define variables in a programming languages to give a name to something so that you are able to reuse it later. Let’s say we want to build on the earlier examples and create a view of rides that happened in New York after a given date. We would do it like this:


Flink SQL> CREATE VIEW TaxiRides_NYC AS SELECT * FROM TaxiRides
WHERE isInNYC(lon, lat)
AND rowTime >= TIMESTAMP '2013-01-01 00:00:00';
[INFO] View has been created.

We can figure out what views we have available via:


Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
TaxiRides_NYC

One thing to note is that creating views does not actually instantiate any standing query or produce any output or intermediate results. A view is simply a logical name of a query that can be reused and allows for better structuring of queries. This is different from some other SQL-like streaming systems where each intermediate query creates data and gobbles up resources.Views are an upcoming feature of Flink 1.7 but it is already implemented and merged into the master branch, that’s why we’re already mentioning it here. Plus, it’s super helpful.

Windowed Aggregations with Event Time Support

As a last step, we want to show a more complex query that brings together what we explained so far. Consider a case where we want to monitor the rides that are happening and need to know when the number of started rides in a given area in New York exceeds a threshold (say 5). This is the query for doing so:


SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rowTime, INTERVAL '5' MINUTE) AS t,
COUNT(*) AS c
FROM TaxiRides_NYC
WHERE isStart = TRUE
GROUP BY
toAreaId(lon, lat),
TUMBLE(rowTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;

In the example above we do the following:

  1. We use our previously created view that contains New York events that happened after a specific date,

  2. We filter out those events that are not “start events”,

  3. We use another user-defined function to convert the `lon, lat` pair to an area id and group by that,

  4. We specify that we want to have five-minute windows, and finally

  5. We filter out those windows where the count is less than five.

In a real-world use case, we would now write this to an Elasticsearch sink and power a dashboard or notification system with it. This is left as an exercise for the reader. (Hint: the Apache Flink SQL Training does go into details about this in one of the later exercises.)

Conclusion

In this blog post, we explained how Flink SQL can be used to implement simple data transformations and data massaging jobs without writing Java code. We also explained how to use views to structure more complicated queries and make them understandable. Finally, we developed a more sophisticated query that combined user-defined functions, windowed aggregations, and support for event time.

In follow-up posts, we will give more insights into how user-defined functions can be developed and used, and we will dive into Flink SQL’s powerful joins and how they can be used to enrich data. Future posts scheduled after the Flink 1.7.0 release will introduce powerful new additions for data enrichment, complex event processing, and pattern detection using Flink SQL.We encourage you to subscribe to the Apache Flink mailing list below or contact Ververica for questions regarding Flink SQL.