4.01 · Deep Dive: ETL Patterns & Idempotency
Level: Advanced
Time to read: 20 min
Pre-reading: 04 · ETL & ELT
After reading: You'll understand idempotency, exactly-once semantics, change capture patterns, and late-arriving data handling.
The Core Challenge: Building Reliable Data Pipelines
Data pipelines run continuously and can fail at any point. The challenge:
Problem: Pipeline fails halfway through
├─ Extract completed
├─ Transform completed
└─ Load fails (network timeout)
Question: When retry happens, what do we do?
├─ Option A: Re-extract and re-transform (expensive, slow)
├─ Option B: Only load (data duplication)
└─ Option C: Detect what happened and continue (idempotent)
Solution: Design idempotent pipelines.
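What Option C looks like in practice: record each run's progress in a tracking table and consult it on retry. Below is a minimal Python sketch; the warehouse client and the extract/transform/load helpers are assumptions for illustration (the _pipeline_tracking table reappears in the staging-table example later in this section).
def run_pipeline(run_id):
    # Has a previous attempt already finished this run? (assumed helper)
    status = warehouse.query_one(
        "SELECT status FROM _pipeline_tracking WHERE run_id = %s", (run_id,)
    )
    if status == 'COMPLETED':
        return  # Retrying a finished run is a no-op

    data = extract()                        # Safe to repeat: read-only
    transformed = transform(data)           # Safe to repeat: pure function of its input
    load_idempotently(transformed, run_id)  # Delete-then-insert (Pattern 1 below)

    # Record completion only after the load succeeds
    warehouse.execute(
        "INSERT INTO _pipeline_tracking (run_id, status) VALUES (%s, 'COMPLETED')",
        (run_id,)
    )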
Pattern 1: Idempotent Extraction
Definition: Running extraction multiple times produces the same result.
Anti-Pattern: Non-Idempotent Extraction
# ❌ BAD: Each run appends to the table
def extract_and_load():
    customers = query_source_system()  # Gets the same 1000 rows
    warehouse.append(customers)        # Appends 1000 rows

# Run 1: 1000 rows loaded
# Run 1 (retry): 1000 rows appended again → 2000 rows total (WRONG!)
Pattern: Delete-Then-Insert
# ✅ GOOD: Idempotent approach
def extract_and_load(run_date):
    # Step 1: Delete old data for this date
    warehouse.delete(
        table='bronze_customers',
        where=f"_elt_date = '{run_date}'"
    )
    # Step 2: Extract fresh data
    customers = query_source_system()
    # Step 3: Insert
    warehouse.append(customers)

# Result: Always the same 1000 rows for this date, even if retried
Pattern: Staging Table
-- Load into a staging table first
CREATE TEMPORARY TABLE staging_customers AS
SELECT * FROM source_system.customers;
-- Swap staging data into the main table atomically
BEGIN TRANSACTION;
-- Remove anything a previous attempt loaded for today
DELETE FROM bronze_customers
WHERE _elt_date = CURRENT_DATE();
INSERT INTO bronze_customers
SELECT * FROM staging_customers;
-- Mark as complete only after a successful insert
INSERT INTO _pipeline_tracking (run_id, status, timestamp)
VALUES (current_run_id, 'COMPLETED', CURRENT_TIMESTAMP());
COMMIT;
Pattern 2: Idempotent Transformation
Definition: Running transformation multiple times on the same input produces the same output.
Anti-Pattern: Running Totals
-- ❌ BAD: Not idempotent (depends on previous state)
UPDATE sales_summary
SET total_revenue = total_revenue + today_revenue
WHERE date = CURRENT_DATE();
-- Run 1: total_revenue = 0 + 1000 = 1000 ✓
-- Run 2 (retry): total_revenue = 1000 + 1000 = 2000 ✗ (WRONG!)
Pattern: Recompute from Scratch
-- ✅ GOOD: Idempotent (always produces same result)
DELETE FROM gold_sales_summary
WHERE summary_date = CURRENT_DATE();
INSERT INTO gold_sales_summary (summary_date, total_revenue, ...)
SELECT
DATE(sale_date) as summary_date,
SUM(amount) as total_revenue,
COUNT(*) as num_orders,
...
FROM silver_sales
WHERE DATE(sale_date) = CURRENT_DATE()
GROUP BY DATE(sale_date);
-- Run 1: Computes total_revenue = 1000 ✓
-- Run 2 (retry): Deletes and recomputes = 1000 ✓ (IDEMPOTENT)
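The same delete-and-recompute step also covers backfills if the target date is a parameter rather than CURRENT_DATE(). A minimal sketch, assuming a warehouse.execute helper that accepts parameterized SQL (not a specific library):
def rebuild_daily_summary(summary_date):
    # Delete-then-recompute for one day; safe to retry and to backfill
    warehouse.execute(
        "DELETE FROM gold_sales_summary WHERE summary_date = %s",
        (summary_date,)
    )
    warehouse.execute(
        """
        INSERT INTO gold_sales_summary (summary_date, total_revenue, num_orders)
        SELECT DATE(sale_date), SUM(amount), COUNT(*)
        FROM silver_sales
        WHERE DATE(sale_date) = %s
        GROUP BY DATE(sale_date)
        """,
        (summary_date,)
    )

# Running rebuild_daily_summary('2024-01-15') any number of times leaves
# exactly one copy of that day's summary.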
Pattern: Deduplication by Run ID
-- Track which run inserted each row
CREATE TABLE gold_orders_final (
order_id INT,
amount DECIMAL,
_run_id STRING,
_inserted_at TIMESTAMP
);
-- Insert with run tracking
INSERT INTO gold_orders_final
SELECT
o.order_id,
o.amount,
'2024-01-15_run_001' as _run_id,
CURRENT_TIMESTAMP() as _inserted_at
FROM silver_orders o;
-- Before retry, delete previous run's output
DELETE FROM gold_orders_final
WHERE _run_id = '2024-01-15_run_001';
-- Then retry insert (now idempotent!)
Pattern 3: Change Data Capture (CDC)
Problem: Extracting full tables on every run is inefficient. How do we detect only what changed?
Approach 1: Timestamp-Based CDC
-- Source system tracks last update time
CREATE TABLE source.customers (
customer_id INT,
email STRING,
updated_at TIMESTAMP -- Key column
);
-- Extraction query
WITH previous_run AS (
SELECT MAX(_elt_timestamp) as last_run
FROM bronze.customers_watermark
),
new_changes AS (
SELECT
s.*,
CURRENT_TIMESTAMP() as _elt_timestamp
FROM source.customers s
CROSS JOIN previous_run pr
WHERE s.updated_at > COALESCE(pr.last_run, TIMESTAMP '1970-01-01') -- Only new/changed rows; COALESCE covers the first run, when no watermark exists yet
)
INSERT INTO bronze.customers
SELECT * FROM new_changes;
-- Update watermark
DELETE FROM bronze.customers_watermark;
INSERT INTO bronze.customers_watermark
SELECT MAX(_elt_timestamp) FROM bronze.customers;
Approach 2: Log-Based CDC (Database Replication Logs)
Source System → Replication Log → CDC Tool → Warehouse
┌──────────────────┐ ┌──────────────┐ ┌─────────┐ ┌──────────┐
│ Before: cust_1 │ → │ DELETE cust_1│→ │ Debezium│→ │ Capture │
│ After: cust_2 │ │ INSERT cust_2│ │ (CDC) │ │ Changes │
└──────────────────┘ └──────────────┘ └─────────┘ └──────────┘
Tools: Debezium, AWS DMS, and Oracle GoldenGate, all reading the database's replication log (e.g., the MySQL binlog or the Postgres WAL)
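Downstream, the CDC tool emits a stream of change events. A minimal sketch of the consuming side, assuming Debezium publishes JSON events to a Kafka topic read with the kafka-python client; the topic name and the apply_* warehouse helpers are hypothetical:
import json
from kafka import KafkaConsumer  # kafka-python client

# Topic name follows Debezium's server.schema.table convention (assumed here)
consumer = KafkaConsumer(
    'source_db.public.customers',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')) if v else None,
)

for message in consumer:
    if message.value is None:
        continue  # Kafka tombstone emitted after a delete
    payload = message.value['payload']
    op = payload['op']  # Debezium op codes: c=create, u=update, d=delete, r=snapshot read
    if op in ('c', 'r'):
        apply_insert(payload['after'])   # Hypothetical warehouse helpers
    elif op == 'u':
        apply_update(payload['after'])
    elif op == 'd':
        apply_delete(payload['before'])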
Approach 3: Query-Based Comparison
-- Compare source to warehouse (slow, but needs no support from the source system)
WITH source_data AS (
    SELECT customer_id, email, phone
    FROM source.customers
),
warehouse_data AS (
    SELECT customer_id, email, phone
    FROM bronze.customers
    WHERE _elt_timestamp = (SELECT MAX(_elt_timestamp) FROM bronze.customers)
),
changes AS (
    SELECT
        COALESCE(s.customer_id, w.customer_id) as customer_id,
        s.customer_id IS NOT NULL as is_in_source,
        w.customer_id IS NOT NULL as is_in_warehouse,
        s.email,
        s.phone
        -- (old values from w.* could also be kept here for auditing)
    FROM source_data s
    FULL OUTER JOIN warehouse_data w
        ON s.customer_id = w.customer_id
    WHERE
        s.customer_id IS NULL OR              -- Deleted in source
        w.customer_id IS NULL OR              -- New in source
        s.email IS DISTINCT FROM w.email OR   -- Changed (NULL-safe comparison)
        s.phone IS DISTINCT FROM w.phone      -- Changed (NULL-safe comparison)
)
INSERT INTO bronze.customers
SELECT
    customer_id,
    email,
    phone,
    CURRENT_TIMESTAMP() as _elt_timestamp
FROM changes
WHERE is_in_source = TRUE; -- Only insert rows that still exist in the source
Pattern 4: Late-Arriving Data
Problem: Data can arrive delayed or out of order.
Timeline:
──────────────────────────────────────────────
Date: Jan 1 Jan 2 Jan 3 Jan 4
├─ Jan 1 data arrived: Jan 1
├─ Jan 2 data arrived: Jan 2
├─ Jan 3 data arrived: Jan 5 ← Late (2 days)
└─ Jan 4 data arrived: Jan 3 ← Out of order!
Solution: Deferred Aggregation
-- Instead of computing totals immediately, defer them
CREATE TABLE bronze.raw_transactions (
transaction_id INT,
transaction_date DATE,
amount DECIMAL,
_received_date DATE,
_load_id STRING
);
-- Insert all data (even late)
INSERT INTO bronze.raw_transactions
SELECT
transaction_id,
transaction_date,
amount,
CURRENT_DATE() as _received_date,
'run_2024_01_04' as _load_id
FROM source.transactions;
-- Aggregate only data past the SLA window (e.g., more than 3 days old)
CREATE OR REPLACE TABLE gold.daily_summary AS
SELECT
transaction_date,
SUM(amount) as daily_total,
COUNT(*) as num_transactions,
MAX(_received_date) as latest_received_date,
CURRENT_TIMESTAMP() as _computed_at
FROM bronze.raw_transactions
WHERE transaction_date < (CURRENT_DATE() - INTERVAL 3 DAY)
AND transaction_date >= '2024-01-01'
GROUP BY transaction_date;
-- For recent data, show "preliminary" (still receiving late data)
CREATE OR REPLACE TABLE gold.daily_summary_preliminary AS
SELECT
transaction_date,
SUM(amount) as daily_total,
'PRELIMINARY' as data_status
FROM bronze.raw_transactions
WHERE transaction_date >= (CURRENT_DATE() - INTERVAL 3 DAY)
GROUP BY transaction_date;
Pattern 5: Exactly-Once Semantics
Goal: Ensure each record is processed exactly once, not zero times and not twice.
Approach 1: Idempotent Keys
-- Derive a natural idempotent key (business key + date + source system)
CREATE TABLE gold.fact_sales (
idempotent_key STRING, -- Unique per record; declare it UNIQUE so ON CONFLICT below works
order_id INT,
amount DECIMAL,
processed_at TIMESTAMP
);
-- Generate the key and insert; ON CONFLICT makes retries no-ops
INSERT INTO gold.fact_sales (idempotent_key, order_id, amount, processed_at)
SELECT
CONCAT(order_id, '_', DATE(order_date), '_', source_system) as idempotent_key,
order_id,
amount,
CURRENT_TIMESTAMP()
FROM silver.orders
ON CONFLICT (idempotent_key) DO NOTHING;
-- Result: Retries don't duplicate (the key already exists, so the row is skipped)
-- Note: warehouses without ON CONFLICT (e.g., BigQuery) use the MERGE pattern below instead
Approach 2: Transactional Semantics
# Use a transaction so the data and the checkpoint commit together (all-or-nothing)
def load_with_exactly_once_semantics():
    try:
        warehouse.begin_transaction()
        # Step 1: Insert fact data
        warehouse.insert_into('gold_sales', transformed_data)
        # Step 2: Update the checkpoint in the SAME transaction
        warehouse.update_checkpoint(
            source_id='ecommerce_db',
            last_processed_id=max_id,
            timestamp=now()
        )
        warehouse.commit()  # Either both succeed or both roll back
    except Exception:
        warehouse.rollback()
        raise
Pattern 6: Handling Duplicates & Conflicts
Merge Pattern (UPSERT)
-- Update existing, insert new (idempotent)
MERGE INTO gold.dim_customer AS target
USING (
SELECT customer_id, email, phone, updated_at
FROM silver.customers
WHERE _load_date = CURRENT_DATE()
) AS source
ON target.customer_id = source.customer_id
AND target.is_current = TRUE
WHEN MATCHED AND source.updated_at > target.updated_at THEN
UPDATE SET
email = source.email,
phone = source.phone,
updated_at = source.updated_at,
is_current = TRUE
WHEN NOT MATCHED THEN
INSERT (customer_id, email, phone, updated_at, is_current)
VALUES (source.customer_id, source.email, source.phone, source.updated_at, TRUE);
ETL Anti-Patterns to Avoid
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Non-idempotent ops | Retries cause duplicates | Design delete-then-insert or deduplication |
| Shared state | Order-dependent runs | Use explicit watermarks or timestamps |
| No error handling | Partial loads | Transactions + checkpoints |
| No monitoring | Failures undetected | Add quality gates and alerting |
| Full reprocessing | Slow pipelines | Implement CDC or incremental patterns |
Monitoring & Alerting
Pipeline Health Checks
-- Alert if pipeline is late
SELECT
source_system,
MAX(_elt_timestamp) as last_load,
CURRENT_TIMESTAMP() as checked_at, -- (current_time is a reserved word in most dialects)
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(_elt_timestamp), HOUR) as hours_since_load
FROM bronze.data_sources_log
GROUP BY source_system
HAVING hours_since_load > 2; -- Alert if > 2 hours late
-- Alert on quality degradation
SELECT
table_name,
COUNT(*) as total_rows,
COUNTIF(_is_valid = FALSE) as invalid_rows,
SAFE_DIVIDE(COUNTIF(_is_valid = FALSE), COUNT(*)) as error_rate
FROM bronze.quality_tracking
WHERE _load_date = CURRENT_DATE()
GROUP BY table_name
HAVING error_rate > 0.01; -- Alert if > 1% invalid
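These checks only help if something runs them on a schedule and raises an alarm. A minimal sketch of the runner, assuming a run_query helper that returns rows as dicts; the alert channel here is just Python's standard logging:
import logging

FRESHNESS_CHECK = """
SELECT source_system,
       TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(_elt_timestamp), HOUR) as hours_since_load
FROM bronze.data_sources_log
GROUP BY source_system
HAVING hours_since_load > 2
"""

def check_pipeline_freshness():
    # Any row returned means a source has breached the 2-hour freshness SLA
    for row in run_query(FRESHNESS_CHECK):  # run_query is an assumed helper
        logging.error(
            "Pipeline late: %s last loaded %s hours ago",
            row['source_system'], row['hours_since_load']
        )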
Key Takeaways
- Idempotency is the foundation of reliable pipelines
- Exactly-once semantics requires idempotent keys or transactions
- CDC patterns reduce extraction overhead
- Late-arriving data requires deferred aggregation
- Monitoring ensures pipelines stay healthy