4.01 · Deep Dive: ETL Patterns & Idempotency

Level: Advanced
Time to read: 20 min
Pre-reading: 04 · ETL & ELT
After reading: You'll understand idempotency, exactly-once semantics, change data capture patterns, and late-arriving data handling.


The Core Challenge: Building Reliable Data Pipelines

Data pipelines run continuously and can fail at any point. The challenge:

Problem: Pipeline fails halfway through
├─ Extract completed
├─ Transform completed
└─ Load fails (network timeout)

Question: When the retry happens, what should we do?
├─ Option A: Re-extract and re-transform everything (expensive, slow)
├─ Option B: Re-run only the load (risks duplicating data)
└─ Option C: Detect what already happened and continue (idempotent)

Solution: Design idempotent pipelines.
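Option C is what the rest of this section builds toward. As a minimal sketch (the warehouse client, the step functions, and the step column on the tracking table are illustrative placeholders, not a specific library), a run can record each completed step in the _pipeline_tracking table and skip it on retry:

def run_pipeline(run_id):
    # Steps already completed by a previous attempt of this run
    completed = warehouse.query(
        "SELECT step FROM _pipeline_tracking "
        "WHERE run_id = %s AND status = 'COMPLETED'",
        (run_id,)
    )
    done = {row['step'] for row in completed}

    for step_name, step_fn in [('extract', extract),
                               ('transform', transform),
                               ('load', load)]:
        if step_name in done:
            continue  # Skip work that already succeeded
        step_fn(run_id)
        warehouse.execute(
            "INSERT INTO _pipeline_tracking (run_id, step, status, timestamp) "
            "VALUES (%s, %s, 'COMPLETED', CURRENT_TIMESTAMP())",
            (run_id, step_name)
        )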


Pattern 1: Idempotent Extraction

Definition: Running extraction multiple times produces the same result.

Anti-Pattern: Non-Idempotent Extraction

# ❌ BAD: Each run appends to the table
def extract_and_load():
    customers = query_source_system()  # Returns the same 1000 rows every run
    warehouse.append(customers)        # Appends 1000 rows

# Run 1: 1000 rows loaded
# Run 2 (retry): another 1000 rows appended → 2000 rows total (WRONG!)

Pattern: Delete-Then-Insert

# ✅ GOOD: Idempotent approach
def extract_and_load(run_date):
    # Step 1: Delete any data previously loaded for this run date
    warehouse.delete(
        table='bronze_customers',
        where=f"_elt_date = '{run_date}'"
    )

    # Step 2: Extract fresh data and stamp it with the run date
    customers = query_source_system()
    for row in customers:
        row['_elt_date'] = run_date

    # Step 3: Insert
    warehouse.append('bronze_customers', customers)

    # Result: always the same 1000 rows for this date, even if retried

Pattern: Staging Table

-- Load into a staging table first
CREATE TEMPORARY TABLE staging_customers AS
SELECT * FROM source_system.customers;

-- Swap into the main table atomically
BEGIN TRANSACTION;
  -- Delete today's load from the main table (safe to retry)
  DELETE FROM bronze_customers
  WHERE _elt_date = CURRENT_DATE();

  INSERT INTO bronze_customers
  SELECT * FROM staging_customers;

  -- Mark as complete only after a successful insert
  INSERT INTO _pipeline_tracking (run_id, status, timestamp)
  VALUES (current_run_id, 'COMPLETED', CURRENT_TIMESTAMP());
COMMIT;

Pattern 2: Idempotent Transformation

Definition: Running transformation multiple times on the same input produces the same output.

Anti-Pattern: Running Totals

-- ❌ BAD: Not idempotent (depends on previous state)
UPDATE sales_summary
SET total_revenue = total_revenue + today_revenue
WHERE date = CURRENT_DATE();

-- Run 1: total_revenue = 0 + 1000 = 1000 ✓
-- Run 2 (retry): total_revenue = 1000 + 1000 = 2000 ✗ (WRONG!)

Pattern: Recompute from Scratch

-- ✅ GOOD: Idempotent (always produces same result)
DELETE FROM gold_sales_summary
WHERE summary_date = CURRENT_DATE();

INSERT INTO gold_sales_summary (summary_date, total_revenue, ...)
SELECT
  DATE(sale_date) as summary_date,
  SUM(amount) as total_revenue,
  COUNT(*) as num_orders,
  ...
FROM silver_sales
WHERE DATE(sale_date) = CURRENT_DATE()
GROUP BY DATE(sale_date);

-- Run 1: Computes total_revenue = 1000 ✓
-- Run 2 (retry): Deletes and recomputes = 1000 ✓ (IDEMPOTENT)

Pattern: Deduplication by Run ID

-- Track which run inserted each row
CREATE TABLE gold_orders_final (
  order_id INT,
  amount DECIMAL,
  _run_id STRING,
  _inserted_at TIMESTAMP
);

-- Insert with run tracking
INSERT INTO gold_orders_final
SELECT
  o.order_id,
  o.amount,
  '2024-01-15_run_001' as _run_id,
  CURRENT_TIMESTAMP() as _inserted_at
FROM silver_orders o;

-- Before retry, delete previous run's output
DELETE FROM gold_orders_final
WHERE _run_id = '2024-01-15_run_001';

-- Then retry insert (now idempotent!)
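In an orchestrator, the key is that the run ID is derived from the logical run (e.g., the run date), not from a random value, so a retry reuses the same ID and the delete removes exactly the rows the failed attempt left behind. A minimal sketch, assuming a DB-API-style warehouse client (warehouse.execute() is a placeholder):

def load_orders(run_date):
    # Stable per logical run: every retry of the same run reuses this ID
    run_id = f"{run_date}_run_001"

    # Step 1: remove anything a previous (failed) attempt wrote for this run
    warehouse.execute(
        "DELETE FROM gold_orders_final WHERE _run_id = %s",
        (run_id,)
    )

    # Step 2: insert this run's output, tagged with the same run ID
    warehouse.execute(
        "INSERT INTO gold_orders_final (order_id, amount, _run_id, _inserted_at) "
        "SELECT order_id, amount, %s, CURRENT_TIMESTAMP() FROM silver_orders",
        (run_id,)
    )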

Pattern 3: Change Data Capture (CDC)

Problem: Extracting full tables is inefficient. How to detect what changed?

Approach 1: Timestamp-Based CDC

-- Source system tracks last update time
CREATE TABLE source.customers (
  customer_id INT,
  email STRING,
  updated_at TIMESTAMP  -- Key column
);

-- Extraction query: pull only rows changed since the last watermark
WITH previous_run AS (
  SELECT MAX(last_updated_at) as last_run
  FROM bronze.customers_watermark
),
new_changes AS (
  SELECT
    s.*,
    CURRENT_TIMESTAMP() as _elt_timestamp
  FROM source.customers s
  CROSS JOIN previous_run pr
  WHERE s.updated_at > pr.last_run  -- Only new/changed rows
)
INSERT INTO bronze.customers
SELECT * FROM new_changes;

-- Advance the watermark to the highest source timestamp loaded
DELETE FROM bronze.customers_watermark;
INSERT INTO bronze.customers_watermark (last_updated_at)
SELECT MAX(updated_at) FROM bronze.customers;

Approach 2: Log-Based CDC (Database Replication Logs)

Source System          →  Replication Log  →  CDC Tool    →  Warehouse
┌──────────────────┐      ┌──────────────┐   ┌─────────┐   ┌──────────┐
│ Before: cust_1   │  →   │ DELETE cust_1│→  │ Debezium│→  │ Capture  │
│ After: cust_2    │      │ INSERT cust_2│   │ (CDC)   │   │ Changes  │
└──────────────────┘      └──────────────┘   └─────────┘   └──────────┘

Tools: Debezium, AWS DMS, Oracle GoldenGate (reading logs such as the MySQL binlog or PostgreSQL WAL)
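What the warehouse side receives from a log-based tool is a stream of change events. The sketch below applies one Debezium-style event (op/before/after fields, already unwrapped from the connector envelope); the upsert and delete helpers are illustrative placeholders, not part of any CDC tool's API:

import json

def apply_change(event_json):
    event = json.loads(event_json)
    op = event['op']              # 'c' = insert, 'u' = update, 'd' = delete, 'r' = snapshot read
    before = event.get('before')  # row image before the change (None for inserts)
    after = event.get('after')    # row image after the change (None for deletes)

    if op in ('c', 'r', 'u') and after is not None:
        # Upsert the latest row image keyed by the primary key
        upsert('bronze_customers', key='customer_id', row=after)
    elif op == 'd' and before is not None:
        # Propagate the delete (or soft-delete, depending on the target design)
        delete('bronze_customers', key='customer_id', value=before['customer_id'])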

Approach 3: Query-Based Comparison

-- Compare source to warehouse (slow but works)
WITH source_data AS (
  SELECT customer_id, email, phone, CURRENT_TIMESTAMP() as extracted_at
  FROM source.customers
),
warehouse_data AS (
  SELECT customer_id, email, phone
  FROM bronze.customers
  WHERE _elt_timestamp = (SELECT MAX(_elt_timestamp) FROM bronze.customers)
),
changes AS (
  SELECT
    COALESCE(s.customer_id, w.customer_id) as customer_id,
    s.customer_id IS NOT NULL as is_in_source,
    w.customer_id IS NOT NULL as is_in_warehouse,
    s.email, w.email as old_email,
    s.phone, w.phone as old_phone
  FROM source_data s
  FULL OUTER JOIN warehouse_data w
    ON s.customer_id = w.customer_id
  WHERE
    s.customer_id IS NULL OR               -- Deleted in source
    w.customer_id IS NULL OR               -- New in source
    s.email IS DISTINCT FROM w.email OR    -- Changed (NULL-safe comparison)
    s.phone IS DISTINCT FROM w.phone       -- Changed (NULL-safe comparison)
)
INSERT INTO bronze.customers
SELECT
  customer_id,
  email,
  phone,
  CURRENT_TIMESTAMP() as _elt_timestamp
FROM changes
WHERE is_in_source = TRUE;  -- Only insert/update rows that exist in source

Pattern 4: Late-Arriving Data

Problem: Data arrives out of order or delayed.

Timeline:
──────────────────────────────────────────────
Date:    Jan 1  Jan 2  Jan 3  Jan 4
├─ Jan 1 data arrived: Jan 1
├─ Jan 2 data arrived: Jan 2
├─ Jan 3 data arrived: Jan 5  ← Late (2 days)
└─ Jan 4 data arrived: Jan 4  ← Arrives before Jan 3's data (out of order!)

Solution: Deferred Aggregation

-- Instead of computing totals immediately, defer them
CREATE TABLE bronze.raw_transactions (
  transaction_id INT,
  transaction_date DATE,
  amount DECIMAL,
  _received_date DATE,
  _load_id STRING
);

-- Insert all data (even late)
INSERT INTO bronze.raw_transactions
SELECT
  transaction_id,
  transaction_date,
  amount,
  CURRENT_DATE() as _received_date,
  'run_2024_01_04' as _load_id
FROM source.transactions;

-- Aggregate only data older than the SLA window (e.g., more than 3 days old)
CREATE OR REPLACE TABLE gold.daily_summary AS
SELECT
  transaction_date,
  SUM(amount) as daily_total,
  COUNT(*) as num_transactions,
  MAX(_received_date) as latest_received_date,
  CURRENT_TIMESTAMP() as _computed_at
FROM bronze.raw_transactions
WHERE transaction_date < (CURRENT_DATE() - INTERVAL 3 DAY)
  AND transaction_date >= '2024-01-01'
GROUP BY transaction_date;

-- For recent data, show "preliminary" (still receiving late data)
CREATE OR REPLACE TABLE gold.daily_summary_preliminary AS
SELECT
  transaction_date,
  SUM(amount) as daily_total,
  'PRELIMINARY' as data_status
FROM bronze.raw_transactions
WHERE transaction_date >= (CURRENT_DATE() - INTERVAL 3 DAY)
GROUP BY transaction_date;
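Late rows can also trigger a targeted rebuild instead of waiting for the SLA window: find the transaction dates that received data after their summary was computed and recompute only those dates. A minimal sketch, assuming a generic warehouse.query() helper; rebuild_daily_summary() stands in for re-running the aggregation above for a single date:

def recompute_late_dates():
    # Dates whose newest received data is later than their last summary build
    late_dates = warehouse.query(
        """
        SELECT t.transaction_date
        FROM bronze.raw_transactions t
        LEFT JOIN gold.daily_summary s USING (transaction_date)
        GROUP BY t.transaction_date, s._computed_at
        HAVING s._computed_at IS NULL
            OR MAX(t._received_date) > DATE(s._computed_at)
        """
    )
    for row in late_dates:
        rebuild_daily_summary(row['transaction_date'])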

Pattern 5: Exactly-Once Semantics

Goal: Ensure each record is processed exactly once, not zero times and not twice.

Approach 1: Idempotent Keys

-- Add a natural idempotent key (business key + date + source system)
CREATE TABLE gold.fact_sales (
  idempotent_key STRING,  -- Unique per source record; enforce with a unique constraint
  order_id INT,
  amount DECIMAL,
  processed_at TIMESTAMP,
  CONSTRAINT uq_idempotent_key UNIQUE (idempotent_key)
);

-- Insert with ON CONFLICT so retries become no-ops
INSERT INTO gold.fact_sales (idempotent_key, order_id, amount, processed_at)
SELECT
  CONCAT(order_id, '_', DATE(order_date), '_', source_system) as idempotent_key,
  order_id,
  amount,
  CURRENT_TIMESTAMP()
FROM silver.orders
ON CONFLICT (idempotent_key) DO NOTHING;

-- Result: Retries don't duplicate (the key already exists, so the row is skipped)

Approach 2: Transactional Semantics

# Use transactions to ensure all-or-nothing
def load_with_exactly_once_semantics(transformed_data, max_id):
    try:
        warehouse.begin_transaction()

        # Step 1: Insert fact data
        warehouse.insert_into('gold_sales', transformed_data)

        # Step 2: Update checkpoint (both in same transaction)
        warehouse.update_checkpoint(
            source_id='ecommerce_db',
            last_processed_id=max_id,
            timestamp=now()
        )

        warehouse.commit()  # Either both succeed or both rollback
    except Exception as e:
        warehouse.rollback()
        raise

Pattern 6: Handling Duplicates & Conflicts

Merge Pattern (UPSERT)

-- Update existing, insert new (idempotent)
MERGE INTO gold.dim_customer AS target
USING (
  SELECT customer_id, email, phone, updated_at
  FROM silver.customers
  WHERE _load_date = CURRENT_DATE()
) AS source
ON target.customer_id = source.customer_id
  AND target.is_current = TRUE

WHEN MATCHED AND source.updated_at > target.updated_at THEN
  UPDATE SET
    email = source.email,
    phone = source.phone,
    updated_at = source.updated_at,
    is_current = TRUE

WHEN NOT MATCHED THEN
  INSERT (customer_id, email, phone, updated_at, is_current)
  VALUES (source.customer_id, source.email, source.phone, source.updated_at, TRUE);

ETL Anti-Patterns to Avoid

Anti-Pattern        | Problem                  | Solution
--------------------|--------------------------|-------------------------------------------
Non-idempotent ops  | Retries cause duplicates | Design delete-then-insert or deduplication
Shared state        | Order-dependent runs     | Use explicit watermarks or timestamps
No error handling   | Partial loads            | Transactions + checkpoints
No monitoring       | Failures undetected      | Add quality gates and alerting
Full reprocessing   | Slow pipelines           | Implement CDC or incremental patterns

Monitoring & Alerting

Pipeline Health Checks

-- Alert if pipeline is late
SELECT
  source_system,
  MAX(_elt_timestamp) as last_load,
  CURRENT_TIMESTAMP() as current_time,
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(_elt_timestamp), HOUR) as hours_since_load
FROM bronze.data_sources_log
GROUP BY source_system
HAVING hours_since_load > 2;  -- Alert if > 2 hours late

-- Alert on quality degradation
SELECT
  table_name,
  COUNT(*) as total_rows,
  COUNTIF(_is_valid = FALSE) as invalid_rows,
  SAFE_DIVIDE(COUNTIF(_is_valid = FALSE), COUNT(*)) as error_rate
FROM bronze.quality_tracking
WHERE _load_date = CURRENT_DATE()
GROUP BY table_name
HAVING error_rate > 0.01;  -- Alert if > 1% invalid
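Detection queries are only half of monitoring; something has to run them on a schedule and raise the alarm. A minimal sketch, assuming a generic warehouse.query() helper and a send_alert() hook (e-mail, Slack, PagerDuty, whatever the team uses), both placeholders:

def check_pipeline_freshness(max_hours_late=2):
    rows = warehouse.query(
        """
        SELECT
          source_system,
          TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(_elt_timestamp), HOUR) AS hours_since_load
        FROM bronze.data_sources_log
        GROUP BY source_system
        """
    )
    for row in rows:
        if row['hours_since_load'] > max_hours_late:
            send_alert(
                f"Pipeline late: {row['source_system']} has not loaded data "
                f"for {row['hours_since_load']} hours"
            )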

Key Takeaways

  1. Idempotency is the foundation of reliable pipelines
  2. Exactly-once semantics requires idempotent keys or transactions
  3. CDC patterns reduce extraction overhead
  4. Late-arriving data requires deferred aggregation
  5. Monitoring ensures pipelines stay healthy