Upsolver SQLake makes it easy to ingest data from a variety of sources, such as Amazon S3, Apache Kafka, and Amazon Kinesis Data Streams, into an Amazon S3-based data lake and deliver prepared data to a data warehouse such as Snowflake or Amazon Redshift. In the simplest case, data is appended to a target table whether or not the referenced row already exists (i.e. no updates or deletes). This is a great option when a business needs to maintain the entire history of the data instead of storing only the latest values. To learn more about append-only use cases, refer to Transforming and Loading Data into a Data Lake, Redshift, or Snowflake.
Of course, you may be interested in only the most recent value for a given row, which is usually the case for ad hoc analytics and BI reporting. For example, the sales department may want to maintain the daily revenue derived from an order data stream produced by an e-commerce application. This means as order events arrive for the day, the aggregated revenue must be updated in the target table.
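The daily revenue scenario can be sketched with the same MERGE syntax this tutorial uses later. This is a hypothetical example: the DAILY_REVENUE table and the aggregation are illustrative and not part of the sample dataset, and how partial aggregates accumulate across run intervals depends on the engine, so treat this as a sketch rather than a recipe.

```sql
-- Hypothetical sketch: keep one row per day, upserted on the date key.
-- Table name and aggregation are illustrative only.
CREATE SYNC JOB load_daily_revenue
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."DAILY_REVENUE" AS target
USING (
    SELECT $event_date AS REVENUE_DATE,
           SUM(nettotal) AS REVENUE
    FROM default_glue_catalog.database_2777eb.orders_raw_staging
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY $event_date
) source
ON target.REVENUE_DATE = source.REVENUE_DATE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
```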
Furthermore, as customer information such as the address changes, the customer table must also be updated. This is simple to do with transactional databases, but is quite difficult to achieve in a data lake or a data warehouse. SQLake makes it easy to build data pipelines that automatically insert, update and delete records (whether they include frequently or slowly changing dimensions) without the user needing to learn complex syntax or build external processes to maintain the data.
In this tutorial, you will learn how to use SQLake to extract raw data from an S3 bucket, transform and model it, and then upsert (insert or update) and delete records in a target table in the data warehouse.
Note that MySQL and PostgreSQL databases are common sources for data used in analytics. In SQLake, instead of using an S3 bucket as the source, you can configure a change data capture (CDC) connection that will stream changes from a source database table to a target table in your data lake or data warehouse. In either case (S3 bucket or database CDC) SQLake will insert, update or delete records per the changes in the source table so that the target table reflects the current state of the source database.
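As a rough sketch of the CDC alternative, a database connection and ingestion job look similar to the S3 versions used below. The exact options vary by database, so the connection string and the TABLE_INCLUDE_LIST option here are assumptions; consult the SQLake documentation for your source database before using them.

```sql
-- Hypothetical CDC sketch; option names are illustrative, check the docs.
CREATE POSTGRES CONNECTION pg_conn
    CONNECTION_STRING = 'jdbc:postgresql://<hostname>:5432/<dbname>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';

CREATE SYNC JOB load_orders_cdc
AS COPY FROM POSTGRES pg_conn
    TABLE_INCLUDE_LIST = ('public.orders')
INTO default_glue_catalog.database_2777eb.orders_cdc_staging;
```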
- Create connections to your S3 source and Snowflake target
- Ingest raw data into a staging table in the data lake
- Prepare your Snowflake tables
- Model, merge and load results into Snowflake
- As UPSERT
- As UPSERT with DELETE
- As UPSERT for nested array data
Create connections to your S3 source and Snowflake target
Start by creating a connection to the source dataset in Amazon S3. You’ll be using sample data provided by Upsolver to get started quickly. This example uses S3 as the source, but you can connect to other sources as well.
CREATE S3 CONNECTION s3_conn
    AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role'
    EXTERNAL_ID = 'SAMPLES'
    READ_ONLY = TRUE;
Next, you need to create a connection to your Snowflake data warehouse. The output of your pipeline will write results using this connection to the data warehouse.
CREATE SNOWFLAKE CONNECTION sf_conn
    CONNECTION_STRING = 'jdbc:snowflake://<account>.<region>.aws.snowflakecomputing.com/?db=<dbname>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
Similarly, you can create a connection to your Amazon Redshift cluster, as follows:
CREATE REDSHIFT CONNECTION rs_conn
    CONNECTION_STRING = 'jdbc:redshift://<hostname>.<region>.redshift.amazonaws.com:5439/<dbname>'
    USER_NAME = '<username>'
    PASSWORD = '<password>';
Once the connections have been created, the remaining steps are the same for Snowflake and Redshift.
Create a job to ingest raw data into a staging table
In this step, you will create a staging table in the data lake and an ingestion job that will copy data from the source, optimize it, and store it in the staging table.
The first step is to create the staging table.
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_staging PARTITIONED BY $event_date;
The second step is to create the ingestion job. This job will read files from S3 under upsolver-samples/orders/ and process all events from the BEGINNING, i.e. the first ever event stored in that directory.
CREATE SYNC JOB load_orders_raw
    START_FROM = BEGINNING
    DATE_PATTERN = 'yyyy/MM/dd/HH/mm'
    CONTENT_TYPE = JSON
AS COPY FROM S3 s3_conn
    BUCKET = 'upsolver-samples'
    PREFIX = 'orders/'
INTO default_glue_catalog.database_2777eb.orders_raw_staging;
Once the job has been created, give it a minute or two to process and load the data. You should then be able to query it directly from SQLake or with your favorite data lake query engine, such as Amazon Athena.
SELECT * FROM default_glue_catalog.database_2777eb.orders_raw_staging LIMIT 10;
Here is an example of the results when querying from SQLake:
Prepare your Snowflake tables
The raw data above is in JSON format and contains customer information, order information, and order item information. Next, we want to load customer information into the Snowflake Customers table, order information into the Snowflake Orders_Data table, and the order items into the Order_Items table. The following three tables should be created in Snowflake before proceeding to the next step.
Sample SQL code to create the given tables in Snowflake.
CREATE TABLE DEMO.CUSTOMERS (
    CUSTOMER_ID   VARCHAR,
    FIRST_NAME    VARCHAR,
    LAST_NAME     VARCHAR,
    ADDRESS_LINE1 VARCHAR,
    ADDRESS_LINE2 VARCHAR,
    CITY          VARCHAR,
    STATE         VARCHAR,
    ZIPCODE       VARCHAR
);

CREATE TABLE DEMO.ORDERS_DATA (
    ORDER_ID    VARCHAR,
    ORDER_DATE  DATE,
    ORDER_TYPE  VARCHAR,
    NET_TOTAL   FLOAT,
    TAX_RATE    FLOAT,
    CUSTOMER_ID VARCHAR
);

CREATE TABLE DEMO.ORDER_ITEMS (
    ITEM_ID       INT,
    ORDER_ID      VARCHAR,
    ITEM_NAME     VARCHAR,
    ITEM_CATEGORY VARCHAR,
    ITEM_PRICE    FLOAT,
    ITEM_QTY      INT
);
Once the tables are created in Snowflake, navigate back to the SQLake console and proceed with the next steps.
Model, merge, and load results into Snowflake
With SQLake you can model your data in the data lake and then load the results into target tables residing in Snowflake, Redshift or a data lake. Once loaded, the target tables are ready for users to query or further transform.
Note that while Snowflake and Redshift allow you to add PRIMARY KEY constraints on a table, those constraints are not enforced. They are essentially metadata that tools (and users) can use to understand the *intended* uniqueness of the data in the table. From SQLake, transformation (INSERT INTO) jobs only ever append records to Snowflake or Redshift tables.
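You can see this unenforced behavior directly in Snowflake. The snippet below is a small illustration against the CUSTOMERS table created earlier; the duplicate row is hypothetical sample data.

```sql
-- Snowflake records the constraint as metadata but does not enforce it.
ALTER TABLE DEMO.CUSTOMERS ADD PRIMARY KEY (CUSTOMER_ID);

-- Inserting the same CUSTOMER_ID twice still succeeds, so deduplication
-- has to happen in the pipeline (e.g. via MERGE), not in the warehouse.
INSERT INTO DEMO.CUSTOMERS (CUSTOMER_ID) VALUES ('jdoe@example.com');
INSERT INTO DEMO.CUSTOMERS (CUSTOMER_ID) VALUES ('jdoe@example.com');
```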
For example, order events include customer information within each order object. This customer data can be inserted into the customer table using the INSERT INTO clause, which lets SQLake maintain an append-only table containing the entire history of changes. You can learn more about this use case in the Transforming and Loading Data into a Data Lake, Redshift, or Snowflake guide.
To maintain only the most recent information in the target tables, you create a transformation (MERGE INTO) job. Instead of appending all new events, the MERGE command inserts, updates, and deletes rows based on a primary key.
To demonstrate how to use the MERGE command, let's create jobs to transform and model the customer, order, and order item datasets. You will flatten the nested customer data structure, unnest the order items array, and rename some of the columns to fit your target data model. While modeling, you want to include reference keys, such as the customer ID in the orders table and the order ID in the order items table, so that analysts can easily join these tables. You will create these jobs as transformation jobs using the MERGE INTO statement.
- The first example uses the MERGE INTO command to upsert rows. If there is a matching customer record already present in the target table, it will be updated. Otherwise it will be inserted.
CREATE SYNC JOB load_customers_merge
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."CUSTOMERS" AS target
USING (
    SELECT customer.email AS CUSTOMER_ID,
           customer.firstname AS FIRST_NAME,
           customer.lastname AS LAST_NAME,
           customer.address.address1 AS ADDRESS_LINE1,
           customer.address.address2 AS ADDRESS_LINE2,
           customer.address.city AS CITY,
           customer.address.state AS STATE,
           customer.address.postcode AS ZIPCODE
    FROM default_glue_catalog.database_2777eb.orders_raw_staging
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) source
ON target.CUSTOMER_ID = source.CUSTOMER_ID
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
Running the job above loads the target Snowflake customers table with historical data. The following is a sample of the output.
Double-check that there are no duplicates in the table. Looks good, so let's continue.
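One way to check for duplicates, assuming DEMO.CUSTOMERS (created above) is the target table and CUSTOMER_ID is the merge key, is to group by the key in Snowflake:

```sql
-- Any rows returned indicate duplicate merge keys.
SELECT CUSTOMER_ID, COUNT(*) AS ROW_COUNT
FROM DEMO.CUSTOMERS
GROUP BY CUSTOMER_ID
HAVING COUNT(*) > 1;
```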
To demonstrate slowly changing dimensions, we modified the source dataset by updating the ZIPCODE values for some of the customers. The MERGE job recognizes the changes and automatically updates the target table in Snowflake. Query the table again and you will see those changes.
- Next, let's use the MERGE command together with a DELETE clause. If a matching order record is already present in the target table, it is updated or deleted; otherwise it is inserted. We determine when to delete a row by evaluating a condition and storing the Boolean result in a to_delete field. (This field may already exist in the data if the events are CDC changes or were processed upstream.) The target table, as you can see below, includes rows with a NET_TOTAL of 0, and the TAX_RATE is stored as a fraction (e.g. .12) when it should be a percentage (12). We will update this table to demonstrate how the MERGE job updates and deletes rows based on conditions in the SQL.
CREATE SYNC JOB load_orders_data_merge
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."ORDERS_DATA" AS target
USING (
    SELECT orderid AS ORDER_ID,
           extract_timestamp(orderdate::STRING) AS ORDER_DATE,
           ordertype AS ORDER_TYPE,
           nettotal AS NET_TOTAL,
           taxrate_percent AS TAX_RATE,
           customer.email AS CUSTOMER_ID,
           to_delete AS to_delete
    FROM default_glue_catalog.database_2777eb.orders_raw_staging
    LET to_delete = IF_ELSE(nettotal <= 0, true, false),
        taxrate_percent = taxrate * 100
    WHERE $event_time BETWEEN run_start_time() AND run_end_time()
) source
ON target.ORDER_ID = source.ORDER_ID
WHEN MATCHED AND to_delete THEN DELETE
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME EXCEPT to_delete;
This is the table before the MERGE job.
This is the output after the MERGE job applied the changes. All orders have a positive NET_TOTAL and the tax rate is corrected.
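You can verify both effects with a couple of queries in Snowflake, assuming DEMO.ORDERS_DATA is the target table as created above:

```sql
-- Zero-revenue orders should have been deleted, so this returns 0.
SELECT COUNT(*) AS BAD_ROWS
FROM DEMO.ORDERS_DATA
WHERE NET_TOTAL <= 0;

-- TAX_RATE should now be expressed as a percentage (e.g. 12, not .12).
SELECT MIN(TAX_RATE) AS MIN_RATE, MAX(TAX_RATE) AS MAX_RATE
FROM DEMO.ORDERS_DATA;
```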
- The third example uses MERGE with nested array data that must be flattened before upserting. The job uses the UNNEST operation to flatten the array, then upserts the results using the MERGE INTO command. As in the first example, if a matching order item record is already present in the target table, it is updated; otherwise it is inserted.
CREATE SYNC JOB load_items_unnest_merge
    RUN_INTERVAL = 1 MINUTE
    START_FROM = BEGINNING
AS MERGE INTO SNOWFLAKE "sf_conn"."DEMO"."ORDER_ITEMS" AS target
USING (
    UNNEST(
        SELECT data.items.itemid AS ITEM_ID,
               orderid AS ORDER_ID,
               data.items.name AS ITEM_NAME,
               data.items.category AS ITEM_CATEGORY,
               data.items.unitprice AS ITEM_PRICE,
               data.items.quantity AS ITEM_QTY
        FROM default_glue_catalog.database_2777eb.orders_raw_staging
        WHERE $event_time BETWEEN run_start_time() AND run_end_time()
    )
) source
ON target.ITEM_ID = source.ITEM_ID
WHEN MATCHED THEN REPLACE
WHEN NOT MATCHED THEN INSERT MAP_COLUMNS_BY_NAME;
The original dataset looks like this.
After unnesting the item order array we can see the array fields as top level columns.
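With the reference keys in place, the three tables join cleanly in Snowflake. This is an illustrative query, not part of the tutorial's jobs:

```sql
-- Example analyst query enabled by the ORDER_ID reference key.
SELECT o.ORDER_ID,
       o.ORDER_DATE,
       i.ITEM_NAME,
       i.ITEM_PRICE * i.ITEM_QTY AS LINE_TOTAL
FROM DEMO.ORDERS_DATA o
JOIN DEMO.ORDER_ITEMS i
  ON i.ORDER_ID = o.ORDER_ID;
```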
SQLake allows you to create a pipeline that copies data from source streams, databases, and object stores into data lake tables for staging. Using SQL, you model and transform the data, storing the results in the data lake. To perform BI queries and ad hoc analysis, you load the transformed results into your Snowflake or Redshift data warehouse. SQLake enables you to write results as append-only or as fully merged tables. This allows you to keep transformation and modeling logic in a central place and reduce the cost of continuously transforming and deduplicating data in your data warehouse. SQLake automatically scales compute resources up and down to match the velocity and volume of your data, so you don't need to.
Get started today for free with sample data or bring your own.