4.02 · Deep Dive: Data Pipeline Orchestration & Tools
Level: Advanced · Time to read: 18 min · Pre-reading: 04 · ETL & ELT, 4.01 · ETL Patterns
After reading: You'll understand pipeline orchestration tools, DAG execution, scheduling strategies, and production-grade pipeline patterns.
The Pipeline Orchestration Problem
Manual Approach:
cron job → Extract → Transform → Load → ???
↓
What if step 2 fails?
Who notifies us?
When do we retry?
Solution: an orchestration tool manages the dependency flow, retries, and notifications for you
Core Concepts
Directed Acyclic Graph (DAG)
graph TD
A["🟢 Extract<br/>Customers<br/>from Salesforce"]
B["🟢 Extract<br/>Orders<br/>from E-commerce"]
C["🟡 Extract<br/>Product<br/>Catalog"]
D["🔵 Transform<br/>Clean & Dedupe"]
E["🔵 Transform<br/>Dimensional<br/>Models"]
F["🟠 Load<br/>Gold Layer"]
G["🟣 Quality<br/>Tests"]
H["🟢 Notify<br/>Analytics Team"]
A -->|customer data| D
B -->|order data| D
C -->|product data| E
D -->|validated data| E
E -->|dimensional models| F
F -->|load complete| G
G -->|tests pass| H
G -->|tests fail| I["🔴 Alert Team"]
style A fill:#90EE90
style B fill:#90EE90
style C fill:#FFD700
style D fill:#87CEEB
style E fill:#87CEEB
style F fill:#FF8C00
style G fill:#DDA0DD
style H fill:#90EE90
style I fill:#FF6B6B
Key Properties:
- ✅ No cycles (acyclic by definition, so a run can never loop forever)
- ✅ Dependencies are explicit (a task runs only after its upstream tasks complete)
- ✅ Parallelizable (the extract tasks A, B, and C have no upstream dependencies, so they can run at the same time; see the Airflow sketch below)
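Expressed in Airflow, that structure is just a handful of dependency statements. Here is a minimal sketch using EmptyOperator placeholders for the real work (task names mirror the diagram; the failure-alert branch is omitted for brevity):

# dags/dag_shape_sketch.py — structure only, no real extract/transform logic
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG('dag_shape_sketch', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    extract_customers = EmptyOperator(task_id='extract_customers')    # A
    extract_orders = EmptyOperator(task_id='extract_orders')          # B
    extract_products = EmptyOperator(task_id='extract_products')      # C
    clean_dedupe = EmptyOperator(task_id='clean_dedupe')              # D
    dimensional_models = EmptyOperator(task_id='dimensional_models')  # E
    load_gold = EmptyOperator(task_id='load_gold')                    # F
    quality_tests = EmptyOperator(task_id='quality_tests')            # G
    notify = EmptyOperator(task_id='notify_analytics')                # H

    [extract_customers, extract_orders] >> clean_dedupe               # A, B → D
    [extract_products, clean_dedupe] >> dimensional_models            # C, D → E
    dimensional_models >> load_gold >> quality_tests >> notify        # E → F → G → H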
Orchestration Tools Comparison
| Tool | Best For | Complexity | Popularity | Cost |
|---|---|---|---|---|
| Apache Airflow | Complex workflows | High | ⭐⭐⭐⭐⭐ | Open source |
| Databricks Workflows | ML + Data | Medium | ⭐⭐⭐⭐ | Enterprise |
| dbt Cloud | SQL-native ELT | Low-Medium | ⭐⭐⭐⭐ | Freemium |
| Prefect | Modern Python pipelines | Medium | ⭐⭐⭐ | Open source / Cloud |
| Dagster | Observable data ops | Medium-High | ⭐⭐⭐ | Open source / Cloud |
| AWS Glue Workflows | AWS-native | Medium | ⭐⭐⭐ | AWS pricing |
| Cron + Shell Scripts | Simple jobs | Low | ⭐ | $0 |
Pattern 1: Apache Airflow DAGs
Basic DAG Structure
# dags/ecommerce_etl.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup
# Default arguments (apply to all tasks)
default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['dataeng@company.com'],
    'email_on_failure': True,
    'start_date': datetime(2024, 1, 1),
}

# Define DAG
dag = DAG(
    'ecommerce_etl_daily',
    default_args=default_args,
    description='Daily ELT pipeline for ecommerce warehouse',
    schedule_interval='0 2 * * *',  # 2 AM daily
    catchup=False,                  # don't backfill old runs
    tags=['ecommerce', 'daily', 'etl'],
)
# Task 1: Extract customer data
def extract_customers():
    import pandas as pd
    from google.cloud import bigquery

    # Extract from Salesforce (SalesforceConnector is an assumed in-house helper;
    # substitute your own client, e.g. simple-salesforce)
    salesforce_client = SalesforceConnector('salesforce_prod')
    customers = salesforce_client.query(
        "SELECT id, name, email, created_date FROM accounts WHERE is_active = true"
    )

    # Align column names with the Bronze schema and stamp the load time
    customers = customers.rename(columns={'id': 'customer_id', 'name': 'customer_name'})
    customers['_elt_timestamp'] = pd.Timestamp.now(tz='UTC')

    # Load to Bronze (BigQuery)
    bq = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        schema=[
            bigquery.SchemaField("customer_id", "INTEGER"),
            bigquery.SchemaField("customer_name", "STRING"),
            bigquery.SchemaField("email", "STRING"),
            bigquery.SchemaField("created_date", "DATE"),
            bigquery.SchemaField("_elt_timestamp", "TIMESTAMP"),
        ],
    )
    job = bq.load_table_from_dataframe(
        customers,
        "project.dataset.bronze_customers",
        job_config=job_config,
    )
    job.result()
    return f"Loaded {len(customers)} customers"

# Task 2: Extract orders
def extract_orders():
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    hook = MySqlHook(mysql_conn_id='ecommerce_db')
    records = hook.get_records(
        sql="SELECT order_id, customer_id, amount, order_date FROM orders WHERE DATE(order_date) = CURRENT_DATE()"
    )
    # Load to warehouse (same pattern as extract_customers)
    # ...
# Task operators
extract_customers_task = PythonOperator(
    task_id='extract_customers',
    python_callable=extract_customers,
    dag=dag,
    pool='api_pool',  # Rate limit: only 2 concurrent API calls
)

extract_orders_task = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders,
    dag=dag,
    pool='api_pool',
)

# Transformation task group
with TaskGroup('transform', dag=dag) as transform_group:
    transform_customers = SnowflakeOperator(
        task_id='transform_customers',
        sql='sql/silver_customers.sql',
        snowflake_conn_id='snowflake_prod',
    )
    transform_orders = SnowflakeOperator(
        task_id='transform_orders',
        sql='sql/silver_orders.sql',
        snowflake_conn_id='snowflake_prod',
    )

# Load task
load_gold = SnowflakeOperator(
    task_id='load_gold_layer',
    sql='sql/gold_fact_sales.sql',
    snowflake_conn_id='snowflake_prod',
    dag=dag,
)
# Testing task: trigger the dbt Cloud job that runs `dbt test`; the task fails if any test fails
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

quality_check = DbtCloudRunJobOperator(
    task_id='data_quality_checks',
    dbt_cloud_conn_id='dbt_cloud_default',
    job_id=12345,                   # hypothetical dbt Cloud job ID for the test suite
    wait_for_termination=True,      # wait for the dbt run and propagate its failure
    dag=dag,
)
# Notification task: post to Slack, but only if every upstream task succeeded
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

notify_task = SlackWebhookOperator(
    task_id='notify_completion',
    http_conn_id='slack_webhook',
    message='✅ Daily ecommerce ETL completed successfully',
    trigger_rule='all_success',     # only runs when all upstream tasks succeeded
    dag=dag,
)
# Define dependencies
extract_customers_task >> transform_customers
extract_orders_task >> transform_orders
transform_group >> load_gold
load_gold >> quality_check
quality_check >> notify_task
Key Airflow Concepts
| Concept | Purpose | Example |
|---|---|---|
| DAG | Workflow definition | ecommerce_etl_daily |
| Task | Single unit of work | extract_customers, transform_orders |
| Operator | Task implementation | PythonOperator, SnowflakeOperator |
| Sensor | Wait for condition | ExternalTaskSensor (wait for upstream) |
| Hook | Connection management | MySqlHook (connection details) |
| Pool | Resource limiting | api_pool (limit concurrent API calls) |
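For example, a sensor can hold this DAG until an upstream export finishes, and the api_pool used earlier is just a named slot count created once in Airflow. A minimal sketch (the upstream DAG/task IDs are illustrative):

# Wait for an upstream DAG's export task before extracting
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_crm_export = ExternalTaskSensor(
    task_id='wait_for_crm_export',
    external_dag_id='crm_export_daily',   # upstream DAG we depend on
    external_task_id='export_complete',   # specific task to wait for
    mode='reschedule',                    # free the worker slot while waiting
    timeout=60 * 60,                      # give up after 1 hour
    dag=dag,
)
wait_for_crm_export >> extract_customers_task

# The api_pool referenced above is created once (UI: Admin -> Pools, or CLI):
#   airflow pools set api_pool 2 "Limit concurrent calls to external APIs"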
Pattern 2: dbt Cloud Scheduling
dbt Project Structure
dbt_project.yml
models/
├── staging/
│ ├── stg_customers.sql
│ └── stg_orders.sql
├── silver/
│ ├── silver_customers.sql
│ └── silver_orders.sql
└── gold/
└── gold_fact_sales.sql
tests/
├── integrity_tests.sql
└── quality_tests.sql
dbt Job Configuration
# dbt Cloud job configuration (illustrative; dbt Cloud jobs are defined in the UI
# or via the API/Terraform, but the settings map to the fields shown here)
name: daily_ecommerce_etl

jobs:
  - name: daily_elt
    description: "Daily ELT pipeline"

    # Trigger
    triggers:
      - on_schedule:
          interval: "0 2 * * *"            # 2 AM daily

    # Execution
    commands:
      - dbt debug
      - dbt seed                           # load CSV reference data
      - dbt run --select config.materialized:table
      - dbt test                           # run quality tests
      - dbt snapshot                       # capture dimension versions

    # Notifications
    notifications:
      - type: slack
        on: failure
        webhook_url: https://hooks.slack.com/...

    # Environment
    environment:
      name: prod
      warehouse_type: snowflake
      threads: 4                           # parallel execution
dbt Models Example
-- models/silver/silver_customers.sql
{{
    config(
        materialized='incremental',
        unique_key='customer_id',
        on_schema_change='sync_all_columns'
    )
}}

SELECT
    CAST(customer_id AS INT64) AS customer_id,
    LOWER(TRIM(email)) AS email,
    phone,
    created_at,
    updated_at,
    CURRENT_TIMESTAMP() AS _dbt_updated_at
FROM {{ ref('bronze_customers') }}
WHERE
    customer_id IS NOT NULL
    AND email LIKE '%@%'
    {% if is_incremental() %}
    -- On incremental runs, only pick up rows newer than what's already loaded
    AND updated_at >= (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
Pattern 3: Databricks Workflows
Workflow with Notebooks
# Databricks Workflows job definition (Jobs API 2.1 JSON)
{
  "name": "ecommerce_daily_elt",
  "job_clusters": [
    {
      "job_cluster_key": "shared_cluster",
      "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 4
      }
    }
  ],
  "tasks": [
    {
      "task_key": "extract_customers",
      "notebook_task": {
        "notebook_path": "/Users/data-eng/notebooks/extract_customers",
        "base_parameters": {
          "env": "prod",
          "date": "{{ job.start_time }}"
        }
      },
      "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 4
      }
    },
    {
      "task_key": "transform_and_load",
      "notebook_task": {
        "notebook_path": "/Users/data-eng/notebooks/silver_transformations"
      },
      "depends_on": [{"task_key": "extract_customers"}],
      "job_cluster_key": "shared_cluster"
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 2 * * ?",
    "timezone_id": "America/New_York"
  }
}
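Once the job exists, other systems (or an Airflow task) can kick it off on demand through the Jobs REST API. A minimal sketch, assuming DATABRICKS_HOST/DATABRICKS_TOKEN environment variables and a hypothetical job ID:

# Trigger an existing Databricks job run via the Jobs 2.1 REST API
import os
import requests

resp = requests.post(
    f"{os.environ['DATABRICKS_HOST']}/api/2.1/jobs/run-now",   # e.g. https://<workspace>.cloud.databricks.com
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={"job_id": 123456789},   # hypothetical ID of the ecommerce_daily_elt job
    timeout=30,
)
resp.raise_for_status()
print("Started run:", resp.json()["run_id"])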
Advanced Patterns
Pattern: Monitoring & Alerting
# Monitor task execution and data quality
from datetime import datetime, timedelta

from airflow.exceptions import AirflowException

def check_pipeline_health():
    """Alert if the pipeline is running late or producing bad data."""
    # Check 1: Is the pipeline running on schedule?
    # (get_last_pipeline_run_timestamp / query_warehouse are assumed in-house helpers)
    last_run = get_last_pipeline_run_timestamp()
    expected_run_time = datetime.now() - timedelta(hours=24)
    if last_run < expected_run_time:
        raise AirflowException("Pipeline hasn't run in 24+ hours!")

    # Check 2: Is data quality degrading?
    query = """
        SELECT
            table_name,
            COUNTIF(_is_valid = FALSE) / COUNT(*) AS error_rate
        FROM bronze.quality_logs
        WHERE _load_date = CURRENT_DATE()
        GROUP BY table_name
        HAVING error_rate > 0.01
    """
    bad_tables = query_warehouse(query)
    if bad_tables:
        raise AirflowException(f"Data quality issues: {bad_tables}")

    return "Pipeline healthy ✓"
Pattern: Incremental Processing
# Only process new/changed data
def extract_incremental(source_conn, last_watermark):
    """Extract only rows that changed since the last run."""
    # NOTE: use parameterized queries in production; an f-string keeps the example short
    query = f"""
        SELECT * FROM customers
        WHERE updated_at > '{last_watermark}'
    """
    new_data = source_conn.query(query)

    # Advance the watermark only if we actually received new rows
    # (save_watermark is an assumed helper; a sketch follows below)
    if len(new_data) > 0:
        new_watermark = new_data['updated_at'].max()
        save_watermark('customers_extract', new_watermark)

    return new_data
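save_watermark and its counterpart are not library functions. A minimal sketch that keeps watermarks in Airflow Variables (a control table in the warehouse works just as well):

# Minimal watermark storage built on Airflow Variables (key names are illustrative)
from airflow.models import Variable

def get_watermark(name, default='1970-01-01 00:00:00'):
    """Return the last saved high-water mark for an extract, or a safe default."""
    return Variable.get(f"watermark_{name}", default_var=default)

def save_watermark(name, value):
    """Persist the new high-water mark so the next run starts where this one stopped."""
    Variable.set(f"watermark_{name}", str(value))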
Production-Grade Patterns
Pattern: Dynamic DAGs
# Generate one extraction task group per data source inside a single DAG
# (the same loop pattern can also generate entire DAGs, one per source)
from airflow.utils.task_group import TaskGroup

DATA_SOURCES = {
    'salesforce': 'salesforce_prod',
    'ecommerce': 'ecommerce_db',
    'analytics': 'analytics_warehouse',
}

for source_name, conn_id in DATA_SOURCES.items():
    with TaskGroup(f'extract_{source_name}', dag=dag) as extract_group:
        extract_task = PythonOperator(
            task_id=f'extract_{source_name}_data',
            python_callable=extract_from_source,   # shared extract callable (sketched below)
            op_kwargs={'source': source_name, 'conn_id': conn_id},
        )
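extract_from_source is one shared callable parameterized by source rather than a function per source. A minimal sketch, assuming credentials live in Airflow connections:

# One generic extract function, parameterized by source name and Airflow connection ID
from airflow.hooks.base import BaseHook

def extract_from_source(source, conn_id):
    """Pull today's rows from the named source using its stored Airflow connection."""
    conn = BaseHook.get_connection(conn_id)   # credentials/host live in Airflow, not in code
    print(f"Extracting from {source} at {conn.host}")
    # ... source-specific query and Bronze load go here ...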
Pattern: Error Handling & Retries
# Graceful degradation on failures
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,      # exponential backoff: ~5 min, 10 min, 20 min
    'max_retry_delay': timedelta(hours=1),
}

# Task-level overrides
extract_task = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders,
    retries=5,                              # more retries for a flaky API
    execution_timeout=timedelta(hours=2),   # fail if the task runs longer than 2 hours
    sla=timedelta(hours=3),                 # SLA; misses invoke the DAG-level sla_miss_callback
    dag=dag,
)
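The alerting callback itself attaches at the DAG level rather than on individual tasks. A minimal sketch (the print stands in for whatever alerting channel you use):

# Wiring the SLA-miss alert at the DAG level (signature matches Airflow's sla_miss_callback)
from datetime import datetime
from airflow import DAG

def alert_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called by the scheduler with the tasks that missed their SLA."""
    print(f"🚨 SLA missed in {dag.dag_id}: {task_list}")   # swap in Slack/PagerDuty in production

dag = DAG(
    'ecommerce_etl_daily',
    start_date=datetime(2024, 1, 1),
    schedule_interval='0 2 * * *',
    sla_miss_callback=alert_sla_miss,   # fires whenever any task exceeds its sla
    catchup=False,
)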
Key Takeaways
- DAGs define workflow dependencies (no cycles)
- Orchestration tools manage scheduling, retries, and monitoring
- dbt Cloud is great for SQL-native ELT pipelines
- Airflow is flexible for complex, multi-step workflows
- Production pipelines need monitoring, alerting, and error handling
- Incremental processing reduces pipeline runtime and cost