Explore our expert-made templates & start with the right one for you.
Upsolver SQLake makes it simple to ingest data into a data lake and enrich it using reference data from events that arrive in Amazon Kinesis Data Streams (KDS). In this guide, we ingest streaming sales records and then join them with a table containing data on the most recent store employee who interacted with each order.
There are 5 simple steps:
1. Create connections to your source systems.
2. Create staging tables in the data lake.
3. Create COPY FROM jobs to ingest the source data.
4. Create a materialized view holding the most recent sales information per order.
5. Create the target table and a transformation job that joins and enriches the data.
Let's walk through the steps.
Connections in SQLake contain required authentication details for accessing external systems such as Kinesis Data Streams or S3 buckets. SQLake enables you to authenticate via either AWS IAM roles or AWS IAM access and secret keys. Examples for both authentication methods are shown below.
Note that if you have deployed SQLake into your VPC, you can leave the authentication parameters undefined. The connection defaults to the IAM role you defined for SQLake during the integration process.
Examples for creating a connection to Kinesis:
Authentication via AWS Role
CREATE KINESIS CONNECTION kinesis_stream AWS_ROLE = '<role arn>' EXTERNAL_ID = '<external id>' REGION = '<aws region>';
Authentication via AWS Access Keys
CREATE KINESIS CONNECTION kinesis_stream AWS_ACCESS_KEY_ID = '<access key>' AWS_SECRET_ACCESS_KEY = '<secret key>' REGION = '<aws region>';
There are additional parameters you may choose to set. Of particular use are:
READ_ONLY = TRUE
SQLake will only read data from Kinesis.
STREAM_DISPLAY_FILTERS = ('<stream_name1>,<stream_name2>,…')
SQLake will only show the referenced streams in the connection catalog.
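For example, the optional parameters can be combined in a single statement. This is a sketch only; the role ARN, external ID, region, and stream name below are placeholders to replace with your own values:

```sql
-- Hypothetical example: a read-only Kinesis connection that displays
-- only one stream in the connection catalog.
CREATE KINESIS CONNECTION kinesis_stream_readonly
  AWS_ROLE = '<role arn>'
  EXTERNAL_ID = '<external id>'
  REGION = '<aws region>'
  READ_ONLY = TRUE
  STREAM_DISPLAY_FILTERS = ('webstore_orders');
```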
Example of creating a connection to Amazon S3
Authentication via AWS Roles:
CREATE S3 CONNECTION upsolver_s3_samples AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role' EXTERNAL_ID = 'SAMPLES' READ_ONLY = TRUE;
For the full syntax of the CREATE S3 CONNECTION command, see the documentation.
Staging tables store a raw and immutable copy of the data copied from source systems. You do not need to define a schema for staging tables; SQLake automatically infers the schema and creates it in the AWS Glue Data Catalog. For most staging tables, it is a best practice to partition by $event_date, an automatically generated system column that holds the date of the incoming event.
Create the staging tables using the following commands:
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_data PARTITIONED BY $event_date;
CREATE TABLE default_glue_catalog.database_2777eb.sales_info_raw_data PARTITIONED BY $event_date;
Tables are configured by default to use the default storage location of the GLUE CATALOG that the table is created in. If you wish to store ingested data in a different location, you can customize the STORAGE_CONNECTION and STORAGE_LOCATION as shown below:
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_data PARTITIONED BY $event_date STORAGE_CONNECTION = <connection name> STORAGE_LOCATION = <s3://bucket/path>;
The full syntax of the CREATE TABLE command can be found in the documentation.
Next, we need to ingest data from the sources (Kinesis and S3 buckets). To do that, we create a COPY FROM job that reads events from the source and copies them into data lake staging tables. The data is copied continuously, minute by minute. SQLake automatically handles all table management (partitioning, compaction, cataloging, and so on).
Create a COPY FROM job to ingest from Kinesis and S3 as shown below.
Ingesting from a Kinesis stream
CREATE SYNC JOB ingest_orders_from_kinesis
  START_FROM = BEGINNING
  CONTENT_TYPE = JSON
AS COPY FROM KINESIS kinesis_stream STREAM = 'webstore_orders'
INTO default_glue_catalog.database_2777eb.orders_raw_data;
Ingesting from an S3 bucket
CREATE SYNC JOB ingest_sales_info_from_s3
  START_FROM = BEGINNING
  CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples BUCKET = 'upsolver-samples' PREFIX = 'sales_info/'
INTO default_glue_catalog.database_2777eb.sales_info_raw_data;
The following are important parameters to consider when building your COPY FROM job:
START_FROM = BEGINNING | NOW | TIMESTAMP '<timestamp>'
Defines how far back to begin reading events from the stream.
BEGINNING reads from the earliest event that is stored on the stream.
NOW starts reading from the time that the job was created.
TIMESTAMP '<timestamp>' starts reading from the referenced timestamp.
CONTENT_TYPE = JSON | CSV | PARQUET | AVRO | …
Specifies the content type of the events to read. Certain content types require additional settings; see the documentation for details.
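For instance, to replay only events newer than a given point in time, the Kinesis ingestion job above could be written with a fixed starting timestamp. This is a sketch; the job name and timestamp are placeholder values:

```sql
-- Hypothetical variant of the Kinesis ingestion job that starts reading
-- from a specific point in time rather than from the beginning of the stream.
CREATE SYNC JOB ingest_orders_from_timestamp
  START_FROM = TIMESTAMP '2023-01-01 00:00:00'
  CONTENT_TYPE = JSON
AS COPY FROM KINESIS kinesis_stream STREAM = 'webstore_orders'
INTO default_glue_catalog.database_2777eb.orders_raw_data;
```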
Now calculate the most recent sales information from the ingested data. You can do this by creating a Materialized View and selecting the most recent record per order.
Create the Materialized View
CREATE SYNC MATERIALIZED VIEW default_glue_catalog.database_2777eb.store_orders_mv AS
SELECT
  orderid,
  LAST(salesinfo.source) AS Source,
  LAST(salesinfo.store.location.country) AS country,
  LAST(salesinfo.store.location.name) AS name,
  LAST(salesinfo.store.servicedby.employeeid) AS employeeid,
  LAST(salesinfo.store.servicedby.firstname) AS firstname,
  LAST(salesinfo.store.servicedby.lastname) AS lastname
FROM default_glue_catalog.database_2777eb.sales_info_raw_data
GROUP BY orderid;
Grouping the data by orderid, which acts as a primary key, results in each row representing a specific order with its most recent sales data. You can now join this materialized view with another table on the orderid column. For the full syntax, see the Create Materialized View documentation.
Lastly, create the target table to hold the transformed and enriched data. This is similar to Step 2. In this example we do not declare all of the columns and data types; we declare only the ones needed for the primary key and the partition key. SQLake then automatically detects, infers, and updates the rest of the schema. If you prefer the schema not to be detected automatically, you can define a static list of column names and data types instead.
This is how you create the target table with the order_id as primary key and partition_date as the partitioning column.
CREATE TABLE default_glue_catalog.database_2777eb.orders_with_employee (
  order_id string,
  partition_date date
)
PRIMARY KEY order_id
PARTITIONED BY partition_date;
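If you prefer a fully static schema, the table could instead declare every column the transformation job writes. The sketch below is hypothetical: the column names follow the transformation job later in this guide, but the data types are assumptions you should match to your actual data:

```sql
-- Hypothetical static-schema version of the target table.
-- Column names mirror the transformation job's output; types are assumed.
CREATE TABLE default_glue_catalog.database_2777eb.orders_with_employee (
  order_id string,
  nettotal double,
  employeeid string,
  store_name string,
  employee_name string,
  partition_date date
)
PRIMARY KEY order_id
PARTITIONED BY partition_date;
```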
Note that in the previous statement we defined order_id as the primary key, which enables SQLake to automatically insert new records and update existing ones.
Now create a job that joins data from the staging table and the materialized view, writing the results to the target table in the data lake.
CREATE SYNC JOB join_two_tables_orders_with_last_employee
  START_FROM = BEGINNING
  ADD_MISSING_COLUMNS = TRUE
  RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.database_2777eb.orders_with_employee MAP_COLUMNS_BY_NAME
SELECT
  S.orderid AS order_id,
  S.nettotal,
  MV.employeeid AS employeeid,
  MV.name AS store_name,
  employee_name,
  $event_date AS partition_date
FROM default_glue_catalog.database_2777eb.orders_raw_data AS S
LEFT JOIN default_glue_catalog.database_2777eb.store_orders_mv AS MV
  ON MV.orderid = S.orderid
LET employee_name = MV.firstname || '_' || MV.lastname
WHERE MV.source = 'Store'
  AND $event_time BETWEEN run_start_time() AND run_end_time();
There are several things to note here:
- RUN_INTERVAL = 1 MINUTE executes the job every minute, keeping the target table continuously up to date.
- LET employee_name = MV.firstname || '_' || MV.lastname defines a computed field by concatenating the employee's first and last names.
- MAP_COLUMNS_BY_NAME maps the SELECT output to the target table's columns by name rather than by position.
- The WHERE clause keeps only in-store orders (MV.source = 'Store') and restricts each execution to events within the current run window using run_start_time() and run_end_time().
For the full syntax of the SQL transformation (INSERT) job, see the documentation.
SQLake makes it easy to connect to streaming sources and enrich events in near real-time. Using Materialized Views you create a single source of truth for key datasets that you can then easily join against from other jobs. This ensures consistency and improves reliability of your data pipelines.
Get started today for free with sample data or bring your own.