The Setup
The previous article covered what a data lakehouse is conceptually. This one is the opposite: pure hands-on. By the end, you will have a working lakehouse running on your laptop, with real ACID transactions, schema evolution, time travel, and table maintenance.
To keep things accessible, we are going to use the lightest possible stack. No Spark cluster, no cloud accounts, no Kubernetes. Just Python and a few libraries.
The Scenario: Sonik, a Music Streaming Company
To make this real, let us pretend we work for Sonik, a music streaming startup. Every time a user plays a song, the app fires an event. We want to store these play events in a lakehouse so we can:
Run analytics queries (top tracks, top artists, listening time per country).
Train recommendation models on historical data.
Show users their personal listening stats.
Audit what the data looked like a week ago when something went wrong.
Each play event has fields like user, track, artist, timestamp, country, and device. Hundreds of millions of these per day at scale. We need a lakehouse that can absorb this volume and make it queryable.
Step 1: Install Dependencies
Create a fresh Python virtual environment and install what we need:
python3 -m venv .venv
source .venv/bin/activate
pip install "pyiceberg[sql-sqlite,pyarrow,duckdb]" pandas pyarrow duckdb
That single install gives us:
PyIceberg: the Python implementation of the Iceberg table format.
SQLite catalog: stores table metadata locally without needing a server.
PyArrow: the in-memory columnar format we use to push data into Iceberg.
DuckDB: a fast embedded SQL engine that can query Iceberg tables.
Pandas: for convenient data inspection.
Now create a project folder:
mkdir sonik-lakehouse
cd sonik-lakehouse
mkdir -p warehouse
The warehouse directory is where Iceberg will store our actual data files. Think of it as a stand-in for an S3 bucket.
Step 2: Create the Catalog
The catalog is the directory that knows about your tables. We will use a SQLite-backed catalog because it requires zero setup. Save this as setup_catalog.py:
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
# Create a namespace (like a database/schema)
catalog.create_namespace_if_not_exists("music")
print("Catalog initialized")
print("Namespaces:", catalog.list_namespaces())
Run it:
python setup_catalog.py
You should see a new catalog.db file appear alongside the warehouse directory you created:
sonik-lakehouse/
├── catalog.db # SQLite database holding catalog metadata
├── warehouse/ # Where data files will go
└── setup_catalog.py
The catalog.db is your "single source of truth." Every table you create gets registered here. In production, this would be a hosted Postgres, AWS Glue, or Polaris. The interface is the same.
Step 3: Define the Schema and Create the Table
Iceberg requires you to declare a schema upfront. This is one of the things that distinguishes a lakehouse from a plain data lake: structure is enforced. Save this as create_table.py:
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
NestedField,
StringType,
LongType,
IntegerType,
TimestampType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
# Define the schema for play events
schema = Schema(
NestedField(1, "play_id", StringType(), required=True),
NestedField(2, "user_id", LongType(), required=True),
NestedField(3, "track_id", LongType(), required=True),
NestedField(4, "artist", StringType(), required=True),
NestedField(5, "title", StringType(), required=True),
NestedField(6, "played_at", TimestampType(), required=True),
NestedField(7, "duration_seconds", IntegerType()),
NestedField(8, "country", StringType()),
NestedField(9, "device", StringType()),
)
# Partition by day so queries that filter on date are fast
partition_spec = PartitionSpec(
PartitionField(
source_id=6, # played_at column
field_id=1000,
transform=DayTransform(),
name="played_day",
)
)
table = catalog.create_table(
identifier="music.plays",
schema=schema,
partition_spec=partition_spec,
)
print(f"Created table: {table.identifier}")
print(f"Schema: {table.schema()}")
Run it:
python create_table.py
A few things just happened. Let me unpack them.
The schema defines columns and their types. The required=True flag means the column is NOT NULL. Each field gets a numeric ID that Iceberg uses to track columns even if you rename them later.
The partition spec tells Iceberg how to physically organize files. We are partitioning by played_day, derived from the played_at timestamp. This means all plays from a single day will live in the same directory. Queries that filter by date will only scan that directory, not the whole table. This is huge for performance.
Hidden partitioning is one of Iceberg's killer features. Users do not need to know that we partition by day. They just write WHERE played_at > '2026-04-25' and Iceberg figures out which partitions to read. In older systems like Hive, the user had to manually add partition columns to every query.
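If you want to see the hidden partition field for yourself, load the table and print its partition spec. This is a small sketch reusing the catalog config from the earlier scripts; note that no query later in this article ever mentions played_day directly.

from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    "sonik",
    **{
        "uri": "sqlite:///./catalog.db",
        "warehouse": "file://./warehouse",
    },
)

table = catalog.load_table("music.plays")

# Shows played_day = day(played_at). Readers filter on played_at;
# Iceberg maps that filter onto these partition values for them.
print(table.spec())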
Step 4: Generate and Insert Sample Data
Time to put data into the table. Save this as ingest.py:
import random
import uuid
from datetime import datetime, timedelta
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
# Some sample data to make it feel real
ARTISTS = [
("The Weeknd", ["Blinding Lights", "Save Your Tears", "Starboy"]),
("Taylor Swift", ["Anti-Hero", "Cruel Summer", "Lavender Haze"]),
("Drake", ["God's Plan", "Hotline Bling", "One Dance"]),
("Bad Bunny", ["Tití Me Preguntó", "Me Porto Bonito"]),
("Billie Eilish", ["Bad Guy", "Lovely", "Happier Than Ever"]),
]
COUNTRIES = ["US", "UK", "DE", "BR", "JP", "IR", "CA", "AU"]
DEVICES = ["mobile", "desktop", "tablet", "smart_speaker", "tv"]
def generate_plays(count: int, day_offset: int = 0):
    """Generate a list of fake play events."""
    base_date = datetime.now() - timedelta(days=day_offset)
    rows = []
    for _ in range(count):
        artist, tracks = random.choice(ARTISTS)
        rows.append({
            "play_id": str(uuid.uuid4()),
            "user_id": random.randint(1, 100_000),
            "track_id": random.randint(1, 5000),
            "artist": artist,
            "title": random.choice(tracks),
            "played_at": base_date - timedelta(
                seconds=random.randint(0, 86400)
            ),
            "duration_seconds": random.randint(30, 240),
            "country": random.choice(COUNTRIES),
            "device": random.choice(DEVICES),
        })
    return rows
# Load the table
table = catalog.load_table("music.plays")
# Generate data for the last 7 days, 50,000 events per day
for day in range(7):
    print(f"Generating data for day -{day}")
    rows = generate_plays(count=50_000, day_offset=day)
    arrow_table = pa.Table.from_pylist(rows, schema=table.schema().as_arrow())
    table.append(arrow_table)
print(f"Total rows: {table.scan().to_arrow().num_rows}")
Run it:
python ingest.py
This generates 350,000 fake play events across 7 days and writes them to the table. Each call to table.append() is an atomic transaction: either all rows from that batch land in the table, or none do. There is no half-written state.
Now look at your warehouse directory. It should look something like this:
warehouse/music.db/plays/
├── data/
│ ├── played_day=2026-04-21/
│ │ └── 00000-0-xxxxx.parquet
│ ├── played_day=2026-04-22/
│ │ └── 00000-0-yyyyy.parquet
│ └── ...
└── metadata/
├── 00000-xxxxx.metadata.json
├── 00001-yyyyy.metadata.json
├── snap-1234567-1-xxxxx.avro
└── ...
The data/ folder holds the actual rows in Parquet format, organized by day. The metadata/ folder is where Iceberg works its magic. It contains:
Metadata JSON files: describe the table's current state, including its schema, partition spec, and pointer to the current snapshot.
Manifest list files (Avro): one per snapshot, listing the manifest files that make up that snapshot.
Manifest files (Avro): list the actual data files, along with per-file statistics such as record counts and column value ranges.
This metadata layer is what gives Iceberg its powers. By manipulating these files, Iceberg can give you ACID transactions, time travel, and schema evolution without ever touching the data files themselves.
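You can see this layering from Python too. Here is a small sketch, again reusing the catalog config from earlier: it prints the metadata file the catalog currently points to and the manifest list of the latest snapshot.

from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    "sonik",
    **{
        "uri": "sqlite:///./catalog.db",
        "warehouse": "file://./warehouse",
    },
)

table = catalog.load_table("music.plays")

# The metadata JSON file the catalog currently points to
print(table.metadata_location)

# The latest snapshot and its manifest list (an Avro file in metadata/)
snapshot = table.current_snapshot()
print(snapshot.snapshot_id, snapshot.manifest_list)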
Step 5: Query the Data
You have two ways to query an Iceberg table from Python: directly with PyIceberg, or through DuckDB.
Method 1: PyIceberg Native
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
table = catalog.load_table("music.plays")
# Top artists by play count
df = table.scan(
selected_fields=["artist"],
).to_pandas()
print(df["artist"].value_counts().head(10))
Useful for exploration, but not great for complex aggregations. The PyIceberg API is mostly about reading and filtering data, not running SQL.
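Filtering, on the other hand, is where the native API does well. Here is a sketch of a filtered scan, reusing the table object from above; the expression is pushed down, so Iceberg skips data files whose statistics rule them out.

from pyiceberg.expressions import EqualTo

# Only plays from Germany, with just the columns we care about
de_plays = table.scan(
    row_filter=EqualTo("country", "DE"),
    selected_fields=("artist", "title", "played_at"),
).to_pandas()

print(de_plays.head(10))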
Method 2: DuckDB SQL
This is where things get fun. DuckDB has native Iceberg support, so we can write real SQL against our table:
import duckdb
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
table = catalog.load_table("music.plays")
# Get the path to the current metadata file
metadata_path = table.metadata_location
con = duckdb.connect()
con.execute("INSTALL iceberg; LOAD iceberg;")
# Top 10 artists by play count over the last 3 days
result = con.execute(f"""
SELECT artist, COUNT(*) AS plays
FROM iceberg_scan('{metadata_path}')
WHERE played_at > current_timestamp - INTERVAL 3 DAY
GROUP BY artist
ORDER BY plays DESC
LIMIT 10
""").fetchdf()
print(result)
This is real SQL on real data, with no warehouse provisioning. DuckDB reads the Iceberg metadata, figures out which Parquet files it needs to scan based on the WHERE clause, and processes them in parallel.
You can write any SQL you want. Joins, window functions, aggregations. DuckDB handles it.
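As one example, here is a window-function query that finds the most-played track per country, reusing the con and metadata_path from the block above. The query itself is just a sketch; any SQL DuckDB accepts will work.

result = con.execute(f"""
    SELECT country, title, plays
    FROM (
        SELECT
            country,
            title,
            COUNT(*) AS plays,
            ROW_NUMBER() OVER (
                PARTITION BY country ORDER BY COUNT(*) DESC
            ) AS rn
        FROM iceberg_scan('{metadata_path}')
        GROUP BY country, title
    )
    WHERE rn = 1
    ORDER BY country
""").fetchdf()

print(result)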
Step 6: Schema Evolution
Now imagine the product team wants to know what subscription tier each listener is on. Free? Premium? Family? They ask you to add a subscription_tier column to the plays table.
In a traditional system, this is a nightmare. You have to alter the table, possibly rewrite all the data, possibly take downtime. In Iceberg, it is one line:
from pyiceberg.types import StringType
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
table = catalog.load_table("music.plays")
# Add the new column
with table.update_schema() as update:
    update.add_column(
        path="subscription_tier",
        field_type=StringType(),
        doc="Free, Premium, or Family",
    )
print("Schema updated:")
print(table.schema())
That is it. Existing rows get NULL for the new column. New writes can include it. No data was rewritten. The change happened entirely in the metadata layer.
You can also rename columns, drop columns, change types (in compatible directions), reorder columns, and even update partition specs. All without rewriting data files.
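A sketch of what a few of those look like (illustrative only; run it against a throwaway table unless you really want these changes on music.plays), and each one is a metadata-only commit:

from pyiceberg.types import LongType

with table.update_schema() as update:
    # Rename: the field ID underneath is unchanged, so old data files still line up
    update.rename_column("subscription_tier", "plan")
    # Widen a type in a compatible direction (int -> long)
    update.update_column("duration_seconds", field_type=LongType())
    # Drop a column; existing Parquet files are left untouched
    update.delete_column("device")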
Step 7: Time Travel
Every append() we did earlier created a new snapshot. Iceberg keeps the history of every table change. You can query the table as it looked at any past point in time.
First, let us see the snapshot history:
table = catalog.load_table("music.plays")
for snapshot in table.snapshots():
    print(f"Snapshot {snapshot.snapshot_id}")
    print(f"  Timestamp: {snapshot.timestamp_ms}")
    print(f"  Operation: {snapshot.summary.get('operation')}")
    print(f"  Records: {snapshot.summary.get('total-records')}")
    print()
Each append shows up as a snapshot, with how many records existed at that point. Now let us read the table as it was after the second snapshot, before we added the rest of the days:
snapshots = list(table.snapshots())
second_snapshot_id = snapshots[1].snapshot_id
# Read the table at that point in time
old_data = table.scan(snapshot_id=second_snapshot_id).to_pandas()
print(f"Rows at snapshot {second_snapshot_id}: {len(old_data)}")
This is unbelievably useful in practice. Common scenarios:
Bad data ingest: someone pushed corrupt rows. Roll back to yesterday's snapshot until the upstream is fixed.
Auditing: "What did the user table look like on the 1st of the month?" Just query that snapshot.
Reproducible ML training: always train on a specific snapshot ID so results are reproducible.
Debugging: "Did this row exist last week?" Just check.
Step 8: Maintenance (The Boring But Important Part)
Every time we appended a batch, Iceberg created new files. After many appends, you end up with hundreds of small files per partition. This hurts query performance because the engine has to open every file.
The fix is compaction: merging small files into bigger ones. In the Spark world this is the rewrite_data_files procedure; depending on your PyIceberg version, a similar operation is exposed directly on the table:
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
"sonik",
**{
"uri": "sqlite:///./catalog.db",
"warehouse": "file://./warehouse",
},
)
table = catalog.load_table("music.plays")
# Compact small files in all partitions
table.optimize().rewrite_data_files()
print("Compaction complete")
This rewrites your small Parquet files into a smaller number of larger ones. The data is identical, but queries are now much faster. Run this on a schedule (e.g., daily after the day's batch is done).
Snapshot Expiration
Snapshots are great for time travel, but they accumulate. Each one keeps references to data files that the current state no longer needs, and over time that wastes storage. You should expire old snapshots periodically (as with compaction, the table-level API assumes a recent PyIceberg release; older setups run expiration from an engine like Spark):
from datetime import datetime, timedelta
# Keep snapshots from the last 30 days, drop older ones
cutoff_ms = int((datetime.now() - timedelta(days=30)).timestamp() * 1000)
table.expire_snapshots().expire_older_than(cutoff_ms).commit()
print("Old snapshots expired")
Adjust the retention based on how far back you actually need to time-travel. For most teams, 30 to 90 days is reasonable.
What Is Actually Happening Behind the Scenes
Let me explain what Iceberg is doing every time you read or write the table. This is the part that helps you reason about edge cases later.
On a read, the engine asks the catalog for the table's current metadata file. That JSON file points to the current snapshot, the snapshot points to a manifest list, the manifest list names the manifest files, and each manifest lists data files along with per-file statistics: record counts, value ranges per column, null counts. The engine compares your filter against those statistics and only opens the Parquet files that could possibly contain matching rows.
On a write, the engine writes new data files, new manifests describing them, a new manifest list, and a new metadata file, then atomically swaps the catalog's pointer from the old metadata file to the new one. If anything fails before that final swap, readers never see a half-finished state. That one pointer swap is what makes every operation transactional.
This multi-level metadata is why Iceberg can scale to billions of rows without choking. Most queries only ever touch a tiny fraction of the actual data files because the stats let it skip the rest.
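If you want to poke at those stats yourself, recent PyIceberg releases expose an inspect API on the table object; exact methods can vary by version, so treat this as a sketch reusing the catalog from earlier.

table = catalog.load_table("music.plays")

# One row per data file, including record counts and sizes --
# the same statistics the planner uses to skip files entirely.
print(table.inspect.files().to_pandas().head())

# One row per snapshot in the table's history
print(table.inspect.snapshots().to_pandas())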
Going to Production
Everything above runs on your laptop. To take the same code to production, you swap out three components and the rest stays the same.
The warehouse URI: file://./warehouse becomes s3://your-bucket/warehouse.
The catalog: GlueCatalog or RestCatalog instead of SqlCatalog.
The heavy lifting: a distributed engine such as Spark takes over from PyIceberg once ingestion outgrows a single machine.
The same table can be written by Spark, queried by Trino, and used by a DuckDB-powered notebook. This is the real promise of the lakehouse: one copy of your data, every engine, every workload.
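As a rough sketch, pointing the same code at a hosted REST catalog and an S3 warehouse might look like this; the endpoint, bucket, and region are placeholders, and the exact properties (credentials in particular) depend on your catalog and storage setup.

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "sonik",
    **{
        "type": "rest",
        "uri": "https://your-catalog.example.com",   # hosted REST catalog endpoint
        "warehouse": "s3://your-bucket/warehouse",   # data and metadata live in S3
        "s3.region": "us-east-1",
    },
)

# Everything downstream stays the same
table = catalog.load_table("music.plays")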
What You Just Built (And What It Means)
Walking through this on your laptop, you went from zero to a working lakehouse with:
A catalog that maps table names to physical storage.
A schema-enforced table with partitions for fast filtering.
Atomic writes (append). No partial data.
SQL queries through DuckDB.
Schema evolution without rewriting data.
Time travel through snapshot history.
Maintenance jobs (compaction, snapshot expiration).
This is genuinely production-grade architecture, just running on a laptop. The exact same patterns scale to petabytes when you swap the local filesystem for S3 and PyIceberg for Spark.
The biggest lesson is this: the lakehouse pattern is not magic, and it is not vendor-locked. Iceberg is an open spec. The metadata files in your warehouse directory are just JSON and Avro. Anyone can build a tool that reads them. That openness is why this architecture is winning.
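To make that concrete, here is a small sketch that reads the newest metadata file with nothing but the standard library (the path matches the local warehouse layout shown earlier):

import json
from pathlib import Path

# Metadata files are numbered, so the last one in sorted order is the newest
metadata_dir = Path("warehouse/music.db/plays/metadata")
latest = sorted(metadata_dir.glob("*.metadata.json"))[-1]

doc = json.loads(latest.read_text())
print(latest.name)
print("table uuid:", doc["table-uuid"])
print("current snapshot:", doc["current-snapshot-id"])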
Open this folder in 10 years and the data will still be readable. That is something you cannot say about most database systems.