Explore our expert-made templates & start with the right one for you.
Upsolver SQLake makes it simple to ingest data into a data lake and enrich it using reference data from events that arrive in Amazon Kinesis Data Streams (KDS). In this guide, we ingest streaming sales records and then join them with a table containing data on the most recent store employee who interacted with each order.
There are 5 simple steps:
1. Create connections to your source systems.
2. Create staging tables in the data lake.
3. Create COPY FROM jobs to ingest the source data.
4. Create a materialized view holding the most recent sales information per order.
5. Create the target table and a transformation job that joins and enriches the data.
Let's walk through the steps.
Connections in SQLake contain required authentication details for accessing external systems such as Kinesis Data Streams or S3 buckets. SQLake enables you to authenticate via either AWS IAM roles or AWS IAM access and secret keys. Examples for both authentication methods are shown below.
Note that if you have deployed SQLake into your VPC, you can leave the authentication parameters undefined. The connection defaults to the IAM role you defined for SQLake during the integration process.
Examples for creating a connection to Kinesis:
Authentication via AWS Role
CREATE KINESIS CONNECTION kinesis_stream AWS_ROLE = '<role arn>' EXTERNAL_ID = '<external id>' REGION = '<aws region>';
Authentication via AWS Access Keys
CREATE KINESIS CONNECTION kinesis_stream AWS_ACCESS_KEY_ID = '<access key>' AWS_SECRET_ACCESS_KEY = '<secret key>' REGION = '<aws region>';
There are additional parameters you may choose to set. Of particular use are:
READ_ONLY = TRUE
SQLake will only read data from Kinesis.
STREAM_DISPLAY_FILTERS = ('<stream_name1>,<stream_name2>,…')
SQLake will only show the referenced streams in the connection catalog.
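For example, the optional parameters can be combined in a single statement. This is a sketch only; the role ARN, external ID, region, and stream name below are placeholders to replace with your own values:

```sql
-- Hypothetical example: a read-only Kinesis connection that displays
-- only one stream in the connection catalog.
CREATE KINESIS CONNECTION kinesis_stream_readonly
  AWS_ROLE = '<role arn>'
  EXTERNAL_ID = '<external id>'
  REGION = '<aws region>'
  READ_ONLY = TRUE
  STREAM_DISPLAY_FILTERS = ('webstore_orders');
```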
Example of creating a connection to Amazon S3
Authentication via AWS Roles:
CREATE S3 CONNECTION upsolver_s3_samples AWS_ROLE = 'arn:aws:iam::949275490180:role/upsolver_samples_role' EXTERNAL_ID = 'SAMPLES' READ_ONLY = TRUE;
For the full syntax of the CREATE S3 CONNECTION command, see the documentation.
Staging tables store a raw and immutable copy of the data copied from source systems. You do not need to define a schema for staging tables; SQLake automatically infers the schema and creates it in the AWS Glue Data Catalog. For most staging tables, it is a best practice to partition by $event_date, an automatically generated system column that holds the date of the incoming event.
Create the staging tables using the following commands:
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_data PARTITIONED BY $event_date;
CREATE TABLE default_glue_catalog.database_2777eb.sales_info_raw_data PARTITIONED BY $event_date;
Tables are configured by default to use the default storage location of the GLUE CATALOG that the table is created in. If you wish to store ingested data in a different location, you can customize the STORAGE_CONNECTION and STORAGE_LOCATION as shown below:
CREATE TABLE default_glue_catalog.database_2777eb.orders_raw_data PARTITIONED BY $event_date STORAGE_CONNECTION = <connection name> STORAGE_LOCATION = <s3://bucket/path>;
The full syntax of the CREATE TABLE command can be found in the documentation.
Next, we need to ingest data from the sources (Kinesis and S3 buckets). To do that, we create a COPY FROM job that reads events from the source and copies them into data lake staging tables. The data is copied continuously, minute by minute. SQLake automatically handles all table management (partitioning, compaction, cataloging, and so on).
Create a COPY FROM job to ingest from Kinesis and S3 as shown below.
Ingesting from a Kinesis stream
CREATE SYNC JOB ingest_orders_from_kinesis
  START_FROM = BEGINNING
  CONTENT_TYPE = JSON
AS COPY FROM KINESIS kinesis_stream STREAM = 'webstore_orders'
INTO default_glue_catalog.database_2777eb.orders_raw_data;
Ingesting from an S3 bucket
CREATE SYNC JOB ingest_sales_info_from_s3
  START_FROM = BEGINNING
  CONTENT_TYPE = JSON
AS COPY FROM S3 upsolver_s3_samples BUCKET = 'upsolver-samples' PREFIX = 'sales_info/'
INTO default_glue_catalog.database_2777eb.sales_info_raw_data;
The following are important parameters to consider when building your COPY FROM job:
START_FROM = BEGINNING | NOW | TIMESTAMP '<timestamp>'
Defines how far back to begin reading events from the stream.
BEGINNING reads from the earliest event that is stored on the stream.
NOW starts reading from the time that the job was created.
TIMESTAMP '<timestamp>' starts reading from the referenced timestamp.
CONTENT_TYPE = JSON | CSV | PARQUET | AVRO | …
Specifies the content type of the events to read. Certain content types require additional settings; see the documentation for details.
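For instance, to replay only events newer than a given point in time, the Kinesis ingestion job above could be written with a fixed starting timestamp. This is a sketch; the job name and timestamp are placeholder values:

```sql
-- Hypothetical variant of the Kinesis ingestion job that starts reading
-- from a specific point in time rather than from the beginning of the stream.
CREATE SYNC JOB ingest_orders_from_timestamp
  START_FROM = TIMESTAMP '2023-01-01 00:00:00'
  CONTENT_TYPE = JSON
AS COPY FROM KINESIS kinesis_stream STREAM = 'webstore_orders'
INTO default_glue_catalog.database_2777eb.orders_raw_data;
```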
Now calculate the most recent sales information from the ingested data. You can do this by creating a Materialized View and selecting the most recent record per order.
Create the Materialized View
CREATE SYNC MATERIALIZED VIEW default_glue_catalog.database_2777eb.store_orders_mv AS
SELECT
  orderid,
  LAST(salesinfo.source) AS Source,
  LAST(salesinfo.store.location.country) AS country,
  LAST(salesinfo.store.location.name) AS name,
  LAST(salesinfo.store.servicedby.employeeid) AS employeeid,
  LAST(salesinfo.store.servicedby.firstname) AS firstname,
  LAST(salesinfo.store.servicedby.lastname) AS lastname
FROM default_glue_catalog.database_2777eb.sales_info_raw_data
GROUP BY orderid;
Grouping the data by orderid, which acts as a primary key, results in each row representing a specific order with its most recent sales data. You can now join this materialized view with another table on the orderid column. For the full syntax, see the Create Materialized View documentation.
Lastly, create the target table to hold the transformed and enriched data. This is similar to Step 2. In this example we do not declare all of the columns and data types; we declare only the ones needed for the primary key and the partition key. SQLake then automatically detects, infers, and updates the rest of the schema. If you prefer the schema not to be detected automatically, you can define a static list of column names and data types instead.
This is how you create the target table with the order_id as primary key and partition_date as the partitioning column.
CREATE TABLE default_glue_catalog.database_2777eb.orders_with_employee (
  order_id string,
  partition_date date
)
PRIMARY KEY order_id
PARTITIONED BY partition_date;
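If you prefer a fully static schema, the table could instead declare every column the transformation job writes. The sketch below is hypothetical: the column names follow the transformation job later in this guide, but the data types are assumptions you should match to your actual data:

```sql
-- Hypothetical static-schema version of the target table.
-- Column names mirror the transformation job's output; types are assumed.
CREATE TABLE default_glue_catalog.database_2777eb.orders_with_employee (
  order_id string,
  nettotal double,
  employeeid string,
  store_name string,
  employee_name string,
  partition_date date
)
PRIMARY KEY order_id
PARTITIONED BY partition_date;
```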
Note that in the previous statement we defined order_id as the primary key, which enables SQLake to automatically insert new records and update existing ones.
Now create a job that joins data from the staging table and the materialized view, writing the results to the target table in the data lake.
CREATE SYNC JOB join_two_tables_orders_with_last_employee
  START_FROM = BEGINNING
  ADD_MISSING_COLUMNS = TRUE
  RUN_INTERVAL = 1 MINUTE
AS INSERT INTO default_glue_catalog.database_2777eb.orders_with_employee MAP_COLUMNS_BY_NAME
SELECT
  S.orderid AS order_id,
  S.nettotal,
  MV.employeeid AS employeeid,
  MV.name AS store_name,
  employee_name,
  $event_date AS partition_date
FROM default_glue_catalog.database_2777eb.orders_raw_data AS S
LEFT JOIN default_glue_catalog.database_2777eb.store_orders_mv AS MV
  ON MV.orderid = S.orderid
LET employee_name = MV.firstname || '_' || MV.lastname
WHERE MV.source = 'Store'
  AND $event_time BETWEEN run_start_time() AND run_end_time();
There are several things to note here:
- RUN_INTERVAL = 1 MINUTE executes the job every minute, keeping the target table continuously up to date.
- LET employee_name = MV.firstname || '_' || MV.lastname defines a computed field by concatenating the employee's first and last names.
- MAP_COLUMNS_BY_NAME maps the SELECT output to the target table's columns by name rather than by position.
- The WHERE clause keeps only in-store orders (MV.source = 'Store') and restricts each execution to events within the current run window using run_start_time() and run_end_time().
For the full syntax of the SQL transformation (INSERT) job, see the documentation.
SQLake makes it easy to connect to streaming sources and enrich events in near real-time. Using Materialized Views you create a single source of truth for key datasets that you can then easily join against from other jobs. This ensures consistency and improves reliability of your data pipelines.
Get started today for free with sample data or bring your own.