Stream Processing & Apache Flink - News and Best Practices

Flink SQL: Joins Series 3 (Lateral Joins, LAG aggregate function)

Written by Ververica | 23 December 2022

Flink SQL has emerged as the de facto standard for low-code data analytics. It has managed to unify batch and stream processing while simultaneously staying true to the SQL standard. In addition, it provides a rich set of advanced features for real-time use cases. In a nutshell, Flink SQL provides the best of both worlds: it gives you the ability to process streaming data using SQL, but it also supports batch processing.

Ververica Platform makes Flink SQL even more accessible and efficiently scalable across teams. The platform comes with additional tools for developing SQL scripts, managing user-defined functions (UDFs), catalogs and connectors, as well as operating the resulting long-running queries.

We have seen that there are many use cases for Flink SQL, and we are excited to see what you will build with it. In this three-part series of blog posts, we will show you different types of joins in Flink SQL and how you can use them to process data in a variety of ways.


What is a Lateral Join?

Lateral joins are a type of SQL join that allow you to specify a subquery in the FROM clause. This subquery is then executed for each row in the outer query. Lateral joins can be used to improve the performance of SQL queries by reducing the number of table scans.

In other words, you can think of lateral join as a foreach loop in SQL that iterates through a collection, applies some transformation on each iteration, and produces an output. Lateral join is very useful in processing data that is stored in a hierarchical or nested format.

How to perform a lateral table join

This example will show how you can correlate events using a LATERAL join.

Given a table with people's addresses, you need to find the two most populous cities for each state and continuously update those rankings as people move. The input table of People contains a uid for each person and their address and when they moved there.

The first step is to calculate each city's population using a continuous aggregation. While this is simple enough, the real power of Flink SQL comes when people move. By using deduplication Flink will automatically issue a retraction for a person's old city when they move. So if John moves from New York to Los Angeles, the population for New York will automatically go down by 1. This gives us the power of Change-Data-Capture without having to invest in the actual infrastructure of setting it up!

With this dynamic population table at hand, you are ready to solve the original problem using a LATERAL table join. Unlike a normal join, lateral joins allow the subquery to correlate with columns from other arguments in the FROM clause. And unlike a regular subquery, as a join, the lateral can return multiple rows. You can now have a subquery correlated with every individual state, and for every state it ranks by population and returns the top 2 cities.


CREATE TABLE People (
    id           INT,
    city         STRING,
    state        STRING,
    arrival_time TIMESTAMP(3),
    WATERMARK FOR arrival_time AS arrival_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'faker',
    'fields.id.expression'    = '#{number.numberBetween ''1'',''100''}',
    'fields.city.expression'  = '#{regexify ''(Newmouth|Newburgh|Portport|Southfort|Springfield){1}''}',
    'fields.state.expression' = '#{regexify ''(New York|Illinois|California|Washington){1}''}',
    'fields.arrival_time.expression' = '#{date.past ''15'',''SECONDS''}',
    'rows-per-second'          = '10'
);

CREATE TEMPORARY VIEW CurrentPopulation AS
SELECT
    city,
    state,
    COUNT(*) as population
FROM (
    SELECT
        city,
        state,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
    FROM People
)
WHERE rownum = 1
GROUP BY city, state;

SELECT
    state,
    city,
    population
FROM
    (SELECT DISTINCT state FROM CurrentPopulation) States,
    LATERAL (
        SELECT city, population
        FROM CurrentPopulation
        WHERE state = States.state
        ORDER BY population DESC
        LIMIT 2
    );

What is a LAG() function?

LAG(column_name, offset) is a function that is used to access data from a previous row in the same table.

This function is useful for comparisons where you want to compare values in the current row with values in a previous row. For example, you might want to find out how much a given stock price has increased or decreased over time.

LAG() is easy to use. Simply specify the column you want to access data from and the number of rows back you want to go. For example, LAG(sales, 1) would return the sales data from the previous row.

How to retrieve a previous row value with the LAG() function

This example will demonstrate how to retrieve the previous value and compute trends for a specific data partition.

The source table (fake_stocks) is backed by the faker connector, which continuously generates fake stock quotations in memory based on Java Faker expressions.

In this example, you are going to create a table which contains stock ticker updates for which we want to determine if the new stock price has gone up or down compared to its previous value.

First, create the table, then use a select statement including the LAG function to retrieve the previous stock value. Finally, use the case statement in the final select to compare the current stock price against the previous value to determine the trend.



CREATE TABLE fake_stocks (
    stock_name STRING,
    stock_value double,
    log_time AS PROCTIME()
) WITH (
  'connector' = 'faker',
  'fields.stock_name.expression' = '#{regexify ''(Deja\ Brew|Jurassic\ Pork|Lawn\ \&\ Order|Pita\ Pan|Bread\ Pitt|Indiana\ Jeans|Thai\ Tanic){1}''}',
  'fields.stock_value.expression' =  '#{number.randomDouble ''2'',''10'',''20''}',
  'fields.log_time.expression' =  '#{date.past ''15'',''5'',''SECONDS''}',
  'rows-per-second' = '10'
);

WITH current_and_previous as (
    select
        stock_name,
        log_time,
        stock_value,
        lag(stock_value, 1) over (partition by stock_name order by log_time) previous_value
    from fake_stocks
)
select *,
    case
        when stock_value > previous_value then '▲'
        when stock_value < previous_value then '▼'
        else '='
    end as trend
from current_and_previous;

Summary

In this article, you learned about a lateral table join and a LAG() function. You've also seen how to use Flink SQL to write queries for both of these types of scenarios.

We encourage you to run these examples on Ververica Platform. You can follow these simple steps to install the platform.

To learn more about Flink SQL, check out the following resources: