Explore our expert-made templates & start with the right one for you.
How to build streaming data pipelines with Redpanda and Upsolver SQLake
This post is co-published with Redpanda.
In a nutshell: batch data processing is slow and expensive. It requires loading data into the data warehouse before preparing and transforming data for analysis. Effective, timely and impactful business decisions require fresh and accurate data that can be efficiently achieved using stream processing.
In this post, you’ll learn how to build a streaming data pipeline in SQL using Upsolver SQLake and Redpanda.
How stream processing is different from batch
Stream processing is very similar to batch processing. You can think of each event or a group of events in a stream as a batch. Real-time event processing handles each event as it arrives. Micro-batch event processing handles a group of events that arrive within a small window of time, like every minute, or when the accumulated events reach a certain batch size, like 4KB. Batch processing is just an extension of micro-batch processing (or vice versa). See, not that much different.
When you process a stream of data you gain several advantages:
- You have access to fresh data sooner and make more informed decisions
- You reduce storage by filtering, aggregating and optimizing data on-the-fly
- You detect data quality issues quicker, instead of waiting for a large batch to complete
A simple streaming architecture
To gain these benefits you need to consider moving mission critical jobs from batch processing to stream processing. This is not as difficult as you may think. The following is a diagram that shows a simple streaming data architecture.
The majority of your data sources are already producing data in a streaming manner, as discrete events. Writing these events into a message bus is a best practice. It provides resiliency and high availability for your data streams. Next, the stream processor allows you to build ETL jobs that prepare, transform and join datasets on the fly. Data is then written into a serving layer for analytics and machine learning, as well as to the data lake. The data lake enables you to integrate this data into more systems and tools, like offline feature extraction or model training for machine learning / artificial intelligence use. The data lake also provides inexpensive long-term storage for event history, enabling time travel to a previous state, plus replay and reprocessing of historical data to enrich or correct it.
Estuary wrote a post on The Real-Time Data Landscape that provides a list of tools and vendors that plug into different parts of this architecture. Review it to get more context and understand your implementation options.
Implementing a streaming solution with Upsolver SQLake and Redpanda
Two of the most critical components of a streaming architecture is the streaming bus and the stream processing engine.
Redpanda is an Apache Kafka®-compatible, high-performance data streaming platform or bus. Written in C++, it is JVM-free and Apache ZooKeeper®-free, making it simple to deploy and manage. It allows you to extract the best performance out of every core, disk, memory chip and network byte, without sacrificing the reliability and durability of your data.
Upsolver SQLake is a unified batch and stream processing engine. It allows you to easily consume events from popular sources like Redpanda, Amazon Kinesis, Amazon S3 and databases like MySQL and PostgreSQL (change data capture). You can transform events – preparing, enriching, joining and aggregating data – on the fly. SQLake retains a raw copy of the events in the data lake to make it easy to recover from failures or replay streams at any time. Finally, you output the results of your transformations to your favorite query engine like Amazon Redshift, Snowflake or Amazon Athena.
Let’s redraw the previous architecture diagram to reflect how Redpanda and SQLake fit into it.
Bulk loads and streaming data in a single pipeline
In both ELT and ETL design patterns, users are tasked with manually developing, testing and maintaining the orchestration logic needed to support a reliable, consistent and performant data pipeline. To minimize production failures and improve the reliability of your pipelines, it is essential to automate as much as possible and eliminate potential failure points. Manual work not only slows down business growth but also introduces numerous human errors that compound over time. These human errors will result in poor data quality, slow performance, high costs, which contribute to poor business outcomes.
The example we will walk through next demonstrates how to determine who is the last salesperson to fulfill a particular order in our store. The pipeline performs two simple tasks:
- Loads order history from an Amazon S3 based data lake
- Joins the order history with a real-time stream of salesperson activity, i.e. orders they are actively fulfilling.
Let’s get started.
1. Configuring Redpanda
There are many flavors of Redpanda: install locally, run on top of Kubernetes, run as a managed cloud service. Visit Redpanda’s website to sign up for a free trial.
Once you create a new cluster, you will be able to see all the information you need to start streaming!
You will use the Bootstrap server for the HOSTS property when configuring SQLake in the following step.
Now, create a new user called demousr by clicking on the Security icon in the left menu.
And don’t forget to edit ACLs as shown below to enable demousr to access topics from SQLake.
2. Ingest streaming data from Redpanda with SQLake
First, sign up to SQLake, it’s completely free to try. Once logged in, launch the Streaming pipeline with Redpanda template.
The template will include the following sections of code. Each should be self-explanatory but we’ll walk through each step. Clicking the play icon will run the particular step. As SQLake is a stream processor, clicking a CREATE JOB or CREATE MATERIALIZED VIEW step will kick off a continuously running job.
2a. Create a connection to read data from Redpanda. Since Redpanda is Kafka compatible, we’ll use the KAFKA connection that already exists in SQLake.
CREATE KAFKA CONNECTION redpanda_conn HOSTS = ('your-host-name.fmc.ppd.cloud.redpanda.com:9092') CONSUMER_PROPERTIES = ' bootstrap.servers=your-host-name.fmc.ppd.cloud.redpanda.com:9092 security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demousr" password="********"; ssl.endpoint.identification.algorithm=https sasl.mechanism=SCRAM-SHA-512';
Before executing the above statement, make sure to update the host name and username properties with your own information.
2b. Create a staging table to hold the raw stream. This table is located in the data lake and is maintained and optimized by SQLake. You can query it at any time with a data lake query engine like Amazon Athena.
CREATE TABLE default_glue_catalog.<db_name>.salesinfo_redpanda PARTITION BY $event_date;
2c. Create a job to stream data into the table from the Redpanda topic.
CREATE SYNC JOB load_salesinfo_redpanda START_FROM = BEGINNING CONTENT_TYPE = JSON AS COPY FROM KAFKA redpanda_conn TOPIC = 'salesinfo' INTO default_glue_catalog.<db_name>.salesinfo_redpanda;
Query the staging table to make sure data is available before continuing to the next step.
SELECT * FROM default_glue_catalog.<db_name>.salesinfo_redpanda LIMIT 10;
A sample output should look like this:
2d. Create a materialized view to continuously update the last salesperson’s information for each order. This view joins a particular order with the most recent salesperson who took action against that order, using the orderid as the join key.
CREATE SYNC MATERIALIZED VIEW default_glue_catalog.<db_name>.orders_mv AS SELECT orderid, LAST(saleinfo.source) as source, LAST(saleinfo.store.location.country) as country, LAST(saleinfo.store.location.name) as name, LAST(saleinfo.store.servicedby.employeeid) as employeeid, LAST(saleinfo.store.servicedby.firstname) as firstname, LAST(saleinfo.store.servicedby.lastname) as lastname FROM default_glue_catalog.<db_name>.salesinfo_redpanda GROUP BY orderid;
At this stage, the streaming data is continuously ingested. All the raw events are persisted to the data lake in S3 and made queryable. A materialized view is being updated based on salesperson action captured in real-time. The next phase of the pipeline will load the historical orders data which we’ll join with the salesperson actions.
3. Loading historical order data from Amazon S3
3a. Create a connection to SQLake’s sample S3 bucket where the order data is stored.
CREATE S3 CONNECTION upsolver_s3_samples AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role' EXTERNAL_ID = 'SAMPLES' READ_ONLY = TRUE;
3b. Create a staging table in the data lake to hold the raw order data. This is similar to the staging table we created for the streaming source.
CREATE TABLE default_glue_catalog.<db_name>.orders_raw_data() PARTITIONED BY $event_date;
3c. Create an ingestion job to load raw orders into the staging table.
CREATE SYNC JOB load_orders_raw_data_from_s3 CONTENT_TYPE = JSON AS COPY FROM S3 upsolver_s3_samples BUCKET = 'upsolver-samples' PREFIX = 'orders/' INTO default_glue_catalog.<db_name>.orders_raw_data;
Query your raw data in SQLake. It may take a minute for the data to appear.
SELECT * FROM default_glue_catalog.<db_name>.orders_raw_data LIMIT 10;
A sample output should look like this:
4. Joining historical and streaming tables
This SQL statement may look overwhelming but it’s actually very simple. It creates a job that reads the raw order data from the staging table and joins it with the materialized view we created in Step 2d. You can customize the SELECT block to include specific columns you’re interested in exposing or perform business logic transformations that suit your needs. This example only exposes the salesperson information and the ID of the order they are currently fulfilling.
4a. Create a table to store the final datasets
CREATE TABLE default_glue_catalog.<db_name>.salesperson_by_order(partition_date date) PARTITIONED BY partition_date;
4b. Create a job to transform, join and load results to the target table.
CREATE SYNC JOB join_two_tables_orders_with_last_employee START_FROM = BEGINNING ADD_MISSING_COLUMNS = TRUE RUN_INTERVAL = 1 MINUTE AS INSERT INTO default_glue_catalog.<db_name>.salesperson_by_order MAP_COLUMNS_BY_NAME SELECT s.orderid, mv.employeeid AS employeeid, mv.firstname AS firstname, mv.lastname AS lastname, $event_date AS partition_date FROM default_glue_catalog.<db_name>.orders_raw_data as s LEFT JOIN default_glue_catalog.<db_name>.orders_mv AS mv ON mv.orderid = s.orderid WHERE mv.source = 'Store' AND $event_time BETWEEN run_start_time() AND run_end_time();
Query the output table to view the results of the transformation job. Since this target table was created in the data lake you can use any query engine that supports reading from the data lake. Additionally, you can store your output in Snowflake or Amazon Redshift.
SELECT * FROM default_glue_catalog.<db_name>.salesperson_by_order LIMIT 10;
Through a simple example, we demonstrated that combining historical data with streaming events is a powerful tool to deliver fresh insights. Using only SQL, you built a data pipeline that consumed real-time events from Redpanda and stored them in the data lake. You then took raw JSON objects in S3 and staged them in the data lake. Both of these staging tables in the data lake are queryable and automatically optimized for you to improve performance and reduce query costs.
From there, you joined historical and streaming data to produce the final output. All of this took only a few minutes and a few dozen lines of SQL to implement. You didn’t need to configure a schedule for the jobs or orchestrate them to run. All of that was automatically created for you. Eliminating manual, error-prone tasks enables more users to be self-sufficient and deliver value with data quickly.
If you’d like to learn more about Upsolver SQLake you can:
- Test drive Upsolver SQLake for free today.
- Explore the builder’s hub to get started with tutorials, videos and SQL templates.
- Join our Slack Community and chat with fellow developers or Upsolver solution architects, product managers and engineers.
To learn more about Redpanda you can:
- Take Redpanda for a test drive here.
- Check out our documentation to understand the nuts and bolts of how the platform works, or read more of our blogs to see the plethora of ways to integrate with Redpanda.
- To ask our Solution Architects and Core Engineers questions and interact with other Redpanda users, join the Redpanda Community on Slack.