Building a Lakehouse with Delta Lake

How to use the Delta Lake Python library to manage massive amounts of data at scale.
Author

Dhruv Dole

Published

April 12, 2025

Keywords

python, deltalake, lakehouse, pyarrow, polars, minio

The first step in creating any kind of data process is setting up a system for efficient storage and querying of your data. There are two major concepts in analytical (OLAP) data storage: data warehouses and data lakes. A warehouse acts as a central source of truth for analytics and reporting, and is designed to efficiently store and query highly structured data. Warehouses are often OLAP databases like ClickHouse, or managed offerings like Amazon Redshift, Google BigQuery, or Snowflake.

A data lake is simply a central storage system for massive amounts of data in any form: log dumps, JSON files, CSVs, images… It is also possible to combine the two concepts into a data lakehouse. The principle here is storing data using one of the many open table formats (Iceberg, Hudi, Delta Lake) in some kind of mass storage, generally cloud object storage like Amazon S3 or Azure Blob Storage. This allows raw, unstructured data to be stored in the same system as structured tabular data, which can provide large cost savings and simplify operations significantly.

This notebook explains how I set up my own personal data lakehouse with Minio and Delta Lake, and how I query it with PyArrow and Polars.

What is a Delta Lake Table?

A Delta Lake table is simply a collection of Parquet files with matching schemas, plus a JSON transaction log. I won’t go into too much detail here because it is better explained in the Delta Lake documentation.

It is important to remember that, unlike a standalone Parquet file, a Delta table can become polluted with ‘old’ files over time. I will explain below how to manage this in Python.
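To see this structure for yourself, you can list a table's data files and read its commit history directly; a minimal sketch, assuming dtable is a DeltaTable handle like the one created later in this notebook:

# List the Parquet data files currently referenced by the table
print(dtable.files())

# The transaction log, returned as a list of commit metadata dictionaries
print(dtable.history())

# Current table version, which increments with every commit
print(dtable.version())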

Prerequisites

  1. A Minio bucket with object locking enabled (see the sketch below for one way to create one)
  2. Minio credentials with R/W access to the bucket
  3. Some tabular data

I will be using the epidemiology table from the COVID-19 Open Data dataset.
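If you don’t already have a suitable bucket, one way to create it is with the minio Python client. This is just a sketch: the endpoint, credentials, and bucket name are placeholders, and your credentials must be allowed to create buckets.

from minio import Minio

# Placeholder endpoint and credentials, substitute your own
client = Minio('minio.example.com', access_key='...', secret_key='...', secure=True)

# Object locking must be enabled at bucket creation time; it cannot be turned on later
client.make_bucket('deltalake-demo', object_lock=True)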

Code

Load Environment Variables

import os
from dotenv import find_dotenv, load_dotenv

# Locate and load a .env file; warn (but don't crash) if none is found
try:
    env_file = find_dotenv(raise_error_if_not_found=True)
    load_dotenv(env_file)
except IOError as e:
    print(e)

# Minio connection details are expected in the environment
MINIO_ENDPOINT = os.environ['MINIO_ENDPOINT']
MINIO_ACCESS_KEY = os.environ['MINIO_ACCESS_KEY']
MINIO_SECRET_KEY = os.environ['MINIO_SECRET_KEY']

Download Data

import polars as pl
data_url = 'https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv'

# Declare the schema up front so Polars doesn't have to infer column types
schema = {
    'date': pl.Date,
    'location_key': pl.String,
    'new_confirmed': pl.Int64,
    'new_deceased': pl.Int64,
    'new_recovered': pl.Int64,
    'new_tested': pl.Int64,
    'cumulative_confirmed': pl.Int64,
    'cumulative_deceased': pl.Int64,
    'cumulative_recovered': pl.Int64,
    'cumulative_tested': pl.Int64,
}
df = pl.read_csv(data_url, schema=schema)

df.sample(10)
shape: (10, 10)
date location_key new_confirmed new_deceased new_recovered new_tested cumulative_confirmed cumulative_deceased cumulative_recovered cumulative_tested
date str i64 i64 i64 i64 i64 i64 i64 i64
2021-09-02 "BR_PE_260750" -7 0 null null 998 13 null null
2021-02-15 "US_MO_29015" 1 0 null null 1432 23 null null
2021-07-01 "IL_M_0577" 0 0 0 null 671 1 661 null
2020-12-16 "BR_MG_311030" 5 0 0 8 219 5 0 674
2020-09-16 "RU_YAR" 53 2 69 null 7904 45 7419 null
2020-08-18 "IN_CT_MHS" 2 1 17 0 282 5 188 2183
2021-07-02 "PL_20_13" 0 0 null 60 null null null null
2021-12-31 "US_MA_25001" 260 3 null null 24650 559 null null
2022-03-11 "BR_AL_270510" 0 0 null null 1675 45 null null
2020-10-08 "DE_SH_01004" 1 0 1 null 172 3 169 null

Create an Empty Delta Table

Note

The variable dtable_schema_name becomes a key prefix inserted between the bucket_name and the actual Delta table files. This means we can store as many different tables in one bucket as we want.

from deltalake import DeltaTable

# Minio Connection Parameters
storage_options = {
    'endpoint_url': MINIO_ENDPOINT,
    'AWS_ACCESS_KEY_ID': MINIO_ACCESS_KEY,
    'AWS_SECRET_ACCESS_KEY': MINIO_SECRET_KEY,
    'conditional_put': 'etag' #https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#enabling-concurrent-writes-for-alternative-clients
}
bucket_name = 'deltalake-demo'
dtable_schema_name = 'covid'

dtable_schema = df.to_arrow().schema  # convert the DataFrame schema to a PyArrow schema

# Create an empty Delta table at s3a://{bucket_name}/{dtable_schema_name}
dtable = DeltaTable.create(table_uri=f's3a://{bucket_name}/{dtable_schema_name}', schema=dtable_schema,
                           storage_options=storage_options)

Write Dataframe to Delta Lake

df.write_delta(dtable, mode='append')
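As a quick sanity check, we can refresh the table handle and confirm that the append produced a new commit; a minimal sketch using the same dtable created above:

# Refresh the in-memory handle so it picks up the commit written by write_delta
dtable.update_incremental()

print(dtable.version())     # version 0 was the empty table; the append should bump this to 1
print(len(dtable.files()))  # number of Parquet files currently backing the table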

Querying the Delta Lake

Now that our dataset is in the Delta Lake, we need to be able to query it without loading all of it into memory at once. We can use Polars or PyArrow for this purpose.

Polars

# Create a LazyFrame representing the Delta Table
ldf = pl.scan_delta(dtable, use_pyarrow=True)
ldf.collect_schema()
Schema([('date', Date),
        ('location_key', String),
        ('new_confirmed', Int64),
        ('new_deceased', Int64),
        ('new_recovered', Int64),
        ('new_tested', Int64),
        ('cumulative_confirmed', Int64),
        ('cumulative_deceased', Int64),
        ('cumulative_recovered', Int64),
        ('cumulative_tested', Int64)])

Return the first 10 records:

ldf.head(10).collect()
shape: (10, 10)
date location_key new_confirmed new_deceased new_recovered new_tested cumulative_confirmed cumulative_deceased cumulative_recovered cumulative_tested
date str i64 i64 i64 i64 i64 i64 i64 i64
2020-01-01 "AD" 0 0 null null 0 0 null null
2020-01-02 "AD" 0 0 null null 0 0 null null
2020-01-03 "AD" 0 0 null null 0 0 null null
2020-01-04 "AD" 0 0 null null 0 0 null null
2020-01-05 "AD" 0 0 null null 0 0 null null
2020-01-06 "AD" 0 0 null null 0 0 null null
2020-01-07 "AD" 0 0 null null 0 0 null null
2020-01-08 "AD" 0 0 null null 0 0 null null
2020-01-09 "AD" 0 0 null null 0 0 null null
2020-01-10 "AD" 0 0 null null 0 0 null null

Find all records in July 2022 and return the first 10:

ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).collect().head(10)
shape: (10, 10)
date location_key new_confirmed new_deceased new_recovered new_tested cumulative_confirmed cumulative_deceased cumulative_recovered cumulative_tested
date str i64 i64 i64 i64 i64 i64 i64 i64
2022-07-01 "AD" 0 0 null null 44177 153 null null
2022-07-02 "AD" 0 0 null null 44177 153 null null
2022-07-03 "AD" 0 0 null null 44177 153 null null
2022-07-04 "AD" 0 0 null null 44177 153 null null
2022-07-05 "AD" 494 0 null null 44671 153 null null
2022-07-06 "AD" 0 0 null null 44671 153 null null
2022-07-07 "AD" 0 0 null null 44671 153 null null
2022-07-08 "AD" 0 0 null null 44671 153 null null
2022-07-09 "AD" 0 0 null null 44671 153 null null
2022-07-10 "AD" 0 0 null null 44671 153 null null

Find all records in July 2022 and compute the average new_confirmed cases per location_key:

ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).group_by('location_key').agg(pl.col('new_confirmed').mean()).collect().head(10)
shape: (10, 2)
location_key new_confirmed
str f64
"BR_GO_520500" 0.741935
"BR_MG_314655" 1.483871
"IL_Z_0874" 27.903226
"BR_PA_150172" 14.806452
"BR_RR_140005" 10.935484
"ES_CT_43161" 3.761905
"BR_RN_240420" 2.548387
"BR_SE_280700" 0.129032
"BR_MG_312010" 0.0
"IL_Z_4203" 0.806452

PyArrow

Note

While PyArrow doesn’t have many of the analytical features of Polars, it excels at moving massive amounts of data between different storage formats. For example, I use PyArrow to transfer data between Delta Lake tables and ClickHouse databases (a sketch of this appears at the end of this section).

# Instantiate the PyArrow Dataset
ds = dtable.to_pyarrow_dataset()
ds.schema
date: date32[day]
location_key: string
new_confirmed: int64
new_deceased: int64
new_recovered: int64
new_tested: int64
cumulative_confirmed: int64
cumulative_deceased: int64
cumulative_recovered: int64
cumulative_tested: int64

Find all records in July 2022 and compute the average new_confirmed cases per location_key:

import pyarrow as pa
import pyarrow.compute as pc
from datetime import date

july_table = ds.filter(
    ((pc.field('date') >= pa.scalar(date(2022, 7, 1), type=pa.date32())) &
     (pc.field('date') < pa.scalar(date(2022, 8, 1), type=pa.date32()))
     )
).to_table()

result = pa.TableGroupBy(july_table, 'location_key').aggregate([('new_confirmed', 'mean')])
result
pyarrow.Table
location_key: string
new_confirmed_mean: double
----
location_key: [["BR_MG_314875","BR_MG_314880","BR_MG_314890","BR_MG_314900","BR_MG_314910",...,"IL_WE_3779","IL_WE_3780","IL_WE_3788","IN_UT_BGS","BR_MT_510517"]]
new_confirmed_mean: [[0.4838709677419355,2.064516129032258,3.5483870967741935,0,1.7741935483870968,...,1.2258064516129032,3.903225806451613,0.6774193548387096,0.41935483870967744,12.290322580645162]]

PyArrow has no integrated way to display data nicely, so we have to use result.to_pandas() or convert the table into a Polars DataFrame:

pl.DataFrame(result).head(10)
shape: (10, 2)
location_key new_confirmed_mean
str f64
"BR_MG_314875" 0.483871
"BR_MG_314880" 2.064516
"BR_MG_314890" 3.548387
"BR_MG_314900" 0.0
"BR_MG_314910" 1.774194
"BR_MG_314915" 0.0
"BR_MG_314920" 0.387097
"BR_MG_314930" 0.0
"BR_MG_314940" 0.0
"BR_MG_314950" 0.0

Delta Lake Maintenance

As I mentioned earlier, Delta tables become polluted with old files over time. This can be managed with the following operations:

Compacting the table rewrites many small files into fewer, larger ones, which reduces per-file overhead when reading.

dtable.optimize.compact()
{'numFilesAdded': 1,
 'numFilesRemoved': 20,
 'filesAdded': '{"avg":58140236.0,"max":58140236,"min":58140236,"totalFiles":1,"totalSize":58140236}',
 'filesRemoved': '{"avg":3854084.2,"max":4648234,"min":3193592,"totalFiles":20,"totalSize":77081684}',
 'partitionsOptimized': 1,
 'numBatches': 12244,
 'totalConsideredFiles': 20,
 'totalFilesSkipped': 0,
 'preserveInsertionOrder': True}

The vacuum command is a garbage collector: it deletes data files that are no longer referenced by the table once they fall outside the retention window. More on this in the Delta Lake documentation.

dtable.vacuum() # Returns an empty list here: the files removed by the compaction above are still inside the default 7-day retention window.
[]
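If you do want to reclaim space sooner, vacuum accepts a retention window and a dry-run flag. A hedged sketch; only shorten the retention period if nothing relies on time travel over that window:

# Preview which files would be removed, without deleting anything
print(dtable.vacuum(retention_hours=0, dry_run=True, enforce_retention_duration=False))

# Actually delete the unreferenced files left behind by the compaction above
dtable.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)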

Conclusion

Delta Lake is a powerful tool for managing massive amounts of data efficiently and cost-effectively. It provides robust features like ACID transactions, scalable metadata handling, and efficient data storage formats, ensuring data integrity and reliability. By leveraging Delta Lake, organizations can perform complex data operations without compromising on cost or performance.

Resources