Data Pipelines
An ETL is nothing more than a DAG of jobs, each of which reads datasets as input, runs a computation, and produces new datasets (or updates existing ones).
getOperate lets you build fast, powerful, and reliable data pipelines with little effort:
- The DX in getOperate allows you to quickly assemble flows that process data step by step in a visual and easy-to-manage way;
- You can control parallelism between individual steps, and set concurrency limits in case external resources are fragile or rate limited;
- getOperate flows can be restarted from any step, making the iteration process of building a pipeline (or debugging one) smooth and efficient;
- Monitoring is made easy with error and recovery handlers.
What sets data pipeline flows apart from other kinds of automation flows is that they run computation on large datasets, and the result of that computation is itself a (potentially large) dataset that needs to be stored.
On the compute side, as data practitioners who have worked on some of the most demanding ETLs, we have observed that in almost all cases the system they run on is ill-suited to the task. Much faster alternatives now exist that leverage modern OLAP processing libraries. We have integrated with Polars and DuckDB, two best-in-class in-memory data processing libraries. They fit getOperate particularly well, since you can assign differently sized workers depending on the step.
To give you a quick idea:
- Running `SELECT COUNT(*), SUM(column_1), AVG(column_2) FROM my_table GROUP BY key` with 600M entries in `my_table` requires less than 24 GB of memory using DuckDB.
- Running `SELECT * FROM table_a JOIN table_b ORDER BY key`, with `table_a` having 300M rows and `table_b` 75M rows, requires 24 GB of memory using DuckDB.
Add to this that on AWS, for example, you can get up to 24 TB of memory on a single server. Nowadays, you don’t need a complex distributed computing architecture to process a large amount of data.
And for storage, you can now link a getOperate workspace to an S3 bucket and use it as source and/or target of your processing steps seamlessly, without any boilerplate.
The vast majority of ETLs can be processed step-wise on single nodes, and getOperate provides one of the best models for orchestrating non-sharded compute. Using this model, your ETLs will see a massive performance improvement, your infrastructure will be easier to manage, and your pipelines will be easier to write, maintain, and monitor.
getOperate integration with an external S3 storage
In getOperate, a data pipeline is implemented using a flow, and each step of the pipeline is a script. One of the key features of getOperate flows is that a step result is easily passed to its dependent steps. But because those results are serialized to the getOperate database and kept as long as the job is stored, this won’t work when the result is a dataset of millions of rows. The solution is to save the datasets to external storage at the end of each script.
In most cases, S3 is a well-suited storage option, and getOperate now provides a basic yet very useful integration with external S3 storage.
The first step is to define an S3 resource in getOperate and assign it to be the Workspace S3 bucket in the workspace settings.
From now on, getOperate will be connected to this bucket and you’ll have easy access to it from the code editor and the job run details. If a script takes an `s3object` as input, you will see a button in the input form on the right that helps you choose the file directly from the bucket.
The same goes for the result of the script. If you return an `s3object` containing a key `s3` pointing to a file inside your bucket, the result panel will show a button that opens the bucket explorer to visualize the file.
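As a rough sketch of this convention, a step can write its dataset to the bucket and return only a small reference to it. The `s3` key comes from the description above; the `output_key_for` helper and the `output/` prefix are hypothetical, and the actual read and write are left out.

```python
from pathlib import PurePosixPath


def output_key_for(input_key: str, prefix: str = "output") -> str:
    """Derive a result key from the input key (hypothetical naming scheme)."""
    name = PurePosixPath(input_key).stem
    return f"{prefix}/{name}_result.parquet"


def main(input_file: dict) -> dict:
    """Take an s3object as input and return an s3object as the result."""
    input_key = input_file["s3"]
    result_key = output_key_for(input_key)
    # ... read input_key from the bucket, transform, write to result_key ...
    # Only the reference is returned, so the serialized step result stays tiny.
    return {"s3": result_key}
```

Returning the reference rather than the rows keeps the job result small no matter how large the dataset behind it is.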
Clicking on the button will lead directly to a bucket explorer. You can browse the bucket content and even visualize file content without leaving getOperate.
When you click one of those buttons, a drawer opens displaying the content of the workspace bucket. You can select any file to see its metadata and, if the format is common, a preview. Previewing a Parquet file, for example, is a very convenient way to quickly validate the result of a script.
From there, you can always use the S3 client library of your choice to read and write to S3. That being said, Polars and DuckDB can read/write directly from/to files stored in S3, and getOperate now ships with helpers to make the entire data processing mechanics very cohesive.
Canonical data pipeline in getOperate w/ Polars and DuckDB
With S3 as the external store, a transformation script in a flow will typically:
- Pull data from S3.
- Run some computation on the data.
- Store the result back to S3 for the next scripts to use.
getOperate SDKs now expose helpers to simplify code and help you connect Polars or DuckDB to the getOperate workspace S3 bucket. In your usual IDE, you would need to write for each script:
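As an approximation of that per-script boilerplate for DuckDB, the sketch below configures the `httpfs` extension by hand. The environment variable names and the `duckdb_s3_settings` helper are assumptions made for illustration, not part of any getOperate SDK.

```python
import os


def duckdb_s3_settings(region: str, access_key: str, secret_key: str) -> list:
    """Build the SQL statements that point DuckDB's httpfs extension at S3."""

    def quote(value: str) -> str:
        # Escape single quotes so the value is a valid SQL string literal.
        return "'" + value.replace("'", "''") + "'"

    return [
        "INSTALL httpfs",
        "LOAD httpfs",
        f"SET s3_region={quote(region)}",
        f"SET s3_access_key_id={quote(access_key)}",
        f"SET s3_secret_access_key={quote(secret_key)}",
    ]


def main(input_uri: str, output_uri: str) -> None:
    import duckdb  # imported here so the helper above has no dependency

    conn = duckdb.connect()  # in-memory database
    # Assumed variable names; adapt them to wherever your credentials live.
    for statement in duckdb_s3_settings(
        os.environ["AWS_REGION"],
        os.environ["AWS_ACCESS_KEY_ID"],
        os.environ["AWS_SECRET_ACCESS_KEY"],
    ):
        conn.execute(statement)
    # Read Parquet from S3 and write the query result back to S3.
    conn.execute(
        f"COPY (SELECT * FROM read_parquet('{input_uri}')) "
        f"TO '{output_uri}' (FORMAT PARQUET)"
    )
    conn.close()
```

Every script that touches the bucket has to repeat this setup, which is the repetition the helpers are meant to remove.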
In getOperate, you can just do:
And similarly for Polars:
becomes in getOperate:
And more to come! With getOperate providing the boilerplate code, and Polars and DuckDB reading from and writing to S3 natively, you can interact with S3 files very naturally, and your getOperate scripts become concise and focused on what really matters: processing the data.
In the end, a canonical pipeline step in getOperate will look something like this:
The example uses Polars. If you’re more into SQL you can use DuckDB, but the code will have the same structure: initialization, reading from S3, transforming, writing back to S3.
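The four-part structure (initialization, reading from S3, transforming, writing back to S3) could be sketched as follows with plain Polars and `s3fs`. This is an illustration under assumptions: credentials are passed explicitly instead of through the getOperate helpers, and the column names, the `output/totals.parquet` key, and the `s3fs_args` helper are hypothetical.

```python
def s3fs_args(region: str, access_key: str, secret_key: str) -> dict:
    """Keyword arguments for s3fs.S3FileSystem (hypothetical helper)."""
    return {
        "key": access_key,
        "secret": secret_key,
        "client_kwargs": {"region_name": region},
    }


def transform(df):
    import polars as pl

    # Example computation: total positive amount per customer.
    return (
        df.filter(pl.col("amount") > 0)
        .group_by("customer_id")
        .agg(pl.col("amount").sum().alias("total_amount"))
    )


def main(input_file: dict, bucket: str, region: str,
         access_key: str, secret_key: str) -> dict:
    import polars as pl
    import s3fs

    # 1. Initialization: connect to the bucket.
    s3 = s3fs.S3FileSystem(**s3fs_args(region, access_key, secret_key))

    # 2. Reading from S3.
    with s3.open(f"s3://{bucket}/{input_file['s3']}", mode="rb") as f:
        df = pl.read_parquet(f)

    # 3. Transforming.
    result = transform(df)

    # 4. Writing back to S3 and returning only a reference to the file.
    output_key = "output/totals.parquet"
    with s3.open(f"s3://{bucket}/{output_key}", mode="wb") as f:
        result.write_parquet(f)
    return {"s3": output_key}
```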
In-memory data processing performance
By using Polars, DuckDB, or any other data processing library inside getOperate, the computation will happen on a single node. Even though you might have multiple getOperate workers, a script is still run by a single worker and the computation is not distributed. We’ve run some benchmarks to show the performance and scale you can expect from such a setup.
We’ve taken a well-known benchmark dataset: the TPC-H dataset. It has the advantage of being available in any size and being fairly representative of real use cases. We’ve generated multiple versions: 1 GB, 5 GB, 10 GB and 25 GB (if you prefer thinking in terms of rows, the biggest table of the 25 GB version has around 150M rows). We won’t detail the structure of the database or the queries we’ve run here, but TPC-H is well-documented if needed.
The following procedure was followed:
- Datasets provided by TPC-H as CSVs were uploaded as parquet files on S3.
- TPC-H provides a set of canonical queries. They perform numerous joins, aggregations, group-bys, etc. Eight of them were converted to the different dialects.
- Those queries were run sequentially as scripts in a for-loop flow in getOperate, once for each of the benchmark datasets (1 GB, 5 GB, 10 GB, etc.). The memory of the getOperate server was recorded.
- Each script:
  - Read the data straight from the S3 Parquet files
  - Ran the query
  - Stored the result in a separate Parquet file on S3
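To reproduce this kind of measurement on your own steps, a small harness like the one below can record wall-clock time and peak memory. It is only a sketch and not the tooling used for the benchmark: it measures the current process rather than the whole server, and it relies on the Unix-only `resource` module.

```python
import resource
import time
from typing import Any, Callable, Tuple


def measure(step: Callable[[], Any]) -> Tuple[Any, float, int]:
    """Run step() and return (result, elapsed seconds, peak RSS of the process)."""
    start = time.perf_counter()
    result = step()
    elapsed = time.perf_counter() - start
    # ru_maxrss is the process-wide high-water mark:
    # kilobytes on Linux, bytes on macOS.
    peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return result, elapsed, peak_rss
```

Because the peak is a high-water mark for the whole process lifetime, each query should run in a fresh process if you want per-query figures.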
A couple of notes before the results:
- We’ve run those benchmarks on an `m4.xlarge` AWS server (8 vCPUs, 32 GB of memory). It’s not a small server, but also not terribly large. Keep in mind you can get up to 24 TB of memory on a single server on AWS (yes, it’s not cheap, but it’s possible!).
- Polars comes with a lazy mode, in which it is supposed to be more memory efficient. We’ve benchmarked both normal and lazy mode.
- We also ran those benchmarks on Spark, as a well-known and broadly used reference. To be as fair as possible, the Spark “cluster” was composed of a single node, also running on an `m4.xlarge` AWS instance.
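To make the difference between the two Polars modes concrete, here is a minimal sketch of the same query written eagerly and lazily. The column names `quantity` and `price` are made up for the example and are not the benchmark queries.

```python
def eager_total(path: str):
    import polars as pl

    # Eager ("normal") mode: the whole file is loaded into memory first,
    # then filtered and aggregated.
    df = pl.read_parquet(path)
    return df.filter(pl.col("quantity") > 10).select(pl.col("price").sum())


def lazy_total(path: str):
    import polars as pl

    # Lazy mode: scan_parquet only builds a query plan. The optimizer can
    # push the filter and the column selection down into the scan, so less
    # data is materialized when collect() finally runs the plan.
    return (
        pl.scan_parquet(path)
        .filter(pl.col("quantity") > 10)
        .select(pl.col("price").sum())
        .collect()
    )
```

Both functions return the same one-row DataFrame; only the execution strategy differs.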
Polars is the fastest at computing the results, but consumes slightly more memory than Spark (it ran out of memory on the 25 GB benchmark). Polars in lazy mode, however, is a lot more memory efficient and can process more data, at the expense of computation time. Overall, both Polars and DuckDB behave very well in terms of memory consumption and computation time. The 10 GB benchmark contains tables with up to 60 million rows, and we were far from using the most powerful AWS instance. It is true that this approach doesn’t scale horizontally, but the results confirm that a majority of data pipelines can be handled with a large enough instance. And when you think about it, what’s more convenient: managing a single beefy server or a fleet of small servers?
DuckDB offers the possibility to back its database with a file on disk to save some memory. This mode fits perfectly with getOperate flows using a shared directory between steps. We implemented a simple flow where the first step loads the database into a file, and the following steps consume this file to run the queries. We were able to run the 8 queries on the 100 GB benchmark successfully. It took 40 minutes and consumed 29 GB at the peak.
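A flow of that shape could look roughly like the sketch below, with one function per step. The table name `lineitem` and the idea of passing the database path between steps are assumptions for illustration; the shared directory itself is whatever your flow mounts between steps.

```python
def load_step(db_path: str, parquet_uri: str) -> str:
    """First step: materialize the dataset into a file-backed DuckDB database."""
    import duckdb

    conn = duckdb.connect(db_path)  # creates the file if it does not exist
    conn.execute(
        "CREATE OR REPLACE TABLE lineitem AS "
        f"SELECT * FROM read_parquet('{parquet_uri}')"
    )
    conn.close()
    return db_path  # later steps only need the path


def query_step(db_path: str, sql: str) -> list:
    """Following steps: open the same file read-only and run one query."""
    import duckdb

    conn = duckdb.connect(db_path, read_only=True)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()
```

Opening the file read-only in the query steps avoids accidental writes to the shared database.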
Learn more
For more details on storage in getOperate, see:
Persistent Storage
Ensure that your data is safely stored and easily accessible whenever required.