Why we built a SQL-based solution to unify batch and stream workflows

Balancing Trade-offs

The path to the ideal product is paved with trade-offs, as any product owner will tell you. Months after the launch of Upsolver SQLake, as I navigate the concepts the team had to explore deeply to turn them into product features and capabilities, I admire the tightrope walk it took to curate a platform that so carefully balances abstraction and prowess.

Over the past few weeks, I sat down with our CTO, Yoni, to probe our thought process around the most fundamental decisions and trade-offs we had to make, especially in the context of how we see the history and future of data workflow authoring and management. We compiled these discussions as a playlist on YouTube, broken down into the conceptual units covered in the sections below.

I’m delighted to share these with you, because I believe bringing you along on our journey is the best way to convey my excitement for Upsolver and why we built what we built. Enjoy!

Orchestration

Orchestration is one of the main challenges in data pipelines that we ran into before we built Upsolver. Although we tend to associate orchestration with weekly or daily batch ETL pipelines, it is, in fact, a non-issue for managing jobs that run at such long intervals. If a batch pipeline breaks, an ETL engineer can receive an alert and have a day or longer to resolve the issue before the pipeline needs to run again. She may even be able to manually re-trigger the failed run to fill in data gaps caused by the failure. Many ETL pipelines run on such long cadences and the only aspect of orchestration they ever need is scheduling.

However, businesses and their applications today run on data, not on a schedule. The need for fresh data means pipelines need to run with ever increasing frequency. Batch processes are being replaced with micro batches down to hour or even minute intervals, and in some cases, organizations are faced with a new need to handle real-time data.

For data-in-motion, orchestration indeed becomes a primary challenge in ETL pipelines. Data teams that want to empower organizations to create differentiation with their data are suddenly forced to spend all their time just keeping workflows up and running, at the beck and call of pipeline alerts and failures.

Why is orchestration crucial for data-in-motion? Batch orchestrators rely on idempotency of individual steps in a pipeline, whereas processes are seldom idempotent in real life. Rolling back and replaying workflows when needed can result in data quality and integrity issues, which orchestration frameworks take no responsibility for. To prevent such issues, engineers have to take into consideration every edge case, every possible scenario rendering a pipeline replay necessary, just to prevent those situations playing out. On the one hand, well-managed batch pipelines tend to be robust, while on the other, they require an incommensurate amount of effort to build and maintain compared to the value they generate.
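To make the idempotency point concrete, here is a minimal Python sketch of what happens when a step is replayed after a failure. The function names and the `event_id` key are hypothetical, chosen only for illustration; this is not SQLake code.

```python
# Hypothetical illustration of replaying a pipeline step; not SQLake code.

def append_step(target: list, batch: list) -> None:
    """Non-idempotent: replaying the same batch duplicates its rows."""
    target.extend(batch)

def upsert_step(target: dict, batch: list) -> None:
    """Idempotent: rows are keyed by event_id, so a replay overwrites
    identical rows instead of adding new ones."""
    for row in batch:
        target[row["event_id"]] = row

batch = [{"event_id": 1, "amount": 10}, {"event_id": 2, "amount": 5}]

appended = []
append_step(appended, batch)
append_step(appended, batch)  # replay after a failure

upserted = {}
upsert_step(upserted, batch)
upsert_step(upserted, batch)  # the same replay

appended_total = sum(r["amount"] for r in appended)
upserted_total = sum(r["amount"] for r in upserted.values())
print(appended_total, upserted_total)
```

The appended total is double-counted after the replay, while the keyed write gives the same answer no matter how many times the step runs. An orchestrator that simply re-triggers a step cannot tell which of the two kinds of step it is replaying.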

The smoother you want your pipeline to run, the more engineering hours you have to put into it—the faster you need it to run, the more complex your error handling needs to be. All this means that analytics engineers are spending more time on the operation of their pipelines than they are on business logic.

This was a key challenge we set out to solve with SQLake. And we needed to solve it at the optimal level of abstraction—in SQLake, pipelines would have to be easy to build while also being able to self-orchestrate.

State Store Management

State can be thought of as the information needed to perform live processing on streaming data in order to get them to a meaningful, performant representation while still en-route. Doing so sets us up for optimizing downstream data transformation and storage, computing aggregations and metrics with minimal latency, and even supporting real-time applications built on the data.

State is one of the main challenges in stream processing because you often need to hold a lot of it. Event streaming and message queue technologies such as Kafka and RabbitMQ aim to retain a minimal amount of state, both to limit memory pressure and to maximize throughput. At the other end of the spectrum, frameworks designed for stateful computations and distributed processing, such as Spark and Flink, store large amounts of state in memory and on hard drives, requiring larger and more expensive machines, which can also result in catastrophic loss of all state when they go down.

On the human front, maintaining a state store adds to a data engineer’s already overwhelming overhead of data workflow management, which includes supporting pipeline orchestration frameworks such as Airflow, data source and target connectors, security management, and so on.

In SQLake, we built a decoupled state store to allow us to scale and handle the large amounts of state required to support streaming joins and aggregations. Our cloud-based data lake is richly cataloged and optimized for querying. More importantly, all the metadata also lives in the data lake, providing a powerful and scalable, yet efficient and cost-effective, mechanism to house our own state store. Compute clusters can connect to the data lake and read and write state there, but are decoupled from each other such that the lake is the source of truth for all state at any given time. Any pipeline break can be immediately recovered simply by re-connecting a compute cluster to the lake and picking up where the process left off. If your use case involves joining streaming data to batch data in real time, for example, neither your place in the stream nor the state of your batch data is irrevocably lost. The state retrieval and workflow continuation is automatic, with no engineering intervention required.
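The recovery behavior described above can be sketched in a few lines of Python. This is a toy model under stated assumptions, not how SQLake is implemented: `LakeStateStore` is a hypothetical in-memory stand-in for durable lake storage, and `run_worker` is a hypothetical compute process that keeps no state of its own.

```python
import json

class LakeStateStore:
    """Hypothetical stand-in for durable storage: a dict of JSON strings
    plays the role of objects in the data lake."""
    def __init__(self):
        self._objects = {}

    def write(self, key, state):
        self._objects[key] = json.dumps(state)

    def read(self, key, default):
        raw = self._objects.get(key)
        return json.loads(raw) if raw is not None else default

def run_worker(store, events, job="count_by_user", crash_after=None):
    """Resume from the checkpointed offset and count events per user."""
    state = store.read(job, {"offset": 0, "counts": {}})
    processed = 0
    for event in events[state["offset"]:]:
        if crash_after is not None and processed == crash_after:
            raise RuntimeError("simulated worker failure")
        user = event["user"]
        state["counts"][user] = state["counts"].get(user, 0) + 1
        state["offset"] += 1
        store.write(job, state)  # offset and aggregate are committed together
        processed += 1
    return state["counts"]

events = [{"user": u} for u in ["a", "b", "a", "c", "a"]]
store = LakeStateStore()
try:
    run_worker(store, events, crash_after=2)  # first worker dies mid-stream
except RuntimeError:
    pass
counts = run_worker(store, events)  # a fresh worker reconnects to the same store
print(counts)
```

Because the offset and the aggregate are written as one unit, the second worker neither skips nor double-counts the events the first worker had already handled.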

We saw this as necessary innovation for managing state in pipelines for data-in-motion, where stateful transformations like joins and aggregations are heavily used. Ultimately, it enables joining data sets in real-time, which the basic dependency management afforded by batch (or micro batch) orchestrators cannot. Rather than require you to work hard to approximate idempotency in your workflows, Upsolver’s SQLake synchronizes the state store directly with the data, making possible idempotent transformations which allow you to time travel and replay pipelines as needed.

Time Consistency in Stream Processing

Enabling time travel is another exciting challenge we had to solve. In order to fully represent the evolution of a data record over time, we need at least the following two bits of information:

  • when the data was produced—the event time, and
  • when the data was committed to the database—the commit time.

In data workflows, not only can these two times be different, but the temporal relationship between them (i.e. which happened first) can also vary. While it shouldn’t be possible to write an event before it occurred, we frequently encounter this in data pipelines due to timezone issues, stray operations updating timestamps, and so on.

Having tracked both event and commit times for our data records, we are faced with decisions on which to use for aggregations and joins on the data.  

If I aggregate on event time, I need to incorporate all events that happened at that time, including ones that haven’t yet arrived (late-arriving events). But how long do we wait for such events? How many times do we recompute our aggregation to account for them? While event times synchronize concurrently produced data, the possibly infinite window of data arrival can make pipelines untenable for productization. Not knowing when events will arrive means that we have to store a lot of state, consuming significant memory, and that we can’t make time-based SLAs on data products.

If instead I join two tables based on commit time, I’ll see all the data that were committed to each table at the given time. However, the same data record may not be available in both tables if there’s a delay between writing to one table versus the other, even though technically the event has arrived in the database. Such a lag is a common occurrence in ETL processes, since transformations take time. Commit times are inherently not synchronized, and analytics based on them can be misleading.
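The difference between the two choices shows up even on three records. The following Python sketch uses made-up timestamps and a hypothetical `counts_by` helper; it counts records per hour, first by event time and then by commit time, using only what had been committed as of a given moment.

```python
from collections import Counter
from datetime import datetime

def hour_bucket(ts: datetime) -> str:
    return ts.strftime("%H:00")

# Made-up records; record 2 was produced at 09:55 but committed at 10:20.
records = [
    {"id": 1, "event_time": datetime(2023, 1, 1, 9, 50),
     "commit_time": datetime(2023, 1, 1, 9, 51)},
    {"id": 2, "event_time": datetime(2023, 1, 1, 9, 55),
     "commit_time": datetime(2023, 1, 1, 10, 20)},
    {"id": 3, "event_time": datetime(2023, 1, 1, 10, 5),
     "commit_time": datetime(2023, 1, 1, 10, 6)},
]

def counts_by(records, field, as_of):
    """Count records per hour of `field`, seeing only rows committed by `as_of`."""
    visible = [r for r in records if r["commit_time"] <= as_of]
    return Counter(hour_bucket(r[field]) for r in visible)

# Event-time aggregate computed at 10:10, before the late record lands.
early = counts_by(records, "event_time", datetime(2023, 1, 1, 10, 10))
# The same aggregate recomputed at 11:00, after the late record lands.
late = counts_by(records, "event_time", datetime(2023, 1, 1, 11, 0))
# Commit-time aggregate at 11:00.
by_commit = counts_by(records, "commit_time", datetime(2023, 1, 1, 11, 0))
print(early, late, by_commit)
```

The event-time count for the 09:00 hour changes from 1 to 2 once the late record is committed, which is the recomputation problem. The commit-time count never changes after the fact, but it places the late record in the 10:00 hour, where it did not actually occur.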

Based on the intent of the workflow, either technique could be appropriate. But both have gotchas when used for incorrect use cases. At Upsolver, we believe that the business should dictate which is used and when, not the data platform or infrastructure. For instance, you should be able to decide how old your data are allowed to be to still be considered relevant for the use case your workflow supports. To allow pipeline synchronization based on use case, we employ a novel way of using materialized views for joins and aggregations.  

Finally, we also had to tackle the challenge of data ordering. Since order matters for answering many analytics questions, we have to find a way to ensure total ordering of data in the lake. Data is strongly ordered within partitions in event streaming services, but between partitions you lack this guarantee. In SQLake, we introduce a monotonically increasing event time for concurrent events (events generated by the same process). This timestamp stays as close to the event time of data records as possible while preventing writing data back in time. In conjunction with the data lake architecture, it helps us achieve total ordering of data in SQLake and enables us to guarantee eventual idempotency and exactly-once consistency.
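One simple way to picture a timestamp that tracks event time without ever moving backwards is a clamp to the largest value seen so far. The sketch below is an assumption made for illustration: the text does not describe how SQLake derives its marker, and `MonotonicEventClock` is a hypothetical name. Note that this version is non-decreasing, so ties are possible.

```python
class MonotonicEventClock:
    """Hypothetical sketch: assign each record a processing timestamp that is
    as close to its event time as possible but never earlier than one
    already assigned."""

    def __init__(self):
        self._last = None

    def assign(self, event_time: int) -> int:
        if self._last is None or event_time > self._last:
            self._last = event_time
        return self._last

clock = MonotonicEventClock()
incoming = [100, 105, 103, 110, 108]  # event times in arrival order
assigned = [clock.assign(t) for t in incoming]
print(assigned)
```

Records that arrive in order keep their own event time; records that arrive out of order (103 and 108 here) are pulled forward to the latest time already written, so nothing is ever written back in time.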

To facilitate choosing the right kinds of transformations for stream processing workflows, we define the following two types of jobs in SQLake:

  • An unsynchronized job uses the commit time of data in its input tables, continually processes any committed data, and guarantees that each event will be processed only once. Unsynchronized pipelines do not, however, wait for data to arrive, and do not ensure that all transformations on a data record remain in lock-step for downstream usage.
  • A synchronized job in Upsolver SQLake uses the event time of data. This means it is guaranteed to run on the same set of events as every other synchronized job that runs using the same event time window and the same data sources. A data record’s event time always stays the same, and is propagated as a metadata field through all pipelines. Synchronized pipelines will wait for data and enforce acyclic dependencies, meaning that if table A is an input for table B, table B cannot be an input for any transformation feeding table A.

Pipeline synchronization allows us to incorporate implicit, data-defined, dependencies into workflows. Downstream jobs wait not for upstream jobs to finish per se, as in the case of batch orchestrators and queries run against databases, but for the relevant input data to arrive.
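A data-defined dependency can be sketched as a readiness check: a downstream job processes an event-time window only once every one of its inputs has delivered data through the end of that window. The function and the notion of a per-input "watermark" below are assumptions made for illustration, not SQLake internals.

```python
def ready_windows(watermarks: dict, window_size: int, next_start: int) -> list:
    """Return the start times of windows that every input has fully delivered.

    watermarks maps an input name to the highest event time up to which
    that input is known to be complete (a hypothetical bookkeeping value).
    """
    safe_until = min(watermarks.values())  # the slowest input sets the pace
    windows = []
    start = next_start
    while start + window_size <= safe_until:
        windows.append(start)
        start += window_size
    return windows

# The impressions input lags behind clicks, so the third window must wait.
first = ready_windows({"clicks": 180, "impressions": 125}, window_size=60, next_start=0)
# Once impressions catches up, the waiting window becomes runnable.
second = ready_windows({"clicks": 185, "impressions": 185}, window_size=60, next_start=120)
print(first, second)
```

Nothing here asks whether an upstream job has "finished". The only question is whether the data for a given window has arrived on every input.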

Pipeline Synchronization

To appreciate the importance of time and exactly-once consistency, we need to consider some use cases that rely, or don’t rely, on pipeline synchronization.  

Say you want to enrich events in a stream with batch data from a table, where the batch and stream data have a shared entity and temporal, if not causal, relationship. A common scenario for this pattern appears in adtech, where an ad click event has to be contextualized with the ad serving and user interaction logic stored in the database. In SQLake, a materialized view on top of the user interaction table can be used to join to the streaming click events. Synchronized on the monotonically increasing event time of the stream, the join will only happen when an event arrives. If the stream is delayed, the join and downstream pipeline wait. This allows us to make the correct attribution between the ad and the click—whether the user clicks on the ad seconds or a half hour after seeing it.
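The attribution join in this example can be approximated as an "as-of" lookup: for each click, find the most recent impression for the same user at or before the click time. The Python sketch below uses hypothetical field names and an in-memory index standing in for the materialized view; it shows only the lookup logic, not the waiting behavior of a synchronized job.

```python
from bisect import bisect_right

def build_view(impressions):
    """Index impressions per user, sorted by time (stand-in for a materialized view)."""
    view = {}
    for imp in sorted(impressions, key=lambda i: i["time"]):
        times, rows = view.setdefault(imp["user"], ([], []))
        times.append(imp["time"])
        rows.append(imp)
    return view

def attribute(click, view):
    """Return the latest impression for the click's user at or before the click."""
    times, rows = view.get(click["user"], ([], []))
    idx = bisect_right(times, click["time"])
    return rows[idx - 1] if idx else None

impressions = [
    {"user": "u1", "ad": "A", "time": 100},
    {"user": "u1", "ad": "B", "time": 500},
    {"user": "u2", "ad": "C", "time": 50},
]
view = build_view(impressions)

seconds_later = attribute({"user": "u1", "time": 130}, view)    # 30 seconds after ad A
half_hour_later = attribute({"user": "u1", "time": 2300}, view)  # 30 minutes after ad B
no_match = attribute({"user": "u2", "time": 40}, view)           # click precedes any impression
print(seconds_later, half_hour_later, no_match)
```

The lookup is keyed on time, so the gap between impression and click does not matter: a click seconds after an ad and a click half an hour after it both resolve to the right impression.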

By contrast, suppose we’re joining two data sources that do not share temporal context and we therefore do not care about attribution. For example, say we want to know the breakdown of users currently logged onto our product by ZIP code. To compute this metric, we join the online-users table with the users entity table (which contains addresses) in the database, and count by ZIP code. This pipeline does not need to wait for updates to the users entity table because the online-users table changes at a much faster rate than individual users’ addresses do. We use an unsynchronized job for such a use case, so that downstream applications dependably receive the aggregates they need, even if one of the data sources gets delayed or remains static.
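In code terms, this metric is a plain lookup against whatever the slowly changing table currently holds, followed by a count. The table contents and names in this Python sketch are made up for illustration.

```python
from collections import Counter

# Slowly changing entity table: user id -> address details (made-up data).
users = {
    "u1": {"zip": "94107"},
    "u2": {"zip": "10001"},
    "u3": {"zip": "94107"},
}

# Fast-changing list of currently logged-in users; u4 has no entity row yet.
online = ["u1", "u3", "u2", "u4"]

def online_by_zip(online, users):
    """Join on user id against the current snapshot and count by ZIP code."""
    return Counter(users[u]["zip"] for u in online if u in users)

breakdown = online_by_zip(online, users)
print(breakdown)
```

The computation never waits on the users table. It uses the snapshot that exists at the moment it runs, which is exactly the behavior wanted from an unsynchronized job.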

In SQLake, combining synchronized and unsynchronized jobs unlocks workflow configurations that allow us to handle many other nuanced use cases. One example is the graceful handling of backfills, which are necessary when the event time and commit time of data records are very different. Recall that because we use a monotonically increasing event time marker for processing data from a stream, it’s not possible to process newly arriving data records as though they had come earlier. When creating a new data source, however, we have the opportunity to backfill the queue with older data, if needed. In this scenario, we associate the event time of the data record in SQLake with the event time from the data source, such as the Kafka stream or S3 bucket. We now have a number of options for how to perform joins and aggregations on these data based on our use case.

For temporally independent data, the processing of older data can happen at any time and should not delay other processes consuming the stream—we therefore use an unsynchronized pipeline and simply process the historical data as a standalone batch.

For temporally dependent data, we would typically want to configure the system to execute transformations on the older data records synchronized with all other data relevant to that event time based on the source. Doing so, however, can introduce the following gotcha. If a data record which happens to be exceptionally out of place—even for the older data we are handling—arrives through the stream, the synchronized job would wait for other data in its cohort (which may never arrive), halting any dependent workflows.

To prevent such an occurrence, we can instead configure synchronization to be required only going forward. An unsynchronized job first ingests the data from the sources, and then a synchronized job, containing sufficient logic to handle rogue behavior, picks up the data for further processing. The results of previously completed pipeline runs remain unchanged. Alternatively, we can use a fully synchronized job to process the data for backfill, but decouple it from any production workflow that uses data from the streams. Only when the process is complete do we use the backfilled data for other workflows.

In SQLake, we leave it up to the user to decide, based on her use case, whether to use unsynchronized jobs, synchronized jobs with full historical event time consistency, or synchronized jobs with event time consistency going forward. Affording this flexibility to the user greatly improves the pipeline authoring experience, because business requirements, not platform restrictions, determine how data are treated and transformed. 

Parting Thoughts

For users new to working simultaneously with batch and streaming data, and experiencing use cases where streaming joins are needed, we recommend defaulting to using synchronized pipelines. They provide increased data observability, enabling us to perform monitoring and quality assurance, and are the only way to retain the time connectivity of temporally related data. Simply put, synchronization of pipelines enforces hermeticity. Without data lineage based on synchronization, it is much harder to reason about the time evolution of data records and inter-record time connectivity—it’s a headache we can all do without. 

You can always selectively build unsynchronized queries and jobs on top of tables that were created by synchronized jobs, but you can’t go the other way without losing information. If you do need to incorporate unrelated data from a different pipeline or source into your synchronized pipeline, simply use a materialized view. The synchronized pipeline receives the latest data from the second pipeline through the materialized view, but is unhindered by any blockers in that pipeline.

Simple, right? That’s what we’re going for with SQLake at Upsolver. Write the workflow you need to write—that too in SQL—without worrying about the minutiae of orchestration. You can try it out for free here.

Published in: Blog, Data Lakes
Santona Tuli, PhD

Santona is a physicist turned machine learning engineer turned data workflow architect and product strategist. She likes making data pipeline authoring friendly to different types of developers.
