How-to guide: Synchronize MySQL sub-database and sub-table using Flink CDC
In the Online Transaction Processing (OLTP) system, to solve the problem of a large amount of data in a single table, the method of sub-database and table is usually used to split a single large table to improve the throughput of the system. However, to facilitate data analysis, it is generally necessary to merge the table splits from the sub-databases and sub-tables into a large table when synchronizing to the data warehouse or data lake.
This tutorial will show you how to use Flink CDC to build a real-time data lake for the above-presented scenario. The examples in this article will all be based on Docker with the use of Flink SQL. There is no need for a line of Java/Scala code or installation of an IDE. The entire content of this guide contains the docker-compose file.
The whole process will be shown by synchronizing data from MySQL to Iceberg, as shown in the diagram below.
Prerequisites
- Preferably, a clean Linux installation on bare metal or virtual machine
- Install Docker Engine or Docker Desktop
- Download flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
- Download iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar
- Download flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
Step 1: Create a docker-compose.yml file
Create a Docker Compose file (docker-compose.yml) with the following content:
version: '2.1'
services:
sql-client:
user: flink:flink
image: yuxialuo/flink-sql-client:1.13.2.v1
depends_on:
- jobmanager
- mysql
environment:
FLINK_JOBMANAGER_HOST: jobmanager
MYSQL_HOST: mysql
volumes:
- shared-tmpfs:/tmp/iceberg
jobmanager:
user: flink:flink
image: flink:1.13.2-scala_2.11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
volumes:
- shared-tmpfs:/tmp/iceberg
taskmanager:
user: flink:flink
image: flink:1.13.2-scala_2.11
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
volumes:
- shared-tmpfs:/tmp/iceberg
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
volumes:
shared-tmpfs:
driver: local
driver_opts:
type: "tmpfs"
device: "tmpfs"
The containers in this docker-compose file include:
- SQL-Client: Flink SQL Client, used to submit SQL queries and view SQL execution results
- Flink Cluster: contains Flink JobManager and Flink TaskManager to execute Flink SQL
- MySQL: as the data source of sub-database and sub-table, store the user table
Note
If you want to run this guide in your own Flink environment, you need to download the packages listed below and put them in the lib directory of the Flink directory, i.e., FLINK_HOME/lib/.
flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar
All Docker Compose-related commands used in this tutorial will need to be executed in the directory where docker-compose.yml is located.
Execute the following command where docker-compose.yml is located to start the components needed for this guide:
docker-compose up -d
This command will automatically start all containers defined in the Docker Compose file in the detached mode.
Step 2: Prepare data in the MySQL database
Enter the MySQL container
docker-compose exec mysql mysql -uroot -p123456
Create data, tables, and populate the data.
CREATE DATABASE db_1;
USE db_1;
CREATE TABLE user_1 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234","user_110@foo.com");
CREATE TABLE user_2 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_2 VALUES (120,"user_120","Shanghai","123567891234","user_120@foo.com");
CREATE DATABASE db_2;
USE db_2;
CREATE TABLE user_1 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_1 VALUES (110,"user_110","Shanghai","123567891234", NULL);
CREATE TABLE user_2 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255)
);
INSERT INTO user_2 VALUES (220,"user_220","Shanghai","123567891234","user_220@foo.com");
Step 3: Create tables using Flink DDL with Flink SQL CLI
Use the following command to enter the Flink SQL CLI container:
docker-compose exec sql-client ./sql-client
You will see the following interface:
Turn on the checkpoint and do a checkpoint every 3 seconds. The checkpoint is not enabled by default, and we need to enable the checkpoint to allow Iceberg to submit the transactions. Moreover, MySQL-CDC must wait for a complete checkpoint before the binlog reading phase starts to avoid out-of-order binlog records.
SET execution.checkpointing.interval = 3s;
Create a source table user_source to capture the data of all databases and tables in MySQL and use regular expressions to match these databases and tables used in the configuration items of the table. Moreover, the table also defines a metadata column to distinguish which database and table the data comes from.
CREATE TABLE user_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'db_[0-9]+',
'table-name' = 'user_[0-9]+'
);
Create a sink table all_users_sink to load data into Iceberg. In this sink table, we define a composite primary key (database_name, table_name, id) where the values of id fields may be the same.
CREATE TABLE all_users_sink (
database_name STRING,
table_name STRING,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
) WITH (
'connector'='iceberg',
'catalog-name'='iceberg_catalog',
'catalog-type'='hadoop',
'warehouse'='file:///tmp/iceberg/warehouse',
'format-version'='2'
);
Step 4: Stream to Iceberg
Use the following Flink SQL statement to write data from MySQL to Iceberg.
-- Flink SQL
INSERT INTO all_users_sink select * from user_source;
The command above will start a streaming job to continuously synchronize the full and incremental data in the MySQL database to Iceberg. You can see this running job in the Flink UI:
Use the following command to see the written files in Iceberg.
docker-compose exec sql-client tree /tmp/iceberg/warehouse/default_database/
Note
In your environment, the actual files may differ from the screenshot above, but the overall directory structure should be similar.
Use the following Flink SQL statement to query the data in all_users_sink.
SELECT * FROM all_users_sink;
We can see the following query results in the Flink SQL CLI.
Insert a new row into the db_1.user_1 table.
INSERT INTO db_1.user_1 VALUES (111,"user_111","Shanghai","123567891234","user_111@foo.com");
Update db_1.user_2 table data.
UPDATE db_1.user_2 SET address='Beijing' WHERE id=120;
Delete a row in the db_2.user_2 table.
DELETE FROM db_2.user_2 WHERE id=220;
The final query results are as follows.
From the latest results of Iceberg, you can see that the new record has been added, the address has been updated, and the old record has been deleted, which is exactly the same as the data update we made in MySQL.
Step 5: Clean the environment
You can clean the environment by executing the following command in the directory where the docker-compose.yml file is located to stop all containers.
docker-compose down