How to Build a MySQL CDC Pipeline in Minutes

Change data capture (CDC) enables you to replicate changes from transactional databases to your data lake and data warehouse. In this tutorial, you will learn how to configure your MySQL database to enable CDC. You will also learn how to build a simple data pipeline that copies change events from MySQL to your data lake, automatically inserting, updating, or deleting rows to match the original table.

If you need a refresher, check out the definition and basics of CDC in our previous article.

Run it!
1. Configure MySQL for CDC replication
2. Create a connection to your MySQL database
3. Staging source tables in the data lake
4. Merge changed events from staging to your target table

Configure MySQL for CDC replication

Create user with required permissions

Start by creating a user and granting it the permissions required to manage the CDC process, as shown below. Replace the username and password with your own.

CREATE USER 'cdcuser'@'%' IDENTIFIED BY 'password';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdcuser'@'%';

FLUSH PRIVILEGES;

Amazon RDS and Amazon Aurora for MySQL do not allow a global read lock, so table-level locks are needed to create a consistent snapshot. In this case, you also need to grant the LOCK TABLES permission to the CDC user.

GRANT LOCK TABLES ON *.* TO 'cdcuser'@'%';

Enable logical replication

MySQL uses a binlog to track changes to databases and tables. SQLake uses this binlog to replicate changes to your data lake and data warehouse. To enable the binlog, you need to update your MySQL server configuration file by including the following properties:

binlog_format     =      ROW

binlog_row_image  =      FULL

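Or execute the following statements. Values set with SET GLOBAL do not survive a server restart unless they are also written to the configuration file: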

SET GLOBAL binlog_format = 'ROW';
SET GLOBAL binlog_row_image = 'FULL';

To check whether the binlog is correctly configured, run the following query against your database:

show global variables where variable_name in ('binlog_format', 'binlog_row_image');
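If you script this check, the comparison itself is simple. The following Python sketch is only an illustration and is not part of SQLake or MySQL: it assumes you have already run the query above with whatever client you use and hold the result as (variable_name, value) pairs. The function name find_binlog_problems and the sample rows are hypothetical.

```python
# Settings this tutorial asks for; the values come from the configuration above.
REQUIRED_BINLOG_SETTINGS = {"binlog_format": "ROW", "binlog_row_image": "FULL"}


def find_binlog_problems(rows):
    """Return a list of problems found in the query result; empty means OK.

    `rows` is an iterable of (variable_name, value) pairs, as returned by
    the SHOW GLOBAL VARIABLES query.
    """
    # Normalize case so 'row' and 'ROW' are treated the same.
    actual = {name.lower(): str(value).upper() for name, value in rows}
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        found = actual.get(name)
        if found is None:
            problems.append(f"{name} is missing from the query result")
        elif found != expected:
            problems.append(f"{name} is {found}, expected {expected}")
    return problems


if __name__ == "__main__":
    # Hypothetical result from a server that has not been reconfigured yet.
    sample_rows = [("binlog_format", "MIXED"), ("binlog_row_image", "FULL")]
    for problem in find_binlog_problems(sample_rows):
        print(problem)  # prints "binlog_format is MIXED, expected ROW"
```

An empty list means both properties have the values this tutorial requires.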

For Amazon RDS, configure these properties in the parameter group of your database. Once they are set, restart the database for the changes to take effect. If your RDS database uses the default parameter group, you cannot modify it. In that case, create a new parameter group for your database version, which automatically inherits all the default parameters, then modify these two properties in it.

After you make the changes, make sure the new parameter group is assigned to the database and the database has picked up the new values by running the query again:

show global variables where variable_name in ('binlog_format', 'binlog_row_image');

For more detailed information about configuring MySQL for CDC replication, follow the instructions in the Debezium documentation.

Create a connection to your MySQL database

Follow along by launching the Replicating MySQL Tables to the Data Lake template in SQLake!

Connections in SQLake contain the endpoints and credentials needed to connect to your source system. A connection is typically created once and then reused whenever you need to connect to that system. Run the code below, after replacing the placeholders, to create a connection to a MySQL instance.

CREATE MYSQL CONNECTION upsolver_mysql_cdc
   CONNECTION_STRING = 'jdbc:mysql://<hostname>:3306/'
   USER_NAME = 'cdcuser'
   PASSWORD = 'password';

Staging source tables in the data lake

Staging tables are used in SQLake to store raw CDC events for all replicated tables. SQLake maintains all of the replicated tables in a single append-only, wide table. This table includes a system column $full_table_name that can be used to partition the staging table. When there are many source tables, this improves the performance of filtering and retrieving events for selected tables.

CREATE TABLE default_glue_catalog.database_19ae92.mysql_raw_data(
  $full_table_name string
)
PARTITIONED BY $full_table_name;
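To picture why partitioning on $full_table_name helps, the Python sketch below groups a handful of change events by that column. It is a conceptual illustration only, not how SQLake stores data; the function name partition_by_table and the sample events (including their column values) are hypothetical.

```python
from collections import defaultdict


def partition_by_table(events):
    """Group change events by their "$full_table_name" value."""
    partitions = defaultdict(list)
    for event in events:
        partitions[event["$full_table_name"]].append(event)
    return dict(partitions)


if __name__ == "__main__":
    # Hypothetical events from two source tables sharing one wide staging table.
    # Each event only fills the columns that exist in its own source table.
    staged_events = [
        {"$full_table_name": "classicmodels.offices", "officeCode": "1", "city": "San Francisco"},
        {"$full_table_name": "classicmodels.employees", "employeeNumber": 1002, "lastName": "Murphy"},
        {"$full_table_name": "classicmodels.offices", "officeCode": "2", "city": "Boston"},
    ]
    partitions = partition_by_table(staged_events)
    # Reading one table's events touches a single partition instead of every row.
    print(len(partitions["classicmodels.offices"]))  # prints 2
```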

Next, create an ingestion job to copy the source change events into the staging table. The TABLE_INCLUDE_LIST should list each table that you wish to replicate from your source MySQL system. The COLUMN_EXCLUDE_LIST can be used to exclude certain columns from replication, such as those containing PII or PHI.

CREATE SYNC JOB load_raw_data_from_mysql
AS COPY FROM MYSQL upsolver_mysql_cdc
   TABLE_INCLUDE_LIST = ('classicmodels.offices','classicmodels.employees')
INTO default_glue_catalog.database_19ae92.mysql_raw_data;

Once the job starts to run, SQLake will take a snapshot of each of the tables you selected to replicate. After SQLake has finished copying the historical data using a snapshot, it will start streaming CDC events. You can monitor the status of the snapshot and streaming process using the job properties window from your connection list, as shown in the screenshot below:

The screenshot above shows that the customers table has completed the snapshot and is already streaming, the offices table is just about to start the snapshot, and the employees table is pending the start of the snapshot process.

SQLake adds several automatically generated system columns when it creates the staging table. You can get the full list in the System Columns documentation.

To verify the status of your raw staging table, run the following query from your SQLake worksheet.

SELECT 
  "$full_table_name", 
  "$table_name", 
  "$operation", 
  "$is_delete", 
  count(*) as count
FROM default_glue_catalog.database_19ae92.mysql_raw_data
GROUP BY 1,2,3,4
ORDER BY 1;

The results should look similar to the following screenshot:

After the initial snapshot of the tables completes and the historical data has been fully synchronized, SQLake will begin to process change events. Query the staging table again and you will see the $operation column now includes change events as rows are inserted and updated in the source database.

SELECT 
  "$table_name", 
  "$operation", 
  "$is_delete", 
  count(*) as count
FROM default_glue_catalog.database_19ae92.mysql_raw_data
GROUP BY 1,2,3
ORDER BY 1;

Merge changed events from staging to your target table

Your staging table contains a raw copy of all the tables you selected to replicate. The dataset includes rows from the initial snapshot and all change events since. In this step, you will create a table to hold the results after changes have been merged. The syntax below creates a primary_key column, which will hold a concatenation of all primary key columns in the source table. This key is used to merge inserts, updates, and deletes into the output table to keep it consistent with the source.

You’ll need to create an output table per source table you want to expose. In this example, you’ll create the offices table.

CREATE TABLE default_glue_catalog.database_19ae92.offices(
  primary_key string
)
PRIMARY KEY primary_key;

Next, create a job that runs every minute and inserts, updates, or deletes rows in the output table for each CDC event that has been ingested. The primary_key column is used to match rows between the staging table and the output table. The logic is simple:

  1. WHEN MATCHED AND is_delete THEN DELETE deletes the record from the output table when it has been deleted in the source.
  2. WHEN MATCHED THEN REPLACE updates the record in the output table when the primary key matches.
  3. WHEN NOT MATCHED THEN INSERT inserts a new record into the output table when the primary key does not match.

When you update this code for your environment, change the $full_table_name filter in the WHERE clause to reference the table that you are replicating.

CREATE SYNC JOB sync_classicmodels_offices
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = true
AS MERGE INTO default_glue_catalog.database_19ae92.offices AS target
USING (
  SELECT
    *,
    $primary_key as primary_key,
    $is_delete as is_delete
  FROM  default_glue_catalog.database_19ae92.mysql_raw_data
  WHERE "$full_table_name" = 'classicmodels.offices'
   AND "$event_time" BETWEEN RUN_START_TIME() and RUN_END_TIME()
) AS source
ON source.primary_key = target.primary_key
WHEN MATCHED AND is_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;
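The three MERGE clauses in the job above can be pictured with a small Python sketch. This is a simplified model under stated assumptions, not SQLake's implementation: the target table is a dictionary keyed by primary_key, events are applied one at a time in order, and the clauses are read literally, so an event whose key is not in the target always takes the insert branch. The function name apply_merge and the sample rows are hypothetical.

```python
def apply_merge(target, source_rows):
    """Apply staged rows to `target`, a dict keyed by primary_key.

    Each source row is a dict with a "primary_key" entry and a boolean
    "is_delete" entry, mirroring the aliases in the SELECT clause.
    """
    for row in source_rows:
        key = row["primary_key"]
        # EXCEPT is_delete: the flag drives the logic but is not stored.
        output_row = {name: value for name, value in row.items() if name != "is_delete"}
        if key in target and row["is_delete"]:
            del target[key]            # WHEN MATCHED AND is_delete THEN DELETE
        elif key in target:
            target[key] = output_row   # WHEN MATCHED THEN REPLACE
        else:
            target[key] = output_row   # WHEN NOT MATCHED THEN INSERT
    return target


if __name__ == "__main__":
    offices = {}
    batch = [
        {"primary_key": "1", "city": "San Francisco", "is_delete": False},  # insert
        {"primary_key": "2", "city": "Boston", "is_delete": False},         # insert
        {"primary_key": "1", "city": "Oakland", "is_delete": False},        # update
        {"primary_key": "2", "city": "Boston", "is_delete": True},          # delete
    ]
    print(apply_merge(offices, batch))
    # prints {'1': {'primary_key': '1', 'city': 'Oakland'}}
```

The order of the first two branches matters: the delete check has to come before the general replace, just as the DELETE clause precedes the REPLACE clause in the job.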

Using SELECT * allows you to map all columns from the source table into your target table. To transform, rename, or remove columns, or to add computed columns, modify the SELECT clause based on your requirements.

Execute the job and wait a few minutes for data to begin to populate the output table. Querying the source and target tables should return similar results.

MySQL source table:

SQLake target table:

Merging changes without a primary key

In some situations, the source table may not include a primary key. This means that there is no unique identifier for the row that can be used to determine whether a change is an update or a new row. In SQLake, if the source table does not have a primary key defined, the $primary_key system column will be NULL. To address this, you can use as the primary key any column whose values are unique, meaning it can uniquely identify the row.

CREATE SYNC JOB sync_classicmodels_employees
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = true
AS MERGE INTO default_glue_catalog.database_19ae92.employees AS target
USING (
  SELECT
    *,
    employeeNumber as primary_key,
    $is_delete as is_delete
  FROM  default_glue_catalog.database_19ae92.mysql_raw_data
  WHERE "$full_table_name" = 'classicmodels.employees'
  AND "$event_time" BETWEEN RUN_START_TIME() and RUN_END_TIME()
) AS source
ON source.primary_key = target.primary_key
WHEN MATCHED AND is_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;

Another option is to define a composite primary key. If you don’t have a single unique key in the table, you can combine several columns that together uniquely identify a row.

CREATE SYNC JOB sync_classicmodels_products
    START_FROM = BEGINNING
    RUN_INTERVAL = 1 MINUTE
    ADD_MISSING_COLUMNS = true
AS MERGE INTO default_glue_catalog.database_19ae92.products AS target
USING (
  SELECT 
    *,
    productCode as key1,
    productCategory as key2,
    $is_delete as is_delete
  FROM  default_glue_catalog.database_19ae92.mysql_raw_data
  WHERE "$full_table_name" = 'classicmodels.products'
   AND "$event_time" BETWEEN RUN_START_TIME() and RUN_END_TIME()
) AS source
ON source.key1 = target.key1 AND source.key2 = target.key2
WHEN MATCHED AND is_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;
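Before relying on a substitute key, whether a single column or a combination, it is worth confirming that it really is unique in the source table. The Python sketch below shows one way to run that check on rows you have exported from the source. The function name find_duplicate_keys and the sample rows are hypothetical. Run such a check against the current rows of the source table, not the staging table, where the same key legitimately appears once per change event.

```python
from collections import Counter


def find_duplicate_keys(rows, key_columns):
    """Return the key values (as tuples) that appear in more than one row."""
    counts = Counter(tuple(row[column] for column in key_columns) for row in rows)
    return [key for key, count in counts.items() if count > 1]


if __name__ == "__main__":
    # Hypothetical rows: productCode alone repeats, the pair of columns does not.
    products = [
        {"productCode": "S10_1678", "productCategory": "Motorcycles"},
        {"productCode": "S10_1678", "productCategory": "Classic Cars"},
        {"productCode": "S10_1949", "productCategory": "Classic Cars"},
    ]
    print(find_duplicate_keys(products, ["productCode"]))
    # prints [('S10_1678',)]
    print(find_duplicate_keys(products, ["productCode", "productCategory"]))
    # prints []
```

An empty result means the chosen columns identify every row uniquely, so they are safe to use in the ON clause of the merge job.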

Adding more tables

Over time, new tables will be added to your source database. You can add them to your replication by altering the ingestion job and adding those tables to the TABLE_INCLUDE_LIST.

ALTER JOB load_raw_data_from_mysql 
  SET TABLE_INCLUDE_LIST = (
 'classicmodels.employees',
 'classicmodels.offices', 
 'classicmodels.products'
);

After the job has been altered, SQLake will begin to snapshot the new tables. When it finishes copying the snapshots, it will begin replicating new change events.
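To see which tables a change to the list introduces, you can compare the old and new lists. The Python sketch below is only an illustration; the function name tables_to_snapshot is hypothetical, and it assumes that tables already in the list keep streaming and only the newly listed ones are snapshotted.

```python
def tables_to_snapshot(current_tables, updated_tables):
    """Return tables in the updated list that were not replicated before.

    The result keeps the order of `updated_tables`.
    """
    already_replicated = set(current_tables)
    return [table for table in updated_tables if table not in already_replicated]


if __name__ == "__main__":
    # The list used when the ingestion job was created in this tutorial.
    current = ["classicmodels.offices", "classicmodels.employees"]
    # The list passed to ALTER JOB above.
    updated = [
        "classicmodels.employees",
        "classicmodels.offices",
        "classicmodels.products",
    ]
    print(tables_to_snapshot(current, updated))  # prints ['classicmodels.products']
```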

Summary

SQLake provides a simple-to-use CDC capability that allows you to replicate tables from your operational MySQL database to the data lake and data warehouse. SQLake automatically creates an append-only wide table in the data lake to hold the full history of your source tables and continuously appends change events captured from the source. This makes it easy and economical to store raw data for ML model training, and to reprocess it to fix data quality issues and corruption without going back to the source. You create jobs that merge the change events from the staging table into output tables that are kept in sync with the source. You can also model, prepare, and enrich tables before loading the final results into your data warehouse or data lake for analysis and BI.

That’s it! Try this out for yourself by launching the Replicating MySQL Tables to the Data Lake template in SQLake!

Published in: Blog, Change data capture
Ajay Chhawacharia

Ajay Chhawacharia possesses 20+ years of IT experience in managing and delivering full stack customer solutions, including providing technical leadership with a passion for data architecture and data engineering. As a Senior Solutions Architect at Upsolver, Ajay helps customers solve complex problems with the most efficient and cost effective solutions.
