9.02 · Deep Dive: Data Quality Implementation

Level: Advanced
Time to read: 16 min
Pre-reading: 09 · Enterprise Data Management
After reading: You'll understand quality frameworks, test strategies, SLAs, and how to operationalize quality monitoring.


Data Quality: The Five Dimensions

DAMA Framework (Industry Standard)

| Dimension    | Definition                   | Example Check                               |
|--------------|------------------------------|---------------------------------------------|
| Completeness | Required fields are non-null | customer_id NOT NULL in dim_customer        |
| Uniqueness   | No duplicate business keys   | customer_id has no duplicates               |
| Timeliness   | Data is current, not stale   | MAX(updated_at) within the last 24 hours    |
| Accuracy     | Values match business rules  | email contains '@' and '.'                  |
| Consistency  | Data matches across sources  | customer_id in orders exists in dim_customer |
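Three of these dimensions can be sketched as plain functions. A minimal Python illustration, using toy in-memory rows with made-up field values (not any particular warehouse API):

```python
import re

# Toy rows standing in for warehouse records; values are illustrative.
rows = [
    {"customer_id": 1, "email": "a@example.com"},
    {"customer_id": 1, "email": "bad-email"},
    {"customer_id": None, "email": "c@example.com"},
]

def completeness(rows, field):
    """Completeness: share of rows where the field is non-null."""
    return sum(r[field] is not None for r in rows) / len(rows)

def uniqueness(rows, field):
    """Uniqueness: True when no non-null business key repeats."""
    keys = [r[field] for r in rows if r[field] is not None]
    return len(keys) == len(set(keys))

def accuracy(rows, field, pattern=r".+@.+\..+"):
    """Accuracy: share of values matching a business rule (here: email shape)."""
    return sum(bool(re.fullmatch(pattern, r[field] or "")) for r in rows) / len(rows)

print(round(completeness(rows, "customer_id"), 2))  # 0.67 (one null key)
print(uniqueness(rows, "customer_id"))              # False (key 1 repeats)
print(round(accuracy(rows, "email"), 2))            # 0.67 (one malformed email)
```

In a real pipeline these become SQL aggregates over the warehouse, as in the sections below; the logic is the same.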

Three Levels of Quality

Level 1: Bronze (Raw)

-- Bronze layer quality: Unknown/As-Is
CREATE TABLE bronze.customers (
  customer_id INT,
  email STRING,
  phone STRING,
  created_at TIMESTAMP,
  _elt_timestamp TIMESTAMP,
  _elt_source STRING
);

-- Trust: ❌ None (could be duplicates, nulls, invalid emails)
-- Constraints: ❌ No constraints (data lake = anything goes)
-- SLA: None

Level 2: Silver (Validated)

-- Silver layer quality: Validated
CREATE TABLE silver.customers (
  customer_id INT,
  email STRING,
  phone STRING,
  created_at TIMESTAMP,
  _is_valid BOOLEAN,
  _quality_issues ARRAY<STRING>
);

-- Validation checks:
WITH validated AS (
  SELECT
    customer_id,
    email,
    phone,
    created_at,
    CASE
      WHEN customer_id IS NULL THEN FALSE
      WHEN email IS NULL OR email NOT LIKE '%@%.%' THEN FALSE
      WHEN created_at > CURRENT_TIMESTAMP() THEN FALSE
      ELSE TRUE
    END as _is_valid,
    -- NOTE: CASE short-circuits, so only the first failing check is recorded
    CASE
      WHEN customer_id IS NULL THEN ['missing_customer_id']
      WHEN email IS NULL OR email NOT LIKE '%@%.%' THEN ['invalid_email']
      WHEN created_at > CURRENT_TIMESTAMP() THEN ['future_created_date']
      ELSE []
    END as _quality_issues
  FROM bronze.customers
)
INSERT INTO silver.customers
SELECT * FROM validated;

-- Trust: ✅ Partial (passed validation, but no SLA)
-- Constraints: ✅ Validation rules applied
-- SLA: "Within 4 hours of bronze load"
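Because a SQL CASE short-circuits, it can only record the first failing check per row. A hedged Python sketch of the same validation that accumulates every issue (field and check names mirror the SQL example; the record is a plain dict):

```python
from datetime import datetime, timezone

def validate(record, now=None):
    """Collect every failed check for one bronze record.

    Mirrors the _is_valid / _quality_issues columns from the silver layer,
    but records all failures instead of only the first one."""
    now = now or datetime.now(timezone.utc)
    issues = []
    if record.get("customer_id") is None:
        issues.append("missing_customer_id")
    email = record.get("email") or ""
    if "@" not in email or "." not in email.split("@")[-1]:
        issues.append("invalid_email")
    created = record.get("created_at")
    if created is not None and created > now:
        issues.append("future_created_date")
    return {"_is_valid": not issues, "_quality_issues": issues, **record}

bad = validate({"customer_id": None, "email": "oops", "created_at": None})
print(bad["_is_valid"], bad["_quality_issues"])
# False ['missing_customer_id', 'invalid_email']
```

Recording all issues per row makes downstream triage easier: you can count how often each rule fails instead of only seeing the first failure.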

Level 3: Gold (SLA-Guaranteed)

-- Gold layer quality: SLA guaranteed
CREATE TABLE gold.dim_customer AS
SELECT
  customer_id,
  email,
  phone,
  created_at,
  current_timestamp() as _dbt_updated_at
FROM silver.customers
WHERE _is_valid = TRUE;

-- Trust: ✅✅ Full (SLA-backed)
-- Constraints: ✅ Multiple constraints + tests
-- SLA: "99.95% accuracy, 100% completeness, loaded by 6 AM"

Quality Test Frameworks

dbt Tests (SQL-Native)

# dbt/models/gold/gold_dim_customer.yml
version: 2

models:
  - name: gold_dim_customer
    description: "Customer dimension with 99.95% SLA"

    # Model-level tests
    tests:
      - dbt_utils.recency:
          datepart: day
          interval: 1
          field: _dbt_updated_at

      - dbt_expectations.expect_table_row_count_to_be_between:  # dbt-expectations package
          min_value: 1000000  # Alert if < 1M rows

    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - not_null  # No nulls allowed
          - unique    # All distinct
          - relationships:
              to: ref('bronze_customers')
              field: customer_id

      - name: email
        description: "Customer email address"
        tests:
          - not_null
          - custom_regex:
              pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"

      - name: created_at
        description: "Account creation timestamp"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "< current_timestamp()"  # column name is prepended

# Custom generic test (dbt/tests/generic/custom_regex.sql)
{% test custom_regex(model, column_name, pattern) %}
  -- Returns the rows that FAIL: values not matching the pattern
  SELECT * FROM {{ model }}
  WHERE {{ column_name }} NOT REGEXP '{{ pattern }}'
{% endtest %}

Great Expectations (Python-Based)

# great_expectations/expectations.py  (GX 0.15-style API; newer versions differ)
import great_expectations as gx
from great_expectations.core.batch import BatchRequest

context = gx.get_context()

# Point at the table to validate
batch_request = BatchRequest(
    datasource_name="snowflake",
    data_connector_name="default",
    data_asset_name="gold_dim_customer",
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="gold_customer_expectations",
)

# Completeness
validator.expect_column_values_to_not_be_null(column="customer_id")
validator.expect_column_values_to_not_be_null(column="email")

# Uniqueness
validator.expect_column_values_to_be_unique(column="customer_id")

# Accuracy
validator.expect_column_values_to_match_regex(
    column="email",
    regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)

# Timeliness: assumes records arrive in creation order, so created_at
# should be monotonically increasing batch over batch
validator.expect_column_values_to_be_increasing(column="created_at")

# Persist the suite and run validation
validator.save_expectation_suite(discard_failed_expectations=False)
results = validator.validate()

SLA Framework

Defining SLAs

# Service Level Agreement per table
gold_dim_customer:
  freshness: "Loaded by 6 AM ET daily"
  completeness: "100% for business-critical fields"
  accuracy: "99.95% (verified nightly)"
  availability: "99.9% uptime"

  quality_metrics:
    - null_rate: "< 0.1%"  # Less than 0.1% nulls
    - duplicate_rate: "0%"  # Zero duplicates
    - validity_rate: "99%"  # 99% match business rules

  response_times:
    - query < 5 seconds (p99)
    - join with fact tables < 10 seconds (p99)

gold_fact_sales:
  freshness: "Loaded by 8 AM ET daily"
  completeness: "100% for customer_key, product_key, amount"
  accuracy: "99.9% (sample audit weekly)"

  volume_check: "Previous day ± 20%"
  # Alert if today's rows are 20% higher/lower than yesterday
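The volume check above ("previous day ± 20%") reduces to a one-line comparison. A small sketch, with the function name and the unknown-baseline behavior being my own choices:

```python
def volume_ok(current, previous, tolerance=0.2):
    """Volume SLA check: today's row count within ±20% of yesterday's."""
    if previous is None or previous == 0:
        return None  # no baseline yet (first run); unknown rather than pass/fail
    return abs(current - previous) / previous <= tolerance

print(volume_ok(110, 100))    # True  (+10%, within tolerance)
print(volume_ok(130, 100))    # False (+30%, breaches the SLA)
print(volume_ok(1000, None))  # None  (no previous day to compare against)
```

Treating the no-baseline case as "unknown" instead of "pass" avoids silently green-lighting the first load of a new table.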

Monitoring SLAs

-- SLA monitoring query (BigQuery syntax)
-- gold_tables is assumed to be a view UNIONing the monitored gold tables,
-- each row tagged with its table_name.
CREATE OR REPLACE TABLE sla_monitoring AS
WITH metrics AS (
  SELECT
    table_name,
    CURRENT_DATE() as check_date,

    -- Freshness: when was data last loaded?
    TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(_dbt_updated_at), HOUR) as hours_since_load,

    -- Completeness: % non-null for critical fields
    COUNTIF(customer_id IS NOT NULL) / COUNT(*) as customer_id_completeness,

    -- Uniqueness: duplicate rate
    1 - (COUNT(DISTINCT customer_id) / COUNT(*)) as duplicate_rate,

    -- Volume: today's row count
    COUNT(*) as current_volume
  FROM gold_tables
  GROUP BY table_name
),
with_history AS (
  SELECT
    metrics.*,
    -- Previous day's volume; meaningful only if daily snapshots are retained
    LAG(current_volume) OVER (PARTITION BY table_name ORDER BY check_date) as previous_volume
  FROM metrics
)
SELECT
  *,
  SAFE_DIVIDE(current_volume - previous_volume, previous_volume) as volume_change,

  -- Per-dimension statuses (SQL can't reference aliases within the same
  -- SELECT list, hence the layered CTEs above)
  CASE WHEN hours_since_load <= 24 THEN 'PASS' ELSE 'FAIL' END as freshness_status,
  CASE WHEN customer_id_completeness >= 0.99 THEN 'PASS' ELSE 'FAIL' END as completeness_status,
  CASE WHEN duplicate_rate = 0 THEN 'PASS' ELSE 'FAIL' END as uniqueness_status,
  CASE WHEN ABS(SAFE_DIVIDE(current_volume - previous_volume, previous_volume)) <= 0.2
       THEN 'PASS' ELSE 'FAIL' END as volume_status,

  -- Overall SLA status
  CASE
    WHEN hours_since_load > 24
      OR customer_id_completeness < 0.99
      OR duplicate_rate > 0
    THEN 'FAILED'
    ELSE 'PASSED'
  END as overall_status
FROM with_history;

-- Alert if SLA failed
SELECT * FROM sla_monitoring
WHERE overall_status = 'FAILED';
-- Route results to Slack, PagerDuty, or email
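Routing those failed rows to Slack can be as simple as posting to an incoming webhook. A hedged sketch; the message format, function names, and the `table_name`/`check_date` fields (taken from the monitoring table above) are illustrative:

```python
import json
import urllib.request

def format_alert(failures):
    """Render failed SLA rows (dicts from the monitoring table) as one message."""
    return "\n".join(
        f":rotating_light: SLA FAILED: {f['table_name']} on {f['check_date']}"
        for f in failures
    )

def send_to_slack(webhook_url, failures):
    # Slack incoming webhooks accept a JSON body with a "text" field.
    body = json.dumps({"text": format_alert(failures)}).encode()
    req = urllib.request.Request(
        webhook_url, data=body, headers={"Content-Type": "application/json"}
    )
    urllib.request.urlopen(req)

print(format_alert([{"table_name": "gold_dim_customer", "check_date": "2024-05-01"}]))
```

For paging tools like PagerDuty, the same pattern applies with that tool's events API instead of a webhook.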

Quality Incident Response

Incident Triage

Severity Levels:
┌─ CRITICAL (P0)
│  ├─ Production data missing (count = 0)
│  ├─ Data corruption (widespread duplicates or mangled values)
│  ├─ PII breach (PII fields exposed)
│  └─ Response time: < 1 hour
├─ HIGH (P1)
│  ├─ SLA missed (data late > 2 hours)
│  ├─ Quality < 95% (5%+ invalid records)
│  ├─ Sensitive field null > 1%
│  └─ Response time: < 4 hours
└─ MEDIUM (P2)
   ├─ Quality 95-99%
   ├─ Minor schema issue
   └─ Response time: < 24 hours
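The triage tree above can be encoded so alerts self-classify. A minimal sketch using the thresholds from the tree; the parameter names and the simplification to three inputs plus a PII flag are my own:

```python
def triage(row_count, hours_late, validity_rate, pii_exposed=False):
    """Map incident metrics to the severity tiers above."""
    if row_count == 0 or pii_exposed:
        return "P0"  # production data missing, or PII breach -> critical
    if hours_late > 2 or validity_rate < 0.95:
        return "P1"  # SLA missed by > 2 hours, or > 5% invalid records
    return "P2"      # quality 95-99%, minor issues

print(triage(row_count=0, hours_late=0, validity_rate=1.0))     # P0
print(triage(row_count=500, hours_late=3, validity_rate=0.99))  # P1
print(triage(row_count=500, hours_late=0, validity_rate=0.97))  # P2
```

Encoding severity in code keeps paging consistent: the same metrics always produce the same priority, regardless of who is on call.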

Incident Playbook

Step 1: Alert Received
  └─ Alert: "gold_dim_customer SLA FAILED"
     └─ Freshness: Data due at 6 AM, still missing at 10 AM (4 hours late!)

Step 2: Triage
  └─ Check status:
     ├─ Pipeline: RUNNING (stuck for 4 hours)
     ├─ Warehouse: HEALTHY (no issues)
     ├─ Data quality: Unable to assess (no data)
     └─ Severity: CRITICAL (P0)

Step 3: Investigation
  └─ Check logs:
     ├─ dbt job: RUNNING (4 hours elapsed, normally 30 min)
     ├─ Query execution: Stuck on silver_customers transform
     ├─ Database: High CPU usage (90%)
     └─ Root cause: Unexpectedly large data from source (10x normal volume)

Step 4: Mitigation
  └─ Options:
     A) Kill stuck job, wait for retry (5 min)
     B) Manually restart pipeline with reduced data (15 min)
     C) Scale warehouse to handle load (20 min)
     └─ Choose: B (fastest, proven path)

Step 5: Resolution
  └─ Actions:
     ├─ Kill the stuck job; restart the pipeline in reduced-size batches
     ├─ Backfill completes in 45 minutes
     ├─ Data loaded by 11 AM (still 5 hours past the 6 AM SLA)
     └─ Document incident report

Step 6: Post-Incident Review
  └─ Questions:
     ├─ Why was source data 10x larger? (bug in source)
     ├─ Why didn't we detect early? (no alerting on source size)
     ├─ How to prevent? (add source data volume check)
     └─ Update pipelines to handle 50x data (scaling strategy)

Quality Tools & Platforms

| Tool               | Type         | Use Case                                            |
|--------------------|--------------|-----------------------------------------------------|
| dbt tests          | SQL-native   | Simple rules, built into dbt workflows              |
| Great Expectations | Python-based | Complex expectations, detailed reporting            |
| Soda               | Monitoring   | Continuous quality monitoring, SLA tracking         |
| Monte Carlo Data   | ML-based     | Anomaly detection, column-level monitoring          |
| Collibra           | Governance   | Quality frameworks, lineage, incidents              |

Key Takeaways

  1. DAMA framework defines five dimensions of quality
  2. Three-tier quality: Bronze (unknown) → Silver (validated) → Gold (SLA-backed)
  3. dbt tests are SQL-native and integrate with dbt workflows
  4. SLAs guarantee minimum quality levels
  5. Monitor continuously with automated tests and dashboards
  6. Incident response requires clear severity levels and playbooks