The first step in creating any kind of data process is setting up a system for efficient storage and querying of your data. There are two major concepts for OLAP data storage: data warehouses and data lakes. A warehouse acts as a central source of truth for analytics and reporting, and is designed to efficiently store and query highly structured data. Warehouses are often OLAP databases like ClickHouse or managed offerings like Amazon Redshift, Google BigQuery, or Snowflake.
A data lake is simply a central storage system for massive amounts of data in any form: log dumps, JSON files, CSVs, images… It is also possible to combine the two concepts into a data lakehouse. The principle here is storing data using one of the many open table formats (Iceberg, Hudi, Delta Lake) in some kind of mass storage, generally cloud object storage like Amazon S3 or Azure Blob Storage. This allows raw, unstructured data to be stored in the same system as structured tabular data, which can provide large cost savings and simplify operations significantly.
This notebook will explain how I set up my own personal data lakehouse with Minio and Delta Lake, and how to query it with PyArrow and Polars.
What is a Delta Lake Table?
A Delta Lake table is simply a collection of Parquet files with matching schemas, plus a JSON transaction log. I won't go into too much detail because it is better explained here.
It is important to remember that, unlike a basic Parquet file, Delta tables can become polluted with 'old' files. I will explain below how to manage this in Python.
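If you want to see those two pieces for yourself, the deltalake Python package can list them directly; a quick sketch using the dtable object created later in this post:

# The Parquet data files that currently make up the table.
dtable.files()

# The commits recorded as JSON entries in the table's _delta_log/ directory.
dtable.history()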
Prerequisites
- A Minio Bucket with object locking enabled
- Minio credentials with R/W access to the bucket
- Some tabular data
I will be using the COVID-19 Open Data epidemiology dataset.
Code
Load Environment Variables
import os
from dotenv import find_dotenv, load_dotenv

try:
    env_file = find_dotenv(raise_error_if_not_found=True)
    load_dotenv(env_file)
except IOError as e:
    print(e)

MINIO_ENDPOINT = os.environ['MINIO_ENDPOINT']
MINIO_ACCESS_KEY = os.environ['MINIO_ACCESS_KEY']
MINIO_SECRET_KEY = os.environ['MINIO_SECRET_KEY']
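This assumes a .env file next to the notebook; a minimal sketch (values are placeholders, not real credentials):

MINIO_ENDPOINT=https://minio.example.com:9000
MINIO_ACCESS_KEY=<your-access-key>
MINIO_SECRET_KEY=<your-secret-key>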
Download Data
import polars as pl

data_url = 'https://storage.googleapis.com/covid19-open-data/v3/epidemiology.csv'

schema = {
    'date': pl.Date,
    'location_key': pl.String,
    'new_confirmed': pl.Int64,
    'new_deceased': pl.Int64,
    'new_recovered': pl.Int64,
    'new_tested': pl.Int64,
    'cumulative_confirmed': pl.Int64,
    'cumulative_deceased': pl.Int64,
    'cumulative_recovered': pl.Int64,
    'cumulative_tested': pl.Int64,
}

df = pl.read_csv(data_url, schema=schema)

df.sample(10)
date | location_key | new_confirmed | new_deceased | new_recovered | new_tested | cumulative_confirmed | cumulative_deceased | cumulative_recovered | cumulative_tested |
---|---|---|---|---|---|---|---|---|---|
date | str | i64 | i64 | i64 | i64 | i64 | i64 | i64 | i64 |
2021-09-02 | "BR_PE_260750" | -7 | 0 | null | null | 998 | 13 | null | null |
2021-02-15 | "US_MO_29015" | 1 | 0 | null | null | 1432 | 23 | null | null |
2021-07-01 | "IL_M_0577" | 0 | 0 | 0 | null | 671 | 1 | 661 | null |
2020-12-16 | "BR_MG_311030" | 5 | 0 | 0 | 8 | 219 | 5 | 0 | 674 |
2020-09-16 | "RU_YAR" | 53 | 2 | 69 | null | 7904 | 45 | 7419 | null |
2020-08-18 | "IN_CT_MHS" | 2 | 1 | 17 | 0 | 282 | 5 | 188 | 2183 |
2021-07-02 | "PL_20_13" | 0 | 0 | null | 60 | null | null | null | null |
2021-12-31 | "US_MA_25001" | 260 | 3 | null | null | 24650 | 559 | null | null |
2022-03-11 | "BR_AL_270510" | 0 | 0 | null | null | 1675 | 45 | null | null |
2020-10-08 | "DE_SH_01004" | 1 | 0 | 1 | null | 172 | 3 | 169 | null |
Create Empty Deltalake Schema
The variable dtable_schema_name maps to a key prefix added after the bucket_name and before the actual Delta Table files. This means we can store as many different tables in one bucket as we want.
from deltalake import DeltaTable

# Minio Connection Parameters
storage_options = {
    'endpoint_url': MINIO_ENDPOINT,
    'AWS_ACCESS_KEY_ID': MINIO_ACCESS_KEY,
    'AWS_SECRET_ACCESS_KEY': MINIO_SECRET_KEY,
    'conditional_put': 'etag',  # https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#enabling-concurrent-writes-for-alternative-clients
}

bucket_name = 'deltalake-demo'
dtable_schema_name = 'covid'

dtable_schema = df.to_arrow().schema  # convert the dataframe schema to a pyArrow schema

dtable = DeltaTable.create(
    table_uri=f's3a://{bucket_name}/{dtable_schema_name}',
    schema=dtable_schema,
    storage_options=storage_options,
)
Write Dataframe to Delta Lake
df.write_delta(dtable, mode='append')
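As an aside, the same append can also target the table by URI instead of the DeltaTable object, passing the connection details through storage_options; a sketch of that alternative (not meant to be run in addition to the call above):

# Equivalent append addressed by URI rather than by the DeltaTable object.
df.write_delta(
    f's3a://{bucket_name}/{dtable_schema_name}',
    mode='append',
    storage_options=storage_options,
)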
Querying the Delta Lake
Now that our dataset is in the Delta Lake, we need to be able to query it without loading all of it into memory at once. We can use Polars or PyArrow for this purpose.
Polars
# Create a LazyFrame representing the Delta Table
ldf = pl.scan_delta(dtable, use_pyarrow=True)
ldf.collect_schema()
Schema([('date', Date),
('location_key', String),
('new_confirmed', Int64),
('new_deceased', Int64),
('new_recovered', Int64),
('new_tested', Int64),
('cumulative_confirmed', Int64),
('cumulative_deceased', Int64),
('cumulative_recovered', Int64),
('cumulative_tested', Int64)])
Return the first 10 records:
ldf.head(10).collect()
date | location_key | new_confirmed | new_deceased | new_recovered | new_tested | cumulative_confirmed | cumulative_deceased | cumulative_recovered | cumulative_tested |
---|---|---|---|---|---|---|---|---|---|
date | str | i64 | i64 | i64 | i64 | i64 | i64 | i64 | i64 |
2020-01-01 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-02 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-03 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-04 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-05 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-06 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-07 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-08 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-09 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
2020-01-10 | "AD" | 0 | 0 | null | null | 0 | 0 | null | null |
Find all records in July 2022 and return the first 10:
ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).collect().head(10)
date | location_key | new_confirmed | new_deceased | new_recovered | new_tested | cumulative_confirmed | cumulative_deceased | cumulative_recovered | cumulative_tested |
---|---|---|---|---|---|---|---|---|---|
date | str | i64 | i64 | i64 | i64 | i64 | i64 | i64 | i64 |
2022-07-01 | "AD" | 0 | 0 | null | null | 44177 | 153 | null | null |
2022-07-02 | "AD" | 0 | 0 | null | null | 44177 | 153 | null | null |
2022-07-03 | "AD" | 0 | 0 | null | null | 44177 | 153 | null | null |
2022-07-04 | "AD" | 0 | 0 | null | null | 44177 | 153 | null | null |
2022-07-05 | "AD" | 494 | 0 | null | null | 44671 | 153 | null | null |
2022-07-06 | "AD" | 0 | 0 | null | null | 44671 | 153 | null | null |
2022-07-07 | "AD" | 0 | 0 | null | null | 44671 | 153 | null | null |
2022-07-08 | "AD" | 0 | 0 | null | null | 44671 | 153 | null | null |
2022-07-09 | "AD" | 0 | 0 | null | null | 44671 | 153 | null | null |
2022-07-10 | "AD" | 0 | 0 | null | null | 44671 | 153 | null | null |
Find all records in July 2022 and find the average new_confirmed cases per location_key:
ldf.filter(
    (pl.col('date') >= pl.date(2022, 7, 1)) &
    (pl.col('date') < pl.date(2022, 8, 1))
).group_by('location_key').agg(pl.col('new_confirmed').mean()).collect().head(10)
location_key | new_confirmed |
---|---|
str | f64 |
"BR_GO_520500" | 0.741935 |
"BR_MG_314655" | 1.483871 |
"IL_Z_0874" | 27.903226 |
"BR_PA_150172" | 14.806452 |
"BR_RR_140005" | 10.935484 |
"ES_CT_43161" | 3.761905 |
"BR_RN_240420" | 2.548387 |
"BR_SE_280700" | 0.129032 |
"BR_MG_312010" | 0.0 |
"IL_Z_4203" | 0.806452 |
PyArrow
While PyArrow doesn't have many of the analytical features of Polars, it excels at moving massive amounts of data between different storage formats. For example, I use PyArrow to transfer data between DeltaLake tables and ClickHouse databases.
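As a rough sketch of that pattern (the clickhouse_connect client, the localhost host, and the covid_epidemiology target table are all assumptions, not part of this post):

import clickhouse_connect
import pyarrow as pa

# Stream the Delta table into ClickHouse batch by batch instead of materializing it all at once.
client = clickhouse_connect.get_client(host='localhost')  # hypothetical ClickHouse instance
for batch in dtable.to_pyarrow_dataset().to_batches():
    client.insert_arrow('covid_epidemiology', pa.Table.from_batches([batch]))  # hypothetical target table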
# Instantiate the PyArrow Dataset
ds = dtable.to_pyarrow_dataset()
ds.schema
date: date32[day]
location_key: string
new_confirmed: int64
new_deceased: int64
new_recovered: int64
new_tested: int64
cumulative_confirmed: int64
cumulative_deceased: int64
cumulative_recovered: int64
cumulative_tested: int64
Find all records in July 2022 and find the average new_confirmed cases per location_key:
import pyarrow as pa
import pyarrow.compute as pc
from datetime import date

july_table = ds.filter(
    (pc.field('date') >= pa.scalar(date(2022, 7, 1), type=pa.date32())) &
    (pc.field('date') < pa.scalar(date(2022, 8, 1), type=pa.date32()))
).to_table()

result = pa.TableGroupBy(july_table, 'location_key').aggregate([('new_confirmed', 'mean')])
result
pyarrow.Table
location_key: string
new_confirmed_mean: double
----
location_key: [["BR_MG_314875","BR_MG_314880","BR_MG_314890","BR_MG_314900","BR_MG_314910",...,"IL_WE_3779","IL_WE_3780","IL_WE_3788","IN_UT_BGS","BR_MT_510517"]]
new_confirmed_mean: [[0.4838709677419355,2.064516129032258,3.5483870967741935,0,1.7741935483870968,...,1.2258064516129032,3.903225806451613,0.6774193548387096,0.41935483870967744,12.290322580645162]]
PyArrow has no integrated way to view data, so we have to use result.to_pandas() or convert the table into a Polars dataframe:
pl.DataFrame(result).head(10)
location_key | new_confirmed_mean |
---|---|
str | f64 |
"BR_MG_314875" | 0.483871 |
"BR_MG_314880" | 2.064516 |
"BR_MG_314890" | 3.548387 |
"BR_MG_314900" | 0.0 |
"BR_MG_314910" | 1.774194 |
"BR_MG_314915" | 0.0 |
"BR_MG_314920" | 0.387097 |
"BR_MG_314930" | 0.0 |
"BR_MG_314940" | 0.0 |
"BR_MG_314950" | 0.0 |
DeltaLake Maintenance
As I mentioned earlier, Delta tables will become polluted with old files over time. This can be managed with the following operations:
Compacting the table is useful for turning a table made up of many small files into fewer larger ones.
dtable.optimize.compact()
{'numFilesAdded': 1,
'numFilesRemoved': 20,
'filesAdded': '{"avg":58140236.0,"max":58140236,"min":58140236,"totalFiles":1,"totalSize":58140236}',
'filesRemoved': '{"avg":3854084.2,"max":4648234,"min":3193592,"totalFiles":20,"totalSize":77081684}',
'partitionsOptimized': 1,
'numBatches': 12244,
'totalConsideredFiles': 20,
'totalFilesSkipped': 0,
'preserveInsertionOrder': True}
The vacuum command is a garbage collector that cleans up files which have been marked for deletion. More on this here.
# Does nothing in this instance because we haven't deleted any rows.
dtable.vacuum()
[]
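Note that vacuum defaults to a dry run that only lists removable files. To actually delete them you opt in explicitly; a sketch (the retention window here is just an example):

# Remove unreferenced files older than 7 days (168 hours); dry_run=False performs the deletion.
dtable.vacuum(retention_hours=168, dry_run=False)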
Conclusion
Delta Lake is a powerful tool for managing massive amounts of data efficiently and cost-effectively. It provides robust features like ACID transactions, scalable metadata handling, and efficient data storage formats, ensuring data integrity and reliability. By leveraging Delta Lake, organizations can perform complex data operations without compromising on cost or performance.