ETL (Extract, Transform, Load) pipelines move data from sources into a structured store for analysis. This project builds a reusable pipeline that ingests sales data and produces daily summaries.

What You’ll Build

  extract/     Read CSV files and fetch API data
transform/   Clean, validate, aggregate with Pandas
load/        Write to SQLite database
cli.py       Run the pipeline on demand for a chosen input file and database
  

Output tables:

  • raw_sales — cleaned individual records
  • daily_summary — revenue and order count per day per region

Prerequisites

Setup

  mkdir etl-pipeline && cd etl-pipeline
python -m venv .venv && source .venv/bin/activate
# pytest is needed for the test suite in tests/
pip install pandas sqlalchemy requests click python-dotenv pytest
  

Project Structure

  etl-pipeline/
├── data/
│   └── sample_sales.csv      # input data
├── etl/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   ├── load.py
│   └── pipeline.py
├── cli.py
├── tests/
│   └── test_transform.py
└── .env
  

Sample Input Data

  # data/sample_sales.csv
order_id,date,region,product,quantity,unit_price
1001,2026-06-01,North,Widget,2,19.99
1002,2026-06-01,South,Gadget,1,49.99
1003,2026-06-02,North,Widget,5,19.99
1004,2026-06-02,East,Gadget,3,49.99
1005,2026-06-03,North,Widget,1,19.99
1006,2026-06-03,South,Widget,10,19.99
  

Extract

  # etl/extract.py
import pandas as pd
import requests
from pathlib import Path

def extract_csv(path: str | Path) -> pd.DataFrame:
    df = pd.read_csv(path, parse_dates=["date"])
    return df

def extract_api(url: str, params: dict | None = None) -> pd.DataFrame:
    """GET ``url`` and build a DataFrame from the decoded JSON body.

    ``params`` is forwarded as the query string. The request uses a
    30-second timeout, and ``raise_for_status()`` is called before the
    body is decoded, so an error response raises instead of being parsed.
    """
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    return pd.DataFrame(payload)
  

For local development, use the CSV. In production, replace or supplement with API extraction.

Transform

  # etl/transform.py
import pandas as pd

REQUIRED_COLUMNS = {"order_id", "date", "region", "product", "quantity", "unit_price"}

def validate(df: pd.DataFrame) -> pd.DataFrame:
    """Check the schema and return a cleaned copy of ``df``.

    Rows are dropped when ``order_id``, ``date``, ``quantity`` or
    ``unit_price`` is null, when ``quantity`` is not positive, or when
    ``unit_price`` is negative. On the surviving rows ``order_id`` is cast
    to ``int``, ``region`` is stripped and title-cased, and ``product`` is
    stripped.

    The input frame is never modified.

    Raises:
        ValueError: if any column in ``REQUIRED_COLUMNS`` is missing.
    """
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    complete = df.dropna(subset=["order_id", "date", "quantity", "unit_price"])
    plausible = (complete["quantity"] > 0) & (complete["unit_price"] >= 0)

    # Boolean indexing returns a frame derived from ``complete``; take an
    # explicit copy so the column assignments below write to an independent
    # object instead of triggering pandas' chained-assignment warning.
    cleaned = complete[plausible].copy()
    cleaned["order_id"] = cleaned["order_id"].astype(int)
    cleaned["region"] = cleaned["region"].str.strip().str.title()
    cleaned["product"] = cleaned["product"].str.strip()
    return cleaned

def enrich(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with derived columns added.

    Adds ``revenue`` (``quantity * unit_price``) and replaces ``date`` with
    plain ``datetime.date`` values. The input frame is left untouched.
    """
    enriched = df.copy()
    enriched["revenue"] = enriched["quantity"] * enriched["unit_price"]
    timestamps = pd.to_datetime(enriched["date"])
    enriched["date"] = timestamps.dt.date
    return enriched

def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise orders per ``date`` and ``region``.

    The result has one row per (date, region) pair with ``order_count``
    (number of ``order_id`` values), ``total_quantity`` and
    ``total_revenue``, ordered by date and then region.
    """
    group_keys = ["date", "region"]
    metrics = {
        "order_count": ("order_id", "count"),
        "total_quantity": ("quantity", "sum"),
        "total_revenue": ("revenue", "sum"),
    }
    per_day = df.groupby(group_keys, as_index=False).agg(**metrics)
    return per_day.sort_values(group_keys)

def transform(raw: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Run the full transform stage on ``raw``.

    Validates, then enriches, then aggregates. Returns a
    ``(cleaned, summary)`` pair: the enriched detail rows and their daily
    aggregate.
    """
    validated = validate(raw)
    cleaned = enrich(validated)
    return cleaned, aggregate_daily(cleaned)
  

Load

  # etl/load.py
import pandas as pd
from sqlalchemy import create_engine, text

def get_engine(db_url: str = "sqlite:///data/warehouse.db"):
    """Build a SQLAlchemy engine for ``db_url``.

    The default URL points at a SQLite file under ``data/``.
    """
    engine = create_engine(db_url)
    return engine

def load(df: pd.DataFrame, table: str, engine, if_exists: str = "replace"):
    """Write ``df`` to the SQL table ``table`` without its index.

    ``if_exists`` is handed straight to ``DataFrame.to_sql``; the default
    ``"replace"`` overwrites any existing table of that name.
    """
    df.to_sql(name=table, con=engine, if_exists=if_exists, index=False)

def load_incremental(df: pd.DataFrame, table: str, engine, key_column: str = "order_id"):
    """Append only the rows of ``df`` whose key is not already in ``table``.

    If ``table`` does not exist yet (first run), it is created from ``df``.
    If it exists but holds no rows, it is replaced by ``df``. Otherwise only
    rows whose ``key_column`` value is absent from the table are appended.

    ``table`` and ``key_column`` are interpolated into the SQL text as
    identifiers, so they must come from trusted code, not user input.

    Returns:
        ``len(df)``: the number of rows received, including any that were
        skipped because their key was already present.
    """
    # Local import keeps this function self-contained.
    from sqlalchemy import inspect

    # Without this guard the SELECT below fails on a fresh database, because
    # the table has not been created yet.
    if not inspect(engine).has_table(table):
        df.to_sql(table, engine, if_exists="append", index=False)
        return len(df)

    with engine.connect() as conn:
        existing = pd.read_sql(f"SELECT {key_column} FROM {table}", conn)

    if existing.empty:
        df.to_sql(table, engine, if_exists="replace", index=False)
    else:
        new_rows = df[~df[key_column].isin(existing[key_column])]
        if not new_rows.empty:
            new_rows.to_sql(table, engine, if_exists="append", index=False)
    return len(df)
  

Pipeline Orchestration

  # etl/pipeline.py
import logging
from pathlib import Path
from etl.extract import extract_csv
from etl.transform import transform
from etl.load import get_engine, load

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

def _ensure_sqlite_dir(url) -> None:
    """Create the parent directory of a file-backed SQLite database."""
    if url.get_backend_name() != "sqlite":
        return
    db_file = url.database
    # In-memory databases and ``file:`` URIs have no plain path to prepare.
    if not db_file or db_file == ":memory:" or db_file.startswith("file:"):
        return
    Path(db_file).parent.mkdir(parents=True, exist_ok=True)

def run_pipeline(csv_path: str, db_url: str = "sqlite:///data/warehouse.db"):
    """Run extract, transform and load for one CSV file.

    Reads ``csv_path``, cleans and aggregates it, then writes the detail
    rows to ``raw_sales`` and the daily aggregate to ``daily_summary`` in
    the database at ``db_url``. Both tables are written with the loader's
    default mode.

    Returns:
        A dict with ``raw_rows`` (cleaned detail rows) and ``summary_rows``
        (aggregate rows).
    """
    logger.info("Starting ETL pipeline")

    # Extract
    logger.info("Extracting from %s", csv_path)
    raw = extract_csv(csv_path)
    logger.info("Extracted %d rows", len(raw))

    # Transform
    logger.info("Transforming data")
    sales, summary = transform(raw)
    logger.info("Cleaned %d rows, %d summary rows", len(sales), len(summary))

    # Load
    engine = get_engine(db_url)
    # Prepare the directory the SQLite file actually lives in, and nothing
    # at all for server databases such as PostgreSQL.
    _ensure_sqlite_dir(engine.url)
    load(sales, "raw_sales", engine)
    load(summary, "daily_summary", engine)
    # Never write credentials embedded in the URL to the log.
    logger.info("Loaded to %s", engine.url.render_as_string(hide_password=True))

    return {"raw_rows": len(sales), "summary_rows": len(summary)}
  

CLI Entry Point

  # cli.py
import click
from etl.pipeline import run_pipeline

# The third name given to --input is the Python parameter name, so the
# function argument no longer shadows the builtin ``input``. The command-line
# flags (--input / -i) are unchanged.
@click.command()
@click.option("--input", "-i", "input_path", default="data/sample_sales.csv", help="CSV input path")
@click.option("--db", default="sqlite:///data/warehouse.db", help="Database URL")
def main(input_path: str, db: str):
    """Run the sales ETL pipeline."""
    # The docstring above doubles as the --help text, so it is kept short.
    result = run_pipeline(input_path, db)
    click.echo(f"Done: {result['raw_rows']} sales rows, {result['summary_rows']} summary rows")

if __name__ == "__main__":
    main()
  

Run:

  python cli.py
python cli.py --input data/june_sales.csv --db sqlite:///data/warehouse.db
  

Query Results

  import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///data/warehouse.db")

# A small sample of the cleaned, per-order detail table.
detail_query = "SELECT * FROM raw_sales LIMIT 5"
sales = pd.read_sql(detail_query, engine)
print(sales)

# Daily per-region totals, highest revenue first.
summary_query = """
    SELECT date, region, order_count, total_revenue
    FROM daily_summary
    ORDER BY total_revenue DESC
"""
summary = pd.read_sql(summary_query, engine)
print(summary)
  

Expected summary output:

           date region  order_count  total_revenue
0  2026-06-03  South            1         199.90
1  2026-06-02   East            1         149.97
...
  

Tests

  # tests/test_transform.py
import pandas as pd
import pytest
from etl.transform import validate, enrich, aggregate_daily, transform

@pytest.fixture
def sample_df():
    """Three raw orders; the second has quantity 0 and untrimmed region text."""
    columns = ["order_id", "date", "region", "product", "quantity", "unit_price"]
    rows = [
        (1, "2026-06-01", "north", "Widget", 2, 10.0),
        (2, "2026-06-01", " south ", "Gadget", 0, 20.0),  # row 2 should be filtered
        (3, "2026-06-02", "East", "Widget", 5, 10.0),
    ]
    return pd.DataFrame(rows, columns=columns)

def test_validate_filters_bad_rows(sample_df):
    """validate() keeps only the two orders with a positive quantity."""
    kept = validate(sample_df)
    # The quantity=0 order is the only one removed.
    assert len(kept) == 2

def test_enrich_adds_revenue(sample_df):
    """enrich() adds a revenue column equal to quantity * unit_price."""
    enriched = enrich(validate(sample_df))
    assert "revenue" in enriched.columns
    # First surviving order: 2 units at 10.0 each.
    first_order = enriched.iloc[0]
    assert first_order["revenue"] == 20.0

def test_aggregate_daily(sample_df):
    """aggregate_daily() counts every cleaned order exactly once."""
    detail = enrich(validate(sample_df))
    daily = aggregate_daily(detail)
    assert "total_revenue" in daily.columns
    # Two orders survive validation, so the per-group counts sum to 2.
    assert daily["order_count"].sum() == 2
  

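Run from the project root: python -m pytest tests/ -v (invoking pytest through python -m puts the current directory on the import path, so the etl package can be imported)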

Pipeline Architecture

  CSV / API
    │
    ▼
 Extract ──► raw DataFrame
    │
    ▼
 Transform ──► validate → enrich → aggregate
    │
    ├──► raw_sales (detail table)
    └──► daily_summary (aggregated table)
    │
    ▼
  SQLite / PostgreSQL
  

Scheduling in Production

Run daily via cron:

  # crontab -e
0 2 * * * cd /app/etl-pipeline && .venv/bin/python cli.py >> /var/log/etl.log 2>&1
  

Or trigger from CI/CD / Airflow / cloud scheduler after new files land in S3.

Switch to PostgreSQL for production:

  python cli.py --db postgresql://user:pass@localhost/warehouse
  

Concepts Applied

Bonus Challenges

  1. Idempotent runs — use load_incremental to skip duplicate order_ids
  2. Data quality report — log counts of dropped rows and reasons
  3. Parquet output — write intermediate files to data/staging/ for audit
  4. API extract — pull live data from a public REST API and merge with CSV
  5. Great Expectations — add formal data validation rules
  6. Docker + cron — containerize the pipeline and schedule it to run daily with cron
  7. dbt integration — move SQL transformations to dbt models

ETL pipelines are the backbone of data engineering — this project gives you the core Extract → Transform → Load pattern used everywhere from startups to enterprise data warehouses.