Avoiding Costly Full Data Scans in Your Cloud Data Warehouse

Data warehouses are designed for OLAP use cases. The structures used to store data are optimized for efficiently locating rows, rather than scanning large amounts of data. This enables you to find data quickly if your tables are optimized with the right sort and distribution keys. But if you’re not attentive to these structures when writing queries, or if the warehouse hasn’t collected sufficient statistics about your data, the optimizer struggles to make good decisions. The system is then forced to scan entire data sets, which makes queries slower and more expensive.

Data engineers and DBAs go to great lengths to work around this limitation and keep their databases humming. For example, to clear data out as quickly as possible, they may truncate tables to make room for new data. But that can cause issues with error handling and prevent data replay in the event of an ETL failure. Overall, performance tuning in data warehouses and data lakes is complicated and prone to human error, and users often run full table scans without understanding the significant impact on system performance and cost.

In this post I’ll explore this behavior, and demonstrate how you can optimize your query environment to gain substantial performance improvements, using only declarative SQL in Upsolver SQLake.  You’ll see how you can create a declarative pipeline that self-orchestrates and leverages compression and compaction to improve query performance and reduce storage costs.

The Issue with Large Data Scans in Data Warehouses

In data warehouses (and RDBMS databases), many factors can cause full table scans, including:

  • Frequent changes to data
  • Indexing by a rarely used key
  • Data skew
  • Poor data statistics
  • Low cardinality columns
  • And others

In some cases, such as with small tables, a full table scan might be acceptable.  But in most cases, full table scans are undesirable and should be avoided. Large tables take a long time to process and can be very expensive. Sometimes a query may never even finish if your data is too large to fit in available memory. These queries may consume so many system resources that the entire data warehouse slows down and query latency across the board could fail to meet expected SLAs.

Let’s take a look at a full table scan example in a data warehouse. The use case below has a very large denormalized staging table. The target table aggregates data from the staging table to identify the best-performing city and state. The staging table grows continuously as new orders are created, and each run re-aggregates data over the table’s entire history. As a result, the system must perform a full data scan every time the query is executed.

First, we create a staging table:

CREATE TABLE STG_ORDERS (
	CUSTOMER_ADDRESS_ADDRESS1 VARCHAR(16777216),
	CUSTOMER_ADDRESS_ADDRESS2 VARCHAR(16777216),
	CUSTOMER_ADDRESS_CITY VARCHAR(16777216),
…
	TAXRATE FLOAT
);

Then we insert raw data into this table:

INSERT INTO "TEST"."PUBLIC"."STG_ORDERS"
SELECT
customer.address.address1::string AS CUSTOMER_ADDRESS_ADDRESS1,
customer.address.address2::string AS CUSTOMER_ADDRESS_ADDRESS2,
customer.address.city::string AS CUSTOMER_ADDRESS_CITY,
…
taxRate::float AS TAXRATE
FROM "TEST"."PUBLIC"."MY_EXT_STAGE" raw_source;

Next, we transform the data as it moves from the staging table to the target table. To do this, we first create the target table:

CREATE TABLE test.public.SNOW_GEO_PERFORMANCE (
	ORDER_CITY VARCHAR(16777216),
	ORDER_STATE VARCHAR(16777216),
	TOTAL_SALES FLOAT,
	ORDER_COUNT FLOAT,
	AVERAGE_ORDER FLOAT,
    ORDER_DATE TIMESTAMP_NTZ(9)
);

Then we merge new data from the staging table into the target table, performing sum, count, and average aggregations along the way:

MERGE INTO test.public.SNOW_GEO_PERFORMANCE AS target1 USING
  (SELECT customer_address_city AS order_city,
          customer_address_state AS order_state,
          SUM(netTotal) AS total_sales,
          COUNT(*) AS order_count,
          AVG(netTotal) AS average_order,
          DATE_TRUNC('day', TO_DATE(orderDate)) AS order_date
   FROM STG_ORDERS orders
   GROUP BY customer_address_city,
            customer_address_state,
            order_date
   QUALIFY ROW_NUMBER()
     OVER (PARTITION BY ORDER_CITY, ORDER_STATE ORDER BY ORDER_DATE DESC) = 1)
AS target2 ON target1.ORDER_CITY = target2.ORDER_CITY AND target1.ORDER_STATE = target2.ORDER_STATE
WHEN MATCHED THEN UPDATE SET
    target1.ORDER_CITY = target2.ORDER_CITY,
    target1.ORDER_STATE = target2.ORDER_STATE,
    target1.TOTAL_SALES = target2.TOTAL_SALES,
    target1.ORDER_COUNT = target2.ORDER_COUNT,
    target1.AVERAGE_ORDER = target2.AVERAGE_ORDER,
    target1.ORDER_DATE = target2.ORDER_DATE
WHEN NOT MATCHED THEN INSERT (
    ORDER_CITY,
    ORDER_STATE,
    TOTAL_SALES,
    ORDER_COUNT,
    AVERAGE_ORDER,
    ORDER_DATE)
VALUES (
    target2.ORDER_CITY,
    target2.ORDER_STATE,
    target2.TOTAL_SALES,
    target2.ORDER_COUNT,
    target2.AVERAGE_ORDER,
    target2.ORDER_DATE);

The merging process from a large staging table to an aggregated target table takes a very long time because the optimizer needs to identify the changes based on their keys. This approach is slow and resource-intensive. As data volume grows, performance suffers significantly.  For example, we recently ran a benchmark on the Snowflake Data Platform in which we compared several approaches to performing analytics on near real-time data.  We used Upsolver to first prepare and then load optimized results into Snowflake in near real-time, which reduced the time it took to make data ready for analytics from 1 hour to 1 minute.
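One way to check whether a statement like this reads the whole staging table is to inspect its plan before running it. The sketch below applies Snowflake’s EXPLAIN command to the aggregation subquery from the MERGE above. It assumes, as the MERGE implies, that the elided column list of STG_ORDERS includes netTotal and orderDate.

```sql
-- Sketch only: show the plan for the aggregation without executing it.
-- Assumes STG_ORDERS contains the netTotal and orderDate columns
-- referenced by the MERGE statement above.
EXPLAIN USING TEXT
SELECT customer_address_city AS order_city,
       customer_address_state AS order_state,
       SUM(netTotal) AS total_sales,
       COUNT(*) AS order_count,
       AVG(netTotal) AS average_order,
       DATE_TRUNC('day', TO_DATE(orderDate)) AS order_date
FROM STG_ORDERS orders
GROUP BY customer_address_city,
         customer_address_state,
         order_date;
```

In the plan output, compare partitionsAssigned with partitionsTotal. When the two values are equal, no micro-partitions were pruned and the query reads the entire table.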

Preprocessing Helps Avoid Full Data Scans in the Data Warehouse

Now let’s look at how you can avoid full data scans using Upsolver SQLake. SQLake’s real-time indexer incrementally aggregates and indexes data. This enables it to optimize for calculated fields in the target table. It also enables the Upsolver engine to avoid scanning large amounts of data when it performs aggregations and merges. Furthermore, this approach allows you to keep a copy of the raw data without having to truncate (that is, empty) the staging table every time new data must be written. By not truncating the staging tables, you can replay data from history if business or technical requirements change or if there are bugs to fix.

First we create a staging table. We do so without specifying columns because Upsolver performs schema-on-read and infers the schema of the staging table. It also collects metadata as data is streamed into the staging table to help you profile and model your data.

CREATE TABLE default_glue_catalog.upsolver_samples.orders_raw_data
  PARTITIONED BY $event_date;

Next we create an ingestion job that continuously reads events from the source and writes them to the output staging table we created previously.

CREATE JOB load_orders_raw_data_from_s3
  START_FROM = NOW
  CONTENT_TYPE = JSON
  AS COPY FROM S3 upsolver_s3_samples 
    BUCKET = 'upsolver-samples' 
    PREFIX = 'orders/' 
  INTO default_glue_catalog.upsolver_samples.orders_raw_data;

The SQLake ingestion job automatically parses and loads the data into the staging table. 

Next, create a target table in Snowflake and then create a job that loads from the Upsolver Samples staging table to the Snowflake target table.

CREATE TABLE test.public.SNOW_GEO_PERFORMANCE (
	ORDER_CITY VARCHAR(16777216),
	ORDER_STATE VARCHAR(16777216),
	TOTAL_SALES FLOAT,
	ORDER_COUNT FLOAT,
	AVERAGE_ORDER FLOAT,
    ORDER_DATE TIMESTAMP_NTZ(9)
);

CREATE JOB "export aggregated orders by geography to Snowflake"
  EXECUTION_INTERVAL = 1 MINUTE
  START_FROM = BEGINNING
  AS INSERT INTO test.public.SNOW_GEO_PERFORMANCE MAP_COLUMNS_BY_NAME
    SELECT customer_address_city AS order_city,
      customer_address_state  AS order_state,
      SUM(netTotal)   AS total_sales,
      COUNT(*)  AS order_count,
      AVG(netTotal) AS average_order,
      CASE  WHEN orders.orderId = row_number() over (
        partition by orders.orderid 
        order by DATE_TRUNC('day', TO_DATE(orderDate))
        desc)
      THEN SUM(netTotal) END AS new_customer_revenue,
      DATE_TRUNC('day', TO_DATE(orderDate)) AS order_date
    FROM default_glue_catalog.upsolver_samples.orders_raw_data orders
    WHERE $commit_time BETWEEN run_start_time() AND run_end_time()
    GROUP BY customer_address_city,
         customer_address_state,
         order_date,
         orderid;

A minute or so after the job executes, data begins to flow into the Snowflake table and is ready to query. Queries against this table consume fewer resources, complete faster, and can help lower your data warehouse bill.  And the tables are always fresh with up-to-the-minute data.
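As a sketch of the kind of query this enables, the statement below reads the pre-aggregated target table to find the best-performing cities and states. It uses only the columns defined in the CREATE TABLE statement above; the limit of 10 rows is an arbitrary choice for illustration.

```sql
-- Sketch only: rank daily city/state aggregates by total sales.
-- Reads the small aggregated table rather than the raw order history.
SELECT order_city,
       order_state,
       order_date,
       total_sales,
       order_count,
       average_order
FROM test.public.SNOW_GEO_PERFORMANCE
ORDER BY total_sales DESC
LIMIT 10;  -- arbitrary cutoff for illustration
```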

Optimizing Data Processing in the Data Lake

Now let’s look at how data lakes approach large data sets. In data lakes, data is written in chunks that are much larger than those in databases and data warehouses. And there is no native indexing to help seek in specific areas of the data. Instead, the query engine is optimized to scan blocks of data in parallel. Users are typically charged by the amount of data scanned, which is why it’s essential to reduce the data scanned as much as possible for both query performance and cost optimization. 

Data lake queries also perform poorly when data is written in many small files rather than fewer, larger ones. This results in frequent I/O and, as mentioned previously, without a native index it is difficult to find the rows required to satisfy the query. The way to optimize retrieval of data and reduce the amount of data scanned is to continuously compact and partition data according to users’ query patterns. Compaction merges small files into larger and fewer files, which helps query engines process files more efficiently. Partitioning limits the volume of data scanned by mapping physical storage locations to table columns (that is, partitions) in a metadata store; query engines can thus more quickly identify and skip over irrelevant data, accelerating queries and thereby reducing costs.
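Both effects can be observed from the query engine itself. The sketch below assumes Amazon Athena and a hypothetical table named events that is partitioned by a string column named event_date; neither name comes from this post. The first query uses Athena’s "$path" pseudo-column to count the files behind each partition, and the second filters on the partition column so that the engine can skip every other partition.

```sql
-- Sketch only: `events` and `event_date` are hypothetical names.

-- 1. Count the files stored under each partition. A large number of
--    files per partition suggests that compaction would help.
SELECT event_date,
       COUNT(DISTINCT "$path") AS file_count
FROM events
GROUP BY event_date
ORDER BY file_count DESC;

-- 2. Filter on the partition column so only the matching partition
--    is read, instead of the whole table.
SELECT COUNT(*) AS event_count
FROM events
WHERE event_date = '2023-01-01';  -- example value, not from the post
```

Comparing the amount of data scanned that Athena reports for the second query with and without the WHERE clause gives a rough sense of how much partitioning saves for a given query pattern.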

Take a simple query to aggregate results, executed in Amazon Athena, as an example:

SELECT timestamp, COUNT(DISTINCT tags_host)
FROM server_ustage_reduced
GROUP BY timestamp;

Without SQLake’s optimizations, query time is 16.15 seconds and 27.29 GB of data is scanned. After optimizations, query time drops to 5.47 seconds and total data scanned is 0.00084 GB. That is roughly a 3x speedup and about 32,000 times less data scanned. More on Athena performance benchmark results.

This drastic improvement in performance can be achieved simply by using SQLake to build your data pipelines. No data engineering expertise is required. SQLake writes files in compressed Apache Parquet format and continuously manages compaction as data flows in. It also pre-aggregates data using its Key-Value store to further cut down latency and data scanned. SQLake automates complex data engineering tasks, so you don’t have to.

Try SQLake for Free

SQLake is Upsolver’s newest offering.  It lets you build and run reliable data pipelines on streaming and batch data via an all-SQL experience. It automates and optimizes storage, aggregations, and processing to minimize human error, and gives you the most high-performing and cost-effective querying environment you can work with.

Try it for free. No credit card required.

Published in: Blog, Building Data Pipelines
Mei Long

Mei Long is a Product Manager at Upsolver. She is on a mission to make data accessible, usable, and manageable in the cloud. Previously, Mei played an instrumental role working with the teams that contributed to the Apache Hadoop, Spark, Zeppelin, Kafka, and Kubernetes projects.
