Reading Data from a Kafka Topic and Ingesting it into the Data Lake

Store Raw Events at Scale in a Data Lake

Apache Kafka is used to stream messages and events between microservices and applications, making it fast and consistent to communicate changes. To collect and process this data for analytics and machine learning, it's common practice to store these events in the data lake.

But storing raw events in the data lake comes with challenges. In particular, a high rate of incoming events requires significant processing power to write them. Also, to keep the data fresh and up to the minute, services that write events to the data lake do not usually batch them. This results in many small files, each containing only a few events. Querying this data is slow and costly because the raw data is not optimized by default. In this tutorial, you'll learn how to use Upsolver SQLake to easily read data from a Kafka topic and stage it in a data lake table. SQLake automatically scales to match the volume of events and optimizes the data in the data lake so queries are fast and cost-effective.

Creating a Kafka connection in SQLake is straightforward.  Here is the syntax:

    CREATE KAFKA CONNECTION <connection_identifier>
    HOSTS = ('<bootstrap_server>:<port_number>'[, ...])
    CONSUMER_PROPERTIES = '<consumer_properties>'
    [ VERSION = { CURRENT | LEGACY } ]
    [ REQUIRE_STATIC_IP = { TRUE | FALSE } ]
    [ SSL = { TRUE | FALSE } ]
    [ TOPIC_DISPLAY_FILTER[S] = { '<topic_name>' | ('<topic_name>'[, ...]) } ]
    [ COMMENT = '<comment>' ]

The following example shows how to create a Kafka connection using a username and password with SASL/PLAIN authentication.

CREATE KAFKA CONNECTION <CONNECTION_IDENTIFIER>
 HOSTS = ('<BOOTSTRAP_SERVER1>:<PORT_NUMBER>')
 CONSUMER_PROPERTIES = '
   bootstrap.servers=<BOOTSTRAP_SERVER1>:<PORT_NUMBER>
   security.protocol=SASL_SSL
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<KAFKA_USERNAME>" password="<KAFKA_PASSWORD>";
   ssl.endpoint.identification.algorithm=https
   sasl.mechanism=PLAIN
 '
 VERSION = CURRENT
 REQUIRE_STATIC_IP = TRUE
 SSL = FALSE
 TOPIC_DISPLAY_FILTERS = ('topic1', 'topic2')
 COMMENT = 'My new Kafka connection';

Typically you would need to configure SSL for your Kafka connection. Please refer to the documentation for more detail on setting this up.

The following example shows how to create a Kafka connection using SSL.

CREATE KAFKA CONNECTION my_kafka_connection
  HOSTS = (
    '<bootstrap_server_1>:<port_number>',
    '<bootstrap_server_2>:<port_number>'
  )
  CONSUMER_PROPERTIES = '
    security.protocol=SSL
    ssl.truststore.location=/opt/kafka.client.truststore.jks
    ssl.keystore.location=/opt/kafka.client.keystore.jks
    ssl.keystore.password=<PASSWORD>
    ssl.key.password=<PASSWORD>
  ';

Using SQLake to ingest data from Kafka

Three quick steps are all you need:

  1. Create a connection to your Kafka cluster
  2. Create a staging table in the data lake
  3. Create an ingestion job to read from the Kafka topic and write to the data lake

Let’s walk through the steps.

1. Create a connection to your Kafka cluster

To get started quickly, we recommend working with Confluent Cloud because it's fast and easy to set up. The following SQL creates a Kafka connection in SQLake; substitute your Confluent configuration as needed.

CREATE KAFKA CONNECTION upsolver_kafka_samples
    HOSTS = ('<hostname>.us-east-1.aws.confluent.cloud:9092')
    CONSUMER_PROPERTIES = '
      bootstrap.servers=<hostname>.us-east-1.aws.confluent.cloud:9092
      security.protocol=SASL_SSL
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME>" password="<PASSWORD>";
      ssl.endpoint.identification.algorithm=https
      sasl.mechanism=PLAIN
    ';

2. Create a staging table in the data lake

With a connection to Kafka created, you can now create the staging table in the data lake. Since we don't know the schema of the events, we'll let SQLake automatically detect it and populate the metadata in the AWS Glue Data Catalog for us.

CREATE TABLE default_glue_catalog.database_0297c0.staged_kafka_orders ()
PARTITIONED BY $event_date;

3. Create an ingestion job to read from the Kafka topic and write to the data lake

Now create a job to copy events from a Kafka topic and load them into the data lake table. SQLake automatically writes data in Apache Parquet format and compacts small files into larger ones.  This improves query performance.

CREATE SYNC JOB load_raw_data_from_kafka_topic_orders
  START_FROM = BEGINNING
  CONTENT_TYPE = JSON
  READER_SHARDS = 2
  COMPRESSION = SNAPPY
  COMMENT = 'Load raw orders data from Kafka topic to a staging table'
  AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
  INTO default_glue_catalog.database_0297c0.staged_kafka_orders;

Execute the job and SQLake automatically starts pulling events from Kafka. Note that because we set START_FROM to BEGINNING, SQLake seeks to the earliest event available in the topic and reads forward from there. If you prefer SQLake to read only new events, set this property to NOW. The READER_SHARDS property controls how many readers SQLake uses to consume events; a best practice is 1 reader for every 70 MB/second of traffic ingested per topic.
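For example, a variation of the job above that reads only newly arriving events and is sized for roughly 140 MB/second of traffic (2 reader shards at 1 per 70 MB/second) could look like the following sketch. The job name load_new_orders_from_kafka is illustrative; the connection, topic, and table are the ones created earlier.

CREATE SYNC JOB load_new_orders_from_kafka
  START_FROM = NOW
  CONTENT_TYPE = JSON
  READER_SHARDS = 2
  COMPRESSION = SNAPPY
  COMMENT = 'Load only newly arriving orders events into the staging table'
  AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
  INTO default_glue_catalog.database_0297c0.staged_kafka_orders;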

In some cases, you may want to connect to an external Kafka schema registry that SQLake will use to learn the schema of the events. You can configure the schema registry when creating the ingestion job as follows:

CREATE SYNC JOB load_raw_data_from_kafka_topic_orders
  START_FROM = BEGINNING
  CONTENT_TYPE = (
    TYPE = AVRO_SCHEMA_REGISTRY
    SCHEMA_REGISTRY_URL = 'https://<hostname>.confluent.cloud/schema/111'
  )
  READER_SHARDS = 2
  COMPRESSION = SNAPPY
  COMMENT = 'Load raw orders data from Kafka topic to a staging table'
  AS COPY FROM KAFKA upsolver_kafka_samples
    TOPIC = 'orders'
  INTO default_glue_catalog.database_0297c0.staged_kafka_orders;

For more details about the different properties you can configure and what they mean, visit the ingestion job documentation.

After the job is created, you can query the staging table directly from SQLake by executing a SELECT query. You can also use your favorite data lake query engine, such as Amazon Athena.
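For example, a quick sanity check that events are landing in the staging table could look like this minimal query against the table created above:

SELECT *
FROM default_glue_catalog.database_0297c0.staged_kafka_orders
LIMIT 10;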

Summary

Kafka is a core platform for many companies as a popular way to move data in real-time between applications and services. It’s used to stream events like user visits and clicks, ad impressions, online game activities, and ecommerce purchases. This data is extremely useful for analytics and machine learning, and SQLake makes it easy to read this data from Kafka and store it in the data lake or data warehouse for business analysts, application developers, and data scientists to query.

Get started today for free with sample data or bring your own.
