Flink SQL Recipe: Window Top-N and Continuous Top-N

Written by Ververica | 25 November 2022

Flink SQL has emerged as a de facto standard for low-code streaming analytics. It unifies batch and stream processing while staying true to the SQL standard, and it provides a rich set of advanced features for real-time data analysis. In a nutshell, Flink SQL offers the best of both worlds: the same SQL can process both streaming data and batch data.

Ververica Platform makes Flink SQL even more accessible and efficiently scalable across teams. The platform comes with additional tools for developing SQL scripts, for managing user-defined functions (UDFs), catalogs and connectors, and for operating the resulting long-running queries.

We have seen many use cases for Flink SQL, and we are excited to show you what you can build with it. In this series of blog posts, we will explore how to use Flink SQL to process data in a variety of ways. This post, in particular, will focus on two queries: Window Top-N and Continuous Top-N.

Tip: Visit our Case Study to explore how others are using Apache Flink.

What are Window Top-N and Continuous Top-N queries?

Window Top-N and Continuous Top-N are two similar but slightly different ways of processing data. In both cases, we want to find the top N items in a stream of data, but there are some key differences:

  • In Window Top-N, we process data in fixed-size windows. For example, we might want to find the top 10 items every minute.
  • In Continuous Top-N, we process data continuously. We don't use windows, but instead process data as it arrives.

Continuous Top-N is more difficult to implement than Window Top-N, but it has some advantages. For example, it can give us results more quickly, since we don't have to wait for a window to close before we can see the results.

Common use cases for Window Top-N and Continuous Top-N queries

Window Top-N and Continuous Top-N queries are both useful for a variety of tasks. For example, they can be used for:

  • Fraud detection: In a stream of financial transactions, we might want to find the top 10 transactions by amount every minute. This can help us identify suspicious activity.
  • Recommendations: In a stream of user interactions, we might want to find the top 10 items that are being viewed or purchased. This can help us make recommendations to users.
  • Anomaly detection: In a stream of sensor readings, we might want to find the top 10 sensors with the highest readings. This can help us identify sensors that are malfunctioning.
  • Monitoring: In a stream of log messages, we might want to find the top 10 log messages by volume. This can help us identify system issues.

How to use Flink SQL to write Window Top-N queries

Let's start by looking at how to use Flink SQL to write Window Top-N queries. We'll show you how to calculate the top 3 suppliers with the highest sales in every 5-minute tumbling window.


CREATE TABLE orders (
  bidtime TIMESTAMP(3),
  price DOUBLE,
  item STRING,
  supplier STRING,
  WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECONDS
) WITH (
 'connector' = 'faker',
 'fields.bidtime.expression' = '#{date.past ''30'',''SECONDS''}',
 'fields.price.expression' = '#{Number.randomDouble ''2'',''1'',''150''}',
 'fields.item.expression' = '#{Commerce.productName}',
 'fields.supplier.expression' = '#{regexify ''(Alice|Bob|Carol|Alex|Joe|James|Jane|Jack)''}',
 'rows-per-second' = '100'
);

SELECT *
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
      SELECT window_start, window_end, supplier, SUM(price) as price, COUNT(*) as cnt
      FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY window_start, window_end, supplier
    )
  ) WHERE rownum <= 3;

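The source table (orders) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.

Note: This example uses the Window Top-N feature to display the top 3 suppliers with the highest sales every 5 minutes. The innermost query sums the sales per supplier within each tumbling window, ROW_NUMBER() then ranks the suppliers inside each window by that sum, and the outer filter keeps only the first three ranks.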
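To make the semantics of the query above more tangible, here is a minimal Python sketch (not Flink code) of what a tumbling Window Top-N computes on a small, finite batch of orders. The function names window_start and window_top_n, as well as the sample orders, are assumptions made up for this illustration. The sketch also ignores watermarks and late data, which Flink handles for you.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts, size=WINDOW):
    """Align a timestamp to the start of its epoch-aligned tumbling window."""
    return ts - (ts - EPOCH) % size


def window_top_n(orders, n=3, size=WINDOW):
    """orders: iterable of (bidtime, price, supplier) tuples (hypothetical layout)."""
    # Step 1: SUM(price) ... GROUP BY window_start, window_end, supplier
    totals = defaultdict(float)
    for bidtime, price, supplier in orders:
        totals[(window_start(bidtime, size), supplier)] += price

    # Step 2: PARTITION BY window_start, window_end
    per_window = defaultdict(list)
    for (start, supplier), total in totals.items():
        per_window[start].append((supplier, total))

    # Step 3: ORDER BY price DESC and keep rownum <= n.
    # Ties keep their arrival order here; this sketch does not model
    # how Flink itself orders tied rows.
    result = {}
    for start, rows in per_window.items():
        ranked = sorted(rows, key=lambda r: r[1], reverse=True)
        result[(start, start + size)] = ranked[:n]
    return result


# Made-up sample data: four suppliers in the first window, one in the second.
orders = [
    (datetime(2022, 11, 25, 10, 1), 10.0, "Alice"),
    (datetime(2022, 11, 25, 10, 2), 20.0, "Bob"),
    (datetime(2022, 11, 25, 10, 3), 15.0, "Alice"),
    (datetime(2022, 11, 25, 10, 4), 5.0, "Carol"),
    (datetime(2022, 11, 25, 10, 4), 1.0, "Jane"),
    (datetime(2022, 11, 25, 10, 6), 7.0, "Jane"),
]
top = window_top_n(orders)
for (start, end), rows in sorted(top.items()):
    print(start.time(), end.time(), rows)
```

In the first window, Jane's single small order does not make the top 3, while in the second window she is the only supplier and therefore ranks first. Unlike this batch sketch, Flink emits the result of each window once the watermark passes the window end.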

How to use Flink SQL to write Continuous Top-N queries

Writing Continuous Top-N queries is more difficult than writing Window Top-N queries because, in Continuous Top-N, we process data as it arrives instead of using windows.

This example takes us into the realm of magic, which is what stream processing is often considered to be by the uninitiated. However, it is really just a set of instructions to be executed on a stream of data. We will show how to continuously calculate the "Top-N" rows based on a given attribute, using an OVER window and the ROW_NUMBER() function.

The source table (spells_cast) is backed by the faker connector, which continuously generates rows in memory based on Java Faker expressions.

The Ministry of Magic tracks every spell a wizard casts throughout Great Britain and wants to know every wizard's Top 2 all-time favorite spells.

Flink SQL can be used to calculate continuous aggregations, so if we know each spell a wizard has cast, we can maintain a continuous total of how many times they have cast that spell.


SELECT wizard, spell, COUNT(*) AS times_cast
FROM spells_cast
GROUP BY wizard, spell;

This result can be used in an OVER window to calculate a Top-N. The rows are partitioned using the wizard column, and are then ordered based on the count of spell casts (times_cast DESC). The built-in function ROW_NUMBER() assigns a unique, sequential number to each row, starting from one, according to the rows' ordering within the partition. Finally, the results are filtered for only those rows with a row_num <= 2 to find each wizard's top 2 favorite spells.
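The ranking step can be sketched in a few lines of Python (not Flink code), applied to a static snapshot of the aggregated counts. The function name top_n_per_partition and the sample rows are assumptions for illustration only. Ties are kept in input order here, which this sketch does not claim matches Flink.

```python
from collections import defaultdict


def top_n_per_partition(rows, n=2):
    """rows: (wizard, spell, times_cast) tuples; returns rows with row_num <= n."""
    # PARTITION BY wizard
    by_wizard = defaultdict(list)
    for wizard, spell, times_cast in rows:
        by_wizard[wizard].append((spell, times_cast))

    out = []
    for wizard in sorted(by_wizard):
        # ORDER BY times_cast DESC
        ranked = sorted(by_wizard[wizard], key=lambda r: r[1], reverse=True)
        # ROW_NUMBER() starts at 1 within each partition
        for row_num, (spell, times_cast) in enumerate(ranked, start=1):
            if row_num <= n:  # WHERE row_num <= n
                out.append((wizard, spell, times_cast, row_num))
    return out


# Made-up snapshot of the continuous aggregation.
snapshot = [
    ("Harry", "Expelliarmus", 5),
    ("Harry", "Lumos", 2),
    ("Harry", "Accio", 3),
    ("Hermione", "Alohomora", 4),
    ("Hermione", "Lumos", 1),
]
favorites = top_n_per_partition(snapshot)
for row in favorites:
    print(row)
```

Each wizard gets an independent ranking, so Harry's third-ranked spell (Lumos, cast twice) is dropped even though it was cast more often than Hermione's second-ranked one.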

Flink's real strength in this query is its ability to issue retractions. As wizards cast more spells, their top 2 will change. When this happens, Flink retracts the outdated result row and emits the updated one, so the result is always correct and up to date.
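The idea of retractions can be illustrated with a simplified Python sketch (not Flink code) that consumes spell casts one at a time and emits a changelog. The function name continuous_top_n, the "+" and "-" markers, and the sample events are assumptions for this illustration. The changelog row kinds Flink actually produces depend on the planner and the sink, so treat this only as a mental model.

```python
from collections import defaultdict


def continuous_top_n(events, n=2):
    """Yield ('+' or '-', wizard, spell, times_cast) changes as events arrive."""
    counts = defaultdict(lambda: defaultdict(int))  # wizard -> spell -> count
    current = defaultdict(list)                     # wizard -> rows emitted so far

    for wizard, spell in events:
        counts[wizard][spell] += 1
        # Recompute this wizard's top n (ties keep first-seen order).
        ranked = sorted(counts[wizard].items(), key=lambda kv: kv[1], reverse=True)[:n]
        previous = current[wizard]
        # Retract rows that are no longer part of the top n ...
        for row in previous:
            if row not in ranked:
                yield ("-", wizard, *row)
        # ... and emit rows that are new or have a changed count.
        for row in ranked:
            if row not in previous:
                yield ("+", wizard, *row)
        current[wizard] = ranked


# Made-up stream of spell casts for a single wizard.
events = [
    ("Harry", "Expelliarmus"),
    ("Harry", "Lumos"),
    ("Harry", "Accio"),
    ("Harry", "Accio"),
    ("Harry", "Expelliarmus"),
]
changes = list(continuous_top_n(events))
for change in changes:
    print(change)
```

The third event produces no output because Accio, cast once, does not yet displace either of the top 2. The fourth event pushes Accio to two casts, so the Lumos row is retracted and the Accio row is emitted in its place. Recomputing the ranking on every event keeps the sketch short; a real engine maintains this state incrementally.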


CREATE TABLE spells_cast (
  wizard STRING,
  spell STRING
) WITH (
 'connector' = 'faker',
 'fields.wizard.expression' = '#{harry_potter.characters}',
 'fields.spell.expression' = '#{harry_potter.spells}'
);

SELECT wizard, spell, times_cast
FROM (
  SELECT *,
  ROW_NUMBER() OVER (PARTITION BY wizard ORDER BY times_cast DESC) AS row_num
  FROM (SELECT wizard, spell, COUNT(*) AS times_cast FROM spells_cast GROUP BY wizard, spell)
)
WHERE row_num <= 2; 

Conclusion

In this article, you've learned about Window Top-N and Continuous Top-N. You've also seen how to use Flink SQL to write queries for both of these types of problems.

We strongly encourage you to run these examples in Ververica Platform. Just follow these simple steps to install the platform.

If you're interested in learning more about Flink SQL, we recommend the following resources: