Explore our expert-made templates & start with the right one for you.
4 steps to Postgres CDC – replicate Postgres changes to your data lake
Change data capture (CDC) enables you to replicate changes from transactional databases to your data lake and data warehouse. In this tutorial you will learn how to configure your PostgreSQL database to enable CDC. You will also learn how to build a simple data pipeline to copy changed events from the PostgreSQL to your data lake, automatically inserting, updating or deleting rows to match the original table.
1. Configuring PostgreSQL for CDC replication
2. Create a connection to your PostgreSQL database
3. Staging source tables in the data lake
4. Merge changed events from staging to your target table
Configuring PostgreSQL for CDC replication
Enable logical replication:
In order to replicate changes to your PostgreSQL table into the data lake, you first need to enable logical replication. If you are unsure whether or not logical replication has been configured, you can run the following query against PostgreSQL to confirm.
If the query returns ‘logical’ as in the screenshot below, logical replication is already enabled and you can jump to step 2.
The process for enabling logical replication will vary depending on the version of PostgreSQL you are running, and the platform that you are running it on, like Amazon RDS, Amazon Aurora or self-hosted.
For self-hosted PostgreSQL, you can enable replication by running the following command:
ALTER SYSTEM SET wal_level = logical;
If your PostgreSQL is hosted on Amazon RDS, you can enable the rds.logical_replication by setting it in the parameter group used by your database. Once set, you will need to restart the database for it to take effect.
After enabling replication, you should again execute the SHOW wal_level command and ensure that it returns ‘logical’ as shown in the earlier screenshot.
Create user with required permissions:
Using Amazon RDS as the example, create a new privileged user (if you are planning to use an existing user, skip this step):
CREATE ROLE <name> WITH LOGIN PASSWORD ‘<your password>’;
Once the user is created, you can grant it the rds_replication role:
--GRANT permissions to query replication GRANT rds_replication TO <name>; --GRANT permissions to SELECT from all tables, or specific ones GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <name>; -- All tables GRANT SELECT ON <table_name> TO <user>; -- Specific tables
If you are running a self-hosted instance please refer to Debezium’s documentation for configuration instructions. SQLake runs the open source Debezium engine for CDC. Unlike the open source version, with SQLake you do not need Kafka Connect.
Create a publication for Upsolver to use:
Once logical replication is enabled for your PostgreSQL database, you will need to create a publication that contains the tables you wish to replicate. You can create a publication for all tables by executing the following command:
CREATE PUBLICATION upsolver-publication FOR ALL TABLES
If you do not want to include all tables in your publication, you can execute the following command and include specific tables to publish.
CREATE PUBLICATION upsolver-publication FOR TABLE <table1>, <table2>;
Creating a publication on specific tables will not automatically include newly created tables. For instructions on how to create and manage publication for specific tables, see the PostgreSQL documentation.
If the database contains tables that do not have primary keys, adding those tables to the publication will cause PostgreSQL to block all updates and deletes for that table. This can be prevented in one of the following ways:
- Only include tables with primary keys in your publication.
- Change the REPLICA IDENTITY of that table to FULL or INDEX. More information about this can be found in PostgreSQL documentation.
- Add a primary key to the table.
Set up a Heartbeat Table:
PostgreSQL uses a WAL (Write Ahead Log) to track changes, such inserts, updates and deletes to tables, across all databases in a given PostgreSQL server. Clients use these replication slots to capture and read change events. In order for PostgreSQL to know that a given change event has been received by all, each client must acknowledge the change event was successfully received. PostgreSQL must wait until all client replication slots are acknowledged before the slots can be deleted. Consequently, if a client does not acknowledge any change events for a long period of time, PostgreSQL will retain the changed event. Over time, this can result in the WAL growing too large for the available storage on the server. This can lead to your database running out of storage space and potentially crashing.
SQLake is only able to acknowledge a change event when it receives it. This means that if you replicate tables that don’t change very often, SQLake may not get an opportunity to acknowledge any events, resulting in PostgreSQL retaining the replication slot and causing an increase in disk utilization.
To prevent this situation, a Heartbeat Table can be used to insert heartbeats into the replication stream so that SQLake can more frequently acknowledge change events. Follow these steps in PostgreSQL to create a heartbeat table.
Create a heartbeat table in the <schema> and <table> of your choice:
CREATE TABLE IF NOT EXISTS <schema_name>.<table_name> (key int primary key, value timestamp);
Grant the necessary permissions for the user you created earlier to be able to use the heartbeat table:
GRANT INSERT, UPDATE, SELECT on table <table_name> to <user>;
If the publication used in the data source was created for specific tables (not all tables), add the heartbeat table to the publication:
ALTER PUBLICATION upsolver-publication ADD TABLE <heartbeat_table>
Now that you finished configuring PostgreSQL, let’s move to more fun things, like creating the data pipeline.
Create a connection to your PostgreSQL database
Follow along by launching the Replicating PostgreSQL Tables to the Data Lake template in SQLake!
Connections in SQLake contain the endpoints and credentials needed to connect to your source system. Connections are typically created once, and then used for any necessary connectivity to that system. Run the code below (modifying the necessary placeholders) to create a connection to a PostgreSQL instance.
CREATE POSTGRES CONNECTION upsolver_postgres CONNECTION_STRING = 'jdbc:postgresql://<hostname>:5432/<dababase_name>' USER_NAME = '<username>' PASSWORD = '<password>';
Staging source tables in the data lake
Staging tables are used in SQLake to store raw CDC events for all replicated tables. SQLake maintains all of the replicated tables in a single append-only, wide table. This table includes a system column $full_table_name that can be used to partition the staging table. When there are many source tables, this improves the performance of filtering and retrieving events for selected tables.
CREATE TABLE <catalog>.<database>.postgres_raw_data( $full_table_name string ) PARTITIONED BY $full_table_name;
Next, create an ingestion job to copy the source change events into the staging table. This is where you’ll be using the PUBLICATION_NAME and HEARTBEAT_TABLE you created in steps 1.3 and 1.4. The TABLE_INCLUDE_LIST should list each table that you wish to replicate from your source PostgreSQL system. The COLUMN_EXCLUDE_LIST can be used if you need to ignore certain columns from being replicated, like those containing PII or PHI.
CREATE SYNC JOB load_raw_data_from_postgres PUBLICATION_NAME = 'upsolver-publication' HEARTBEAT_TABLE = 'upsolver.heartbeat' AS COPY FROM POSTGRES upsolver_postgres TABLE_INCLUDE_LIST = ('<schema>.<table1>','<schema>.<table2>') COLUMN_EXCLUDE_LIST = ('upsolver.heartbeat.key','upsolver.heartbeat.value') INTO <catalog>.<database>.postgres_raw_data;
Once the job starts to run, SQLake will take a snapshot of each of the tables you selected to replicate. After SQLake has finished copying the historical data using a snapshot it will start streaming CDC events. You can monitoring the status of the snapshot and streaming process using the job properties window from your connection list, as shown in the screenshot below:
To verify the status of your raw staging table, you can run any of the following queries within your SQLake worksheet.
-- Query your raw data in SQLake. It may take a minute for the data to appear. SELECT * FROM <catalog>.<database>.postgres_raw_data limit 10; -- Query to see row counts by postgres table SELECT "$full_table_name", count(*) FROM <catalog>.<database>.postgres_raw_data GROUP BY "$full_table_name"; -- Query to return row counts by table and by operation SELECT "$full_table_name", "$operation", count(*) FROM <catalog>.<database>.postgres_raw_data GROUP BY "$full_table_name","$operation";
Here is an example query that returns the row count by table name and operation:
Merge changed events from staging to your target table
Your staging table contains a raw copy of all the tables you selected to replicate. The dataset includes rows from the initial snapshot and a row for each change event afterwards. In this step, you will create a table to hold the current state reflecting all changes having been merged. The syntax below includes the creation of a primary_key which will be a concatenation of all primary keys in the source table. This composite key is used to merge inserts, updates and deletes on the output table to keep it consistent with the source.
CREATE TABLE <catalog>.<database>.<table_name>( primary_key string ) PRIMARY KEY primary_key;
Next, create a job that will run every 1 minute and will insert, update, or delete rows into the output table for each CDC event that has been ingested. The primary_key column is used to match rows between the staging table and the output table. The logic is simple:
- WHEN MATCHED AND is_delete deletes the record in the output table when deleted in the source.
- WHEN MATCHED THEN REPLACE updates the target table when the primary key matches.
- WHEN NOT MATCHED THEN INSERT inserts the new record into the output table when the primary key does not match.
When you update this code for your environment, update the $full_table_name filter in the WHERE clause to reference the table that you are replicating.
CREATE SYNC JOB sync_<table_name> START_FROM = BEGINNING ADD_MISSING_COLUMNS = TRUE RUN_INTERVAL = 1 MINUTE AS MERGE INTO <catalog>.<database>.<table_name> AS pg_table USING ( SELECT *, $primary_key as primary_key, $is_delete as is_delete FROM <catalog>.<database>.postgres_raw_data WHERE $full_table_name = '<database>.<schema>.<table_name>' AND $event_time BETWEEN run_start_time() AND run_end_time() ) AS pg_raw ON (pg_table.primary_key = pg_raw.primary_key) WHEN MATCHED AND is_delete THEN DELETE WHEN MATCHED THEN REPLACE WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT is_delete;
Using a SELECT * command allows you to map all columns from the source table into your target table. If you wish to perform any transformations, or rename, remove or add computed columns, you can modify the SELECT clause syntax based on your requirements.
Execute the job and wait a few minutes for data to begin to populate the output table. Run the query below, substituting the database and table names for yours, to verify that data is available in the output table.
SQLake provides a simple to use CDC capability that allows you to easily replicate tables from your operational PostgreSQL database to the data lake and data warehouse. SQLake automatically creates an append-only wide table in the data lake to hold the full history of your source tables. It continuously appends changed events captured from the source. This makes it easy and economical to store raw data for ML model training and reprocessing to fix quality and corruptions in the data without going back to the source. You create jobs that merge the change events from the staging table into output tables that are kept in sync with the source. Furthermore, you model, prepare and enrich tables before loading the final results to your data warehouse or data lake for analysis and BI.
That’s it! To try this out for yourself by launching the Replicating PostgreSQL Tables to the Data Lake template in SQLake!