Flink SQL: Deduplication

Written by Ververica | 30 December 2022

Flink SQL has emerged as the de facto standard for low-code data analytics. It has managed to unify batch and stream processing while simultaneously staying true to the SQL standard. In addition, it provides a rich set of advanced features for real-time use cases. In a nutshell, Flink SQL gives you the best of both worlds: you can process streaming data with familiar SQL, and the same queries can also run in batch mode.

Ververica Platform makes Flink SQL even more accessible and efficiently scalable across teams. The platform comes with additional tools for developing SQL scripts, managing user-defined functions (UDFs), catalogs and connectors, as well as operating the resulting long-running queries.

We have seen that there are many use cases for Flink SQL, and we are excited to see what you will build with it. In this blog post, we will explain what deduplication is and show how you can accomplish this with Flink SQL.


What is deduplication in stream processing?

Deduplication is the process of removing duplicate records from a dataset, usually to improve data quality. In stream processing, deduplication is particularly important because duplicate events increase the volume of data that every downstream operator has to handle and can distort results.

Data deduplication works by identifying and removing duplicate records from the data stream. This is typically done by comparing each incoming record against the records that have already been seen, for example by keeping previously processed keys in state. When a duplicate record is found, it is dropped from the stream.
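For a first intuition of what this looks like in Flink SQL, the simplest form of deduplication is a plain SELECT DISTINCT, which keeps state for every distinct value it has already emitted. A minimal sketch, assuming a hypothetical source table events with an id column:

-- Emit each id only once; Flink keeps state for the values it has already seen
SELECT DISTINCT id
FROM events;

This works on unbounded streams, but the state grows with the number of distinct values unless a state TTL is configured; the ROW_NUMBER()-based pattern shown later in this post is the approach that Flink optimizes specifically for deduplication.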

Benefits of deduplication

There are many benefits to deduplicating data, including:

  • improved performance – by removing duplicate data, you can reduce the amount of data that needs to be processed, which can improve performance
  • reduced storage requirements – duplicate data takes up unnecessary space, so removing it can free up valuable storage space
  • higher accuracy – duplicate data can lead to inaccurate results, so removing it can improve the accuracy of your data analysis
  • boosted efficiency – deduplicating data can make data processing more efficient by reducing the amount of data that needs to be processed

How to deduplicate data with Flink SQL

There are different ways that duplicate events can end up in your data sources, from human error to application bugs. Regardless of the origin, unclean data can have a real impact on the quality (and correctness) of your results.

In some cases, data producers generate records with the same ID for streaming data changes. These records may include Insert, Update, and Delete records, and they may need to be deduplicated as part of the business logic in the pipeline before being aggregated or joined with other streams. The purpose of deduplication in this context is to ensure that only unique records are processed and to avoid any issues that may arise from duplicate data.
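As a sketch of this pattern, the query below first keeps only the latest event per order_id and then joins the result with another table. The table and column names (order_events, customers, event_time, and so on) are hypothetical, and event_time is assumed to be declared as the event-time attribute of order_events:

-- Deduplicate the change stream: keep only the latest event per `order_id`
CREATE TEMPORARY VIEW latest_order_events AS
SELECT order_id, customer_id, status, event_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY event_time DESC) AS rownum
  FROM order_events
)
WHERE rownum = 1;

-- Join the deduplicated stream with another table before further processing
SELECT o.order_id, o.status, c.customer_name
FROM latest_order_events o
JOIN customers c ON o.customer_id = c.customer_id;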

Suppose that your order system occasionally generates duplicate events with the same order_id, but you're only interested in keeping the most recent event for downstream processing.

As a first step, you can use a combination of the COUNT function and the HAVING clause to check whether any orders have more than one event (and which ones), and then keep only a single event per order using ROW_NUMBER(). In practice, deduplication is a special case of a Top-N query, where N is 1 (rownum = 1) and the ordering column is either the processing time or the event time of the events.

In the example query below, the source table orders is backed by the built-in datagen connector, which continuously generates rows in memory.


CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '100'
);

-- Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;

-- Use deduplication to keep only the latest record for each `order_id`
SELECT order_id, order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time DESC) AS rownum
  FROM orders
)
WHERE rownum = 1;
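The last query orders by the event-time column order_time, so Flink keeps the row with the latest event time per id. As noted above, the ordering column can also be the processing time, which keeps whichever event arrives first for each key and typically needs less state. A minimal variant sketch, assuming the orders table additionally declares a computed column proc_time AS PROCTIME() (not part of the DDL above):

-- Keep the first event that arrives for each `order_id`, based on processing time
SELECT order_id, proc_time
FROM (
  SELECT id AS order_id,
         proc_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time ASC) AS rownum
  FROM orders
)
WHERE rownum = 1;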

Summary

In this article, you learned what deduplication is and why it matters in stream processing. You've also seen how to use Flink SQL to write queries for this type of problem.

We encourage you to run these examples on Ververica Platform. You can follow these simple steps to install the platform.

To learn more about Flink SQL, check out the following resources: