4.02 · Deep Dive: Data Pipeline Orchestration & Tools

Level: Advanced · Time to read: 18 min · Pre-reading: 04 · ETL & ELT and 4.01 · ETL Patterns
After reading: you'll understand pipeline orchestration tools, DAG execution, scheduling strategies, and production-grade pipeline patterns.


The Pipeline Orchestration Problem

Manual Approach:
cron job → Extract → Transform → Load → ???
           What if step 2 fails?
           Who notifies us?
           When do we retry?

Solution: an orchestration tool makes the dependency graph explicit and handles scheduling, retries, and alerting for you.
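Before reaching for a tool, it helps to see what the cron-only version forces you to write. A minimal sketch, with extract/transform/load and send_alert as placeholders, of the retry and alerting logic you end up hand-rolling for every pipeline:

import time
import logging

logging.basicConfig(level=logging.INFO)

def run_step(step, retries=3, delay_seconds=60):
    """Run one pipeline step with hand-rolled retry and alerting."""
    for attempt in range(1, retries + 1):
        try:
            return step()
        except Exception as exc:
            logging.warning("%s failed (attempt %d/%d): %s",
                            step.__name__, attempt, retries, exc)
            time.sleep(delay_seconds * attempt)  # crude linear backoff
    send_alert(f"{step.__name__} failed after {retries} attempts")  # placeholder notifier
    raise RuntimeError(f"{step.__name__} exhausted retries")

# Usage: run_step(extract); run_step(transform); run_step(load)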

Core Concepts

Directed Acyclic Graph (DAG)

graph TD
    A["🟢 Extract<br/>Customers<br/>from Salesforce"]
    B["🟢 Extract<br/>Orders<br/>from E-commerce"]
    C["🟡 Extract<br/>Product<br/>Catalog"]
    D["🔵 Transform<br/>Clean & Dedupe"]
    E["🔵 Transform<br/>Dimensional<br/>Models"]
    F["🟠 Load<br/>Gold Layer"]
    G["🟣 Quality<br/>Tests"]
    H["🟢 Notify<br/>Analytics Team"]

    A -->|customer data| D
    B -->|order data| D
    C -->|product data| E
    D -->|validated data| E
    E -->|dimensional models| F
    F -->|load complete| G
    G -->|tests pass| H
    G -->|tests fail| I["🔴 Alert Team"]

    style A fill:#90EE90
    style B fill:#90EE90
    style C fill:#FFD700
    style D fill:#87CEEB
    style E fill:#87CEEB
    style F fill:#FF8C00
    style G fill:#DDA0DD
    style H fill:#90EE90
    style I fill:#FF6B6B

Key Properties:

  • ✅ No cycles (acyclic, so every run is guaranteed to terminate)
  • ✅ Explicit dependencies (the Transform step cannot start until both upstream Extracts complete)
  • ✅ Parallelizable (the three independent Extract tasks can run simultaneously; see the sketch below)
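Under the hood, an orchestrator treats the DAG as a dependency map and derives a valid execution order from it. A minimal sketch, with task names mirroring the diagram above, of cycle detection and ordering via Kahn's algorithm:

from collections import deque

deps = {  # task -> set of upstream tasks that must finish first
    'extract_customers': set(),
    'extract_orders': set(),
    'extract_products': set(),
    'transform_clean': {'extract_customers', 'extract_orders'},
    'transform_models': {'extract_products', 'transform_clean'},
    'load_gold': {'transform_models'},
    'quality_tests': {'load_gold'},
}

def execution_order(deps):
    """Return tasks in a runnable order, raising if the graph has a cycle."""
    remaining = {t: set(u) for t, u in deps.items()}
    ready = deque(t for t, u in remaining.items() if not u)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for t, upstream in remaining.items():
            upstream.discard(task)  # task is done; unblock its dependents
            if not upstream and t not in order and t not in ready:
                ready.append(t)
    if len(order) != len(deps):
        raise ValueError("cycle detected: not a DAG")
    return order

print(execution_order(deps))  # extracts first (parallelizable), then transforms, load, tests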

Orchestration Tools Comparison

Tool                   Best For                  Complexity    Popularity   Cost
Apache Airflow         Complex workflows         High          ⭐⭐⭐⭐⭐        Open source
Databricks Workflows   ML + data                 Medium        ⭐⭐⭐⭐         Enterprise
dbt Cloud              SQL-native ELT            Low-Medium    ⭐⭐⭐⭐         Freemium
Prefect                Modern Python pipelines   Medium        ⭐⭐⭐          Open source / Cloud
Dagster                Observable data ops       Medium-High   ⭐⭐⭐          Open source / Cloud
AWS Glue Workflows     AWS-native                Medium        ⭐⭐⭐          AWS pricing
Cron + shell scripts   Simple jobs               Low           —            $0

Pattern 1: Apache Airflow DAGs

Basic DAG Structure

# dags/ecommerce_etl.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.task_group import TaskGroup

# Default arguments (apply to all tasks)
default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['dataeng@company.com'],
    'email_on_failure': True,
    'start_date': datetime(2024, 1, 1),
}

# Define DAG
dag = DAG(
    'ecommerce_etl_daily',
    default_args=default_args,
    description='Daily ELT pipeline for ecommerce warehouse',
    schedule_interval='0 2 * * *',  # 2 AM daily
    catchup=False,  # Don't backfill old runs
    tags=['ecommerce', 'daily', 'etl'],
)

# Task 1: Extract customer data
def extract_customers():
    import pandas as pd
    from google.cloud import bigquery

    # Extract from Salesforce (SalesforceConnector is a placeholder for your
    # own wrapper, e.g. around simple_salesforce)
    salesforce_client = SalesforceConnector('salesforce_prod')
    customers = salesforce_client.query(
        "SELECT id, name, email, created_date FROM accounts WHERE is_active = true"
    )

    # Rename columns to match the Bronze schema and stamp the load time
    customers = customers.rename(columns={'id': 'customer_id', 'name': 'customer_name'})
    customers['_elt_timestamp'] = pd.Timestamp.now(tz='UTC')

    # Load to Bronze (BigQuery)
    bq = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        schema=[
            bigquery.SchemaField("customer_id", "INTEGER"),
            bigquery.SchemaField("customer_name", "STRING"),
            bigquery.SchemaField("email", "STRING"),
            bigquery.SchemaField("created_date", "DATE"),
            bigquery.SchemaField("_elt_timestamp", "TIMESTAMP"),
        ],
    )

    job = bq.load_table_from_dataframe(
        customers,
        "project.dataset.bronze_customers",
        job_config=job_config,
    )
    job.result()

    return f"Loaded {len(customers)} customers"

# Task 2: Extract orders
def extract_orders():
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    hook = MySqlHook(mysql_conn_id='ecommerce_db')
    records = hook.get_records(
        sql="SELECT order_id, customer_id, amount, order_date FROM orders WHERE DATE(order_date) = CURRENT_DATE()"
    )

    # Load to warehouse
    # ...

# Task operators
extract_customers_task = PythonOperator(
    task_id='extract_customers',
    python_callable=extract_customers,
    dag=dag,
    pool='api_pool',  # limits concurrent API calls; pool size is configured in Airflow admin
)

extract_orders_task = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders,
    dag=dag,
    pool='api_pool',
)

# Transformation task group
with TaskGroup('transform', dag=dag) as transform_group:
    transform_customers = SnowflakeOperator(
        task_id='transform_customers',
        sql='sql/silver_customers.sql',
        snowflake_conn_id='snowflake_prod',
    )

    transform_orders = SnowflakeOperator(
        task_id='transform_orders',
        sql='sql/silver_orders.sql',
        snowflake_conn_id='snowflake_prod',
    )

# Load task
load_gold = SnowflakeOperator(
    task_id='load_gold_layer',
    sql='sql/gold_fact_sales.sql',
    snowflake_conn_id='snowflake_prod',
    dag=dag,
)

# Testing task: trigger the dbt Cloud job that runs the test suite; if any
# dbt test fails, the run fails and so does this task (job_id is a placeholder)
quality_check = DbtCloudRunJobOperator(
    task_id='data_quality_checks',
    dbt_cloud_conn_id='dbt_cloud_prod',
    job_id=12345,  # hypothetical dbt Cloud job ID
    wait_for_termination=True,  # block until the dbt Cloud run finishes
    dag=dag,
)

# Notification task: post to Slack, but only once every upstream task succeeded
notify_task = SlackWebhookOperator(
    task_id='notify_completion',
    slack_webhook_conn_id='slack_webhook',
    message='✅ Daily ecommerce ETL completed successfully',
    trigger_rule='all_success',  # only runs if all upstream tasks succeeded
    dag=dag,
)

# Define dependencies
extract_customers_task >> transform_customers
extract_orders_task >> transform_orders
transform_group >> load_gold
load_gold >> quality_check
quality_check >> notify_task

Key Airflow Concepts

Concept    Purpose                  Example
DAG        Workflow definition      ecommerce_etl_daily
Task       Single unit of work      extract_customers, transform_orders
Operator   Task implementation      PythonOperator, SnowflakeOperator
Sensor     Wait for a condition     ExternalTaskSensor (wait for an upstream task)
Hook       Connection management    MySqlHook (wraps connection details)
Pool       Resource limiting        api_pool (caps concurrent API calls)
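Sensors are easiest to see in code. A minimal sketch, assuming the DAG above and a hypothetical upstream DAG bronze_ingestion_daily, of gating the extract on an external task:

from airflow.sensors.external_task import ExternalTaskSensor

# Block this DAG until a task in an upstream DAG has succeeded for the
# same logical date (DAG and task IDs here are illustrative)
wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_bronze_ingestion',
    external_dag_id='bronze_ingestion_daily',  # the upstream DAG to watch
    external_task_id='load_bronze_complete',   # the specific task to wait on
    mode='reschedule',     # free the worker slot between checks
    poke_interval=300,     # check every 5 minutes
    timeout=60 * 60 * 2,   # give up after 2 hours
    dag=dag,
)

wait_for_upstream >> extract_customers_task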

Pattern 2: dbt Cloud Scheduling

dbt Project Structure

dbt_project.yml
models/
├── staging/
│   ├── stg_customers.sql
│   └── stg_orders.sql
├── silver/
│   ├── silver_customers.sql
│   └── silver_orders.sql
└── gold/
    └── gold_fact_sales.sql
tests/
├── integrity_tests.sql
└── quality_tests.sql

dbt Job Configuration

# dbt Cloud job configuration (illustrative; jobs are defined in the dbt Cloud UI or via its API)
name: daily_ecommerce_etl
jobs:
  - name: daily_elt
    description: "Daily ELT pipeline"

    # Trigger
    triggers:
      - on_schedule:
          interval: 0 2 * * *  # 2 AM daily

    # Execution
    commands:
      - dbt debug
      - dbt seed  # Load CSV reference data
      - dbt run --select model_type:table
      - dbt test  # Run quality tests
      - dbt snapshot  # Capture dimension versions

    # Notifications
    notifications:
      - type: slack
        on: failure
        webhook_url: https://hooks.slack.com/...

    # Environment
    environment:
      name: prod
      warehouse_type: snowflake
      threads: 4  # Parallel execution
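Schedules aren't the only trigger: dbt Cloud jobs can also be kicked off from an external orchestrator through its v2 Administrative API. A minimal sketch, with placeholder account and job IDs and a token read from the environment:

import os
import requests

ACCOUNT_ID = 1234  # placeholder dbt Cloud account ID
JOB_ID = 5678      # placeholder ID of the job configured above

# Trigger a run of the job and capture its run ID
response = requests.post(
    f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
    headers={"Authorization": f"Token {os.environ['DBT_CLOUD_API_TOKEN']}"},
    json={"cause": "Triggered by upstream orchestrator"},
    timeout=30,
)
response.raise_for_status()
run_id = response.json()["data"]["id"]  # poll this run for completion status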

dbt Models Example

-- models/silver/silver_customers.sql
{{
  config(
    materialized='incremental',
    unique_key='customer_id',
    on_schema_change='sync_all_columns'
  )
}}

SELECT
  CAST(customer_id AS INT64) as customer_id,
  LOWER(TRIM(email)) as email,
  phone,
  created_at,
  updated_at,
  CURRENT_TIMESTAMP() as _dbt_updated_at
FROM {{ ref('bronze_customers') }}
WHERE
  customer_id IS NOT NULL
  AND email LIKE '%@%'

{% if is_incremental() %}
  -- On incremental runs, only process rows at or past the current high-water mark
  AND updated_at >= (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Pattern 3: Databricks Workflows

Workflow with Notebooks

The JSON below follows the Databricks Jobs API (2.1) shape; the shared_cluster referenced by job_cluster_key would be defined in a job_clusters block, omitted here for brevity.

# Databricks Jobs API job definition
{
  "name": "ecommerce_daily_elt",
  "tasks": [
    {
      "task_key": "extract_customers",
      "notebook_task": {
        "notebook_path": "/Users/data-eng/notebooks/extract_customers",
        "base_parameters": {
          "env": "prod",
          "date": "{{ job.start_time }}"
        }
      },
      "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 4
      }
    },
    {
      "task_key": "transform_and_load",
      "notebook_task": {
        "notebook_path": "/Users/data-eng/notebooks/silver_transformations"
      },
      "depends_on": [{"task_key": "extract_customers"}],
      "cluster_key": "shared_cluster"
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 2 * * ?",
    "timezone_id": "America/New_York"
  }
}
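The definition above is exactly what the Databricks Jobs API accepts. A minimal sketch, with workspace host and token as placeholder environment variables, of registering the job and triggering an ad-hoc run via REST:

import json
import os
import requests

HOST = os.environ["DATABRICKS_HOST"]  # e.g. https://<workspace>.cloud.databricks.com
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

# Register the job definition shown above
with open("ecommerce_daily_elt.json") as f:
    job_spec = json.load(f)

create = requests.post(f"{HOST}/api/2.1/jobs/create",
                       headers=HEADERS, json=job_spec, timeout=30)
create.raise_for_status()
job_id = create.json()["job_id"]

# Trigger an immediate run outside the cron schedule
run = requests.post(f"{HOST}/api/2.1/jobs/run-now",
                    headers=HEADERS, json={"job_id": job_id}, timeout=30)
run.raise_for_status()
print(f"Started run {run.json()['run_id']} for job {job_id}")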

Advanced Patterns

Pattern: Monitoring & Alerting

# Monitor task execution and data quality
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException

def check_pipeline_health():
    """Alert if the pipeline is running late or producing bad data.

    get_last_pipeline_run_timestamp and query_warehouse are placeholder
    helpers for your own run-metadata store and warehouse client.
    """

    # Check 1: has the pipeline run within the last 24 hours?
    last_run = get_last_pipeline_run_timestamp()
    expected_run_time = datetime.now() - timedelta(hours=24)

    if last_run < expected_run_time:
        raise AirflowException("Pipeline hasn't run in 24+ hours!")

    # Check 2: Is data quality degrading?
    query = """
    SELECT
      table_name,
      COUNTIF(_is_valid = FALSE) / COUNT(*) as error_rate
    FROM bronze.quality_logs
    WHERE _load_date = CURRENT_DATE()
    GROUP BY table_name
    HAVING error_rate > 0.01
    """

    bad_tables = query_warehouse(query)
    if bad_tables:
        raise AirflowException(f"Data quality issues: {bad_tables}")

    return "Pipeline healthy ✓"

Pattern: Incremental Processing

# Only process new/changed data
def extract_incremental(source_conn, last_watermark):
    """Extract only rows that changed since last run"""

    query = f"""
    SELECT * FROM customers
    WHERE updated_at > '{last_watermark}'
    """

    new_data = source_conn.query(query)  # assumed to return a DataFrame

    # Nothing changed since the last run: keep the existing watermark
    if new_data.empty:
        return new_data

    # Advance the watermark so the next run starts where this one ended
    new_watermark = new_data['updated_at'].max()
    save_watermark('customers_extract', new_watermark)

    return new_data
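save_watermark (and its counterpart get_watermark) are placeholders above. A minimal sketch backing them with a small control table; warehouse_conn and the etl_control.watermarks table are assumptions:

def get_watermark(pipeline_name):
    """Read the last processed timestamp; default to the epoch on first run."""
    rows = warehouse_conn().query(  # placeholder DB-API-style client
        "SELECT last_watermark FROM etl_control.watermarks WHERE pipeline = %s",
        (pipeline_name,),
    )
    return rows[0][0] if rows else '1970-01-01 00:00:00'

def save_watermark(pipeline_name, new_watermark):
    """Upsert the watermark so the next run starts where this one ended."""
    warehouse_conn().execute(
        """
        MERGE INTO etl_control.watermarks AS t
        USING (SELECT %s AS pipeline, %s AS last_watermark) AS s
        ON t.pipeline = s.pipeline
        WHEN MATCHED THEN UPDATE SET last_watermark = s.last_watermark
        WHEN NOT MATCHED THEN INSERT (pipeline, last_watermark)
            VALUES (s.pipeline, s.last_watermark)
        """,
        (pipeline_name, new_watermark),
    )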

Production-Grade Patterns

Pattern: Dynamic DAGs

# Generate per-source extract task groups inside one DAG; the same idea scales
# to generating whole DAGs in a loop (extract_from_source is sketched below)
from airflow.utils.task_group import TaskGroup

DATA_SOURCES = {
    'salesforce': 'salesforce_prod',
    'ecommerce': 'ecommerce_db',
    'analytics': 'analytics_warehouse',
}

for source_name, conn_id in DATA_SOURCES.items():
    with TaskGroup(f'extract_{source_name}', dag=dag) as extract_group:
        extract_task = PythonOperator(
            task_id=f'extract_{source_name}_data',
            python_callable=extract_from_source,
            op_kwargs={'source': source_name, 'conn_id': conn_id},
        )
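The loop parameterizes a single shared callable. A minimal sketch of extract_from_source, using Airflow's BaseHook to resolve each connection; the per-source query logic is a placeholder:

from airflow.hooks.base import BaseHook

def extract_from_source(source, conn_id, **context):
    """Generic extractor: resolve the connection, pull rows, land in bronze."""
    conn = BaseHook.get_connection(conn_id)  # credentials from Airflow's metadata DB
    print(f"Extracting {source} via {conn.host}")
    # ... source-specific query and bronze-layer load go here ...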

Pattern: Error Handling & Retries

# Graceful degradation on failures
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # exponential backoff: 5 min, 10 min, 20 min
    'max_retry_delay': timedelta(hours=1),
}

# Task-level overrides
extract_task = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders,
    retries=5,  # more retries for a flaky API
    execution_timeout=timedelta(hours=2),  # kill the task if it runs > 2 hours
    sla=timedelta(hours=3),  # per-task SLA; sla_miss_callback is set on the DAG
    dag=dag,
)
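Retries handle transient failures; callbacks handle the rest. A minimal sketch of a per-task failure callback wired through default_args; send_slack_message is a placeholder webhook client:

def alert_on_failure(context):
    """Called by Airflow with the run context whenever a task fails."""
    ti = context['task_instance']
    send_slack_message(  # placeholder for your Slack webhook client
        f"🔴 {ti.dag_id}.{ti.task_id} failed on try {ti.try_number}: "
        f"{context.get('exception')}"
    )

# Register before the DAG is instantiated so every task inherits it
default_args['on_failure_callback'] = alert_on_failure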

Key Takeaways

  1. DAGs define workflow dependencies (no cycles)
  2. Orchestration tools manage scheduling, retries, and monitoring
  3. dbt Cloud is great for SQL-native ELT pipelines
  4. Airflow is flexible for complex, multi-step workflows
  5. Production pipelines need monitoring, alerting, and error handling
  6. Incremental processing reduces pipeline runtime and cost