Build an ETL pipeline for CDC events (model, redact, enrich)

Change data capture (CDC) enables you to replicate changes from transactional databases to your data lake and data warehouse. Once data has been replicated it needs to be modeled, cleaned and transformed to meet business needs. In this tutorial you will learn how to clean, redact and model a CDC table managed in Upsolver SQLake. You will then write the transformed table into the data lake for analysis.

(Need a primer on the basics? Check out our Guide to Change Data Capture.)

Run it in two easy steps!
1. Configure PostgreSQL for CDC replication
2. Transform the replicated data and load the results to the data lake

Configure PostgreSQL for CDC replication

The first thing you need to do is enable CDC on your source database. Then you can configure SQLake to listen to change events and replicate them to your data lake. Follow the steps in the tutorial How to replicate a PostgreSQL table to your data lake to get started. After you complete the tutorial, continue to the next step below.
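
On the PostgreSQL side, enabling CDC generally means turning on logical replication and publishing the tables you want to replicate. The linked tutorial covers the exact steps; as a rough sketch (the publication name and table below are placeholders, and managed database services usually expose the server setting through their console or parameter groups), the configuration looks something like this:

-- Requires a server restart to take effect
ALTER SYSTEM SET wal_level = logical;

-- Publish the tables you want to replicate
CREATE PUBLICATION sqlake_publication FOR TABLE sales.creditcard;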

Note: After configuring your source database, an easy way to follow along with this tutorial would be to launch the Replicating PostgreSQL Tables to the Data Lake template in SQLake. After you create the ingestion job, you can copy, paste and execute the code below.

Transform the replicated data and load results to the data lake

After connecting SQLake to your database, the ingestion job continuously updates a staging table with the latest change events. The staging table is a wide, denormalized table that holds data from all of the source tables you chose to replicate. Oftentimes it’s beneficial to clean and prepare the source data to fit the needs of analysts. For example, an e-commerce application stores customer payment information in a table. This detail is needed by the application, but certain columns, like credit card numbers, contain personal data that is not required for analytics. In this step, you prepare the data for analytics by renaming columns, converting timestamps to the local timezone, redacting sensitive information, and adding computed columns to enrich the data.

To demonstrate this, you will read and transform the creditcard table from the staging table. The data will be modeled, masked, and then written to the data lake for analysis. SQLake automatically merges inserts, updates, and deletes to ensure that the source and target stay in sync.

Start by creating a target table in the data lake to hold the transformed data. The syntax below includes a PRIMARY KEY clause; the source table’s primary key is mapped to credit_card_id and used to merge inserts, updates, and deletes on the output table to keep it consistent with the source. Only the primary key column needs to be declared up front: because the job below sets ADD_MISSING_COLUMNS = TRUE, the remaining columns are added to the table schema automatically.

CREATE TABLE default_glue_catalog.database_11f174.sales_cc (
  credit_card_id bigint
)
PRIMARY KEY credit_card_id;

Next, create a job that runs every minute and inserts, updates, or deletes rows in the output table for each CDC event that has been ingested. The primary key column is used to match rows between the staging table and the output table. The merge logic is as follows:

  1. WHEN MATCHED AND is_delete THEN DELETE removes the record from the output table when it has been deleted in the source.
  2. WHEN MATCHED THEN REPLACE updates the record in the output table when the primary key matches.
  3. WHEN NOT MATCHED THEN INSERT inserts the new record into the output table when the primary key does not match.

Now you’re ready to build the job that transforms the data. You’ll perform the following:

  • Rename columns to be more descriptive
  • Convert Epoch time into a valid date/time format
  • Mask sensitive information such as credit card numbers

CREATE SYNC JOB sync_pg_creditcard_demo
  START_FROM = BEGINNING
  ADD_MISSING_COLUMNS = TRUE
  RUN_INTERVAL = 1 MINUTE
AS MERGE INTO default_glue_catalog.database_11f174.sales_cc AS target         
  USING (
    SELECT
      masked_cc AS credit_card_number,
      cardtype AS credit_card_type,
      expmonth AS expiration_month,
      expyear AS expiration_year,
      FROM_UNIXTIME(modifieddate/1000000) AS modified_timestamp,
      $primary_key AS credit_card_id,
      $is_delete AS is_delete
    FROM default_glue_catalog.database_11f174.postgres_raw_data
    LET last_4 = SUBSTRING(cardnumber,-4,4),
        first_4 = SUBSTRING(cardnumber,1,4),
        masked_cc = first_4 || '-xxxx-' || last_4
    WHERE $full_table_name = 'AdventureWorks.sales.creditcard' AND
          $event_time BETWEEN run_start_time() AND run_end_time()
  ) AS source
  ON (target.credit_card_id = source.credit_card_id)
  WHEN MATCHED AND is_delete THEN DELETE
  WHEN MATCHED THEN REPLACE
  WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;

Execute the job and wait a few minutes for data to begin populating the output table. Run the query below, substituting your database and table names, to verify that data is available in the output table.
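
For example, a quick check against the target table created above (the column list and row limit are just for illustration) could look like this:

SELECT
  credit_card_id,
  credit_card_number,
  credit_card_type,
  expiration_month,
  expiration_year,
  modified_timestamp
FROM default_glue_catalog.database_11f174.sales_cc
LIMIT 10;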

Summary

SQLake provides a straightforward CDC capability that allows you to easily replicate tables from your operational PostgreSQL database to the data lake and data warehouse. Before data can be presented to your data analysts, data scientists, and BI users, it needs to be modeled, cleaned, secured, and prepared. It is simple to do this directly in SQLake by creating a job with your SQL-based transformation logic that automatically updates the data as it is merged into the target table. Although data preparation can be performed after the data has been loaded into the data warehouse, it is more cost-effective and secure to perform it before loading it into the target.

That’s it! Try this out for yourself by launching the Replicating PostgreSQL Tables to the Data Lake template in SQLake!

Chandra Peddireddy

Chandra Peddireddy is a Senior Solutions Architect at Upsolver. He possesses 16 years of IT experience in data architecture, data engineering, data modeling, business intelligence, and analytics. His long list of expertise includes designing and building data pipelines of varied scales involving multiple data technologies.
