Introducing Automated Ingestion for dbt Core

dbt Core revolutionized how data teams build, collaborate on, and deploy models and transformations, helping them ship better data products faster.

Today, we’re excited to announce the ability for data and analytics engineers to build and deploy extract and load tasks in dbt Core using Upsolver. Upsolver is a simple-to-use data ingestion solution that continuously loads fresh, high-quality data from object stores, operational databases and streams into your warehouse and lake.

Modernizing transformation and modeling

The emergence of the Modern Data Stack reinforces the need to simplify the processes and tools required to extract, load and transform data. This enables users to collaborate responsibly and deliver high-quality data products faster.

Traditional tools for loading and transforming data in the warehouse were mostly proprietary and focused on ease of use with visual authoring, which discouraged software engineering best practices and created vendor lock-in.

dbt Core changed this by allowing data and analytics engineers to easily implement best practices like modularity, version control, portability, continuous integration/continuous deployment (CI/CD), testing and automation.

Modernizing extract and load

To accelerate data movement, the Modern Data Stack also describes a simple-to-use extract and load approach. This approach focuses on the ease of extracting data from a wide range of sources and loading it into the data warehouse, where dbt can model and transform the data into business-ready tables.

However, today’s popular tools for extracting and loading data are heavily UI-based. They don’t follow the design principles and software engineering best practices that modernized the transformation phase of the data life cycle.

To truly modernize the data stack, we need to extend the benefits of code-first to the extract and load phases as well, without sacrificing ease of use. The integration between dbt Core and Upsolver delivers the first modern data integration solution that makes the extract and load process simple, modular, portable and easily integrated with your developer tools. You can learn more about the dbt-upsolver adapter on our GitHub repository.

Getting Started

Get started by signing up for Upsolver Cloud, which is completely free for development and evaluation. Once signed up, follow these three simple steps:

  1. Create an API token in Upsolver.
  2. From your command prompt, run pip install dbt-core (if not already installed).
  3. From your command prompt, run pip install dbt-upsolver.
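With both packages installed, dbt also needs a profile entry that points at your Upsolver account. Here is a minimal sketch of what that entry in ~/.dbt/profiles.yml might look like; the field names and API endpoint shown are assumptions based on the adapter's documentation, so treat the dbt-upsolver README on GitHub as the authoritative reference:

# Illustrative sketch only – verify the exact field names against the dbt-upsolver README
my_upsolver_project:
  target: dev
  outputs:
    dev:
      type: upsolver
      api_url: https://mt-api-prod.upsolver.com   # assumed Upsolver API endpoint
      token: "<your API token>"                   # created in step 1 above
      user: "<your Upsolver user name>"
      database: "<target Upsolver database>"
      schema: "<target Upsolver schema>"
      threads: 1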

Here is a short video walkthrough of getting started.

Deploying your first extract and load task

The dbt-upsolver adapter exposes two types of model materializations: connection and incremental.

Connection allows you to develop config-only models that configure the connection properties required to read from source systems like Apache Kafka or PostgreSQL and write to target systems like Snowflake and Amazon Redshift.

Incremental is how Upsolver materializes data in your target table. Because Upsolver is an always-on, continuous data ingestion solution, it uses either an insert or a merge strategy to update data in the target table. This eliminates the need to schedule periodic full table reloads and automatically keeps your data fresh with no additional effort.

Building connections

Start by creating two models: one for the source system connection (Kafka) and another for the target system connection (Snowflake).

Kafka source connection model – create a file named upsolver_kafka_samples.sql in your models/ folder with the following code, updating it to reflect your own connection details:

{{ config (
       materialized='connection',
       connection_type='KAFKA',
       connection_options={
           'hosts': ('xxxx.us-east-1.aws.confluent.cloud:9092'),
           'consumer_properties': '
               bootstrap.servers=xxxx.us-east-1.aws.confluent.cloud:9092
               security.protocol=SASL_SSL
               sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxxx";
               ssl.endpoint.identification.algorithm=https
               sasl.mechanism=PLAIN'
       }
   )
}}

Snowflake target connection model – create a file named snowflake_target_conn.sql (matching the target_connection name referenced later) in your models/ folder with the following code, updating it to reflect your own connection details:

{{ config (
     materialized='connection',
     connection_type='SNOWFLAKE',
     connection_options={
       'connection_string':'jdbc:snowflake://snowflakecomputing.com/?db=DEMO_DB&warehouse=WH',
       'user_name':'xxx',
       'password':'xxx'
     }
   )
}}

Execute dbt run in your command prompt to build and deploy the models to Upsolver. 
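If you prefer to build just these two models rather than your entire project, dbt's standard node selection works here as well; the model names below match the files created above:

dbt run --select upsolver_kafka_samples snowflake_target_conn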

Building the extract and load job

The second and final step is to create the incremental model that will generate the extract and load job in Upsolver. Create a file named extract_n_load_to_snowflake.sql in your models/ folder and paste the following code:

{{ config (
            materialized='incremental',
            sync = True,
            source = 'KAFKA',
            target_type = 'SNOWFLAKE',
            target_connection = 'snowflake_target_conn',
            target_schema = 'ROYON',
            target_table_alias = 'USER_INFO_TBL',
            options = {
               'TOPIC': 'user_info',
               'EXCLUDE_COLUMNS': ['password'],
               'COLUMN_TRANSFORMATIONS': {'credit_card': 'MD5(credit_card)'},
               'CREATE_TABLE_IF_MISSING': True,
               'COMPUTE_CLUSTER': 'SQLake',
               'START_FROM': 'NOW',
               'WRITE_INTERVAL': '1 MINUTES'
            },
            primary_key=[{'field':'user_id', 'type':'string'}]
        )
}}

SELECT * FROM {{ ref('upsolver_kafka_samples') }}

A few things to point out about the example model code:

  1. By default, dbt-upsolver uses the merge incremental strategy.
  2. target_schema and target_table_alias let you configure where to output the table.
  3. Within the options block, you can configure Upsolver-specific properties such as which columns to exclude, pre-commit transformations like hashing, and whether to create a new table or insert into an existing one.

Next, execute the dbt run command to build the job. Upsolver will automatically perform the following steps:

  1. Connect to Kafka and listen to events on topic user_info.
  2. Detect the source schema and validate field names and types.
  3. Create table DEMO_DB.ROYON.USER_INFO_TBL in Snowflake with the detected schema.
  4. Merge new or updated rows into the target table every minute, per the configured WRITE_INTERVAL.

That’s all there is to it. In under five minutes, you’ve built an extract and load task using Upsolver and dbt Core.
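If you want to confirm that data is flowing, you can query the target table directly in Snowflake; the database, schema and table names below follow the example configuration above:

-- Quick sanity check: confirm the ingestion job is writing rows to the target table
SELECT COUNT(*) AS row_count
FROM DEMO_DB.ROYON.USER_INFO_TBL;

Running the query again a minute or two later should show the count growing as new events arrive from Kafka and are merged in.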

dbt Core and Upsolver provide the first modern data integration solution that follows the core principles of the Modern Data Stack and offers you flexibility, modularity, portability and ease of use. Learn more about Upsolver and its capabilities by visiting upsolver.com. Get started with Upsolver today by signing up for free.

Roy Hasson

Roy Hasson is the head of product @ Upsolver. Previously, Roy was a product manager for AWS Glue and AWS Lake Formation.
