
Overview

The Airflow integration enables OpenSRE to investigate DAG failures and extract execution context directly from an Apache Airflow instance. It supports:
  • DAG run inspection
  • Task instance retrieval
  • Failure detection
  • Evidence collection for RCA generation
This integration is designed for incident-driven workflows, where an alert referencing a DAG triggers an investigation.

Architecture

The Airflow integration participates in the investigation pipeline as follows:
  1. An alert is ingested
  2. The planner selects relevant tools
  3. The Airflow API is queried
  4. Evidence is collected
  5. An RCA is generated
Alert → Planner → Airflow tools → Evidence → RCA
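The flow above can be sketched as a chain of plain functions. This is only an illustration: every name here (`Alert`, `plan_tools`, `run_investigation`, the `list_dag_runs` tool) is hypothetical, and the planner stand-in is a simple rule, whereas the real planner is LLM-driven.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Alert:
    source: str
    message: str
    metadata: dict = field(default_factory=dict)


# Hypothetical tool signature: takes the alert, returns a piece of evidence.
Tool = Callable[[Alert], dict]


def list_dag_runs(alert: Alert) -> dict:
    # Placeholder for step 3; a real tool would query the Airflow API here.
    return {"tool": "list_dag_runs", "dag_id": alert.metadata.get("dag_id")}


TOOLS: dict[str, Tool] = {"list_dag_runs": list_dag_runs}


def plan_tools(alert: Alert) -> list[str]:
    # Stand-in for step 2. The real planner is LLM-driven, not rule-based.
    return ["list_dag_runs"] if alert.source == "airflow" else []


def run_investigation(alert: Alert) -> dict:
    # Steps 3 and 4: run each selected tool and collect its evidence.
    evidence = [TOOLS[name](alert) for name in plan_tools(alert)]
    # Step 5: a real RCA would be generated from this evidence.
    return {"alert": alert.message, "evidence": evidence}
```

Calling `run_investigation` with an alert whose `source` is `airflow` returns the alert message together with one evidence entry per selected tool.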

Configuration

Required Environment Variables

AIRFLOW_BASE_URL=http://localhost:8080

# Authentication (choose one)

# Basic Auth
AIRFLOW_USERNAME=your_username
AIRFLOW_PASSWORD=your_password

# Token-based (if supported)
AIRFLOW_AUTH_TOKEN=your_token

# Optional
AIRFLOW_TIMEOUT_SECONDS=15
AIRFLOW_VERIFY_SSL=true
AIRFLOW_MAX_RESULTS=50
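
As a rough sketch of how these variables might be read and validated, the loader below parses them into a typed object. The `AirflowConfig` class and `load_airflow_config` function are hypothetical names. Two things are assumptions rather than documented behavior: that exactly one authentication method must be set, and that the optional values shown above (15, true, 50) act as defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AirflowConfig:
    base_url: str
    username: Optional[str]
    password: Optional[str]
    auth_token: Optional[str]
    timeout_seconds: float
    verify_ssl: bool
    max_results: int


def load_airflow_config(env: Mapping[str, str] = os.environ) -> AirflowConfig:
    base_url = env.get("AIRFLOW_BASE_URL", "").rstrip("/")
    if not base_url:
        raise ValueError("AIRFLOW_BASE_URL is required")

    username = env.get("AIRFLOW_USERNAME") or None
    password = env.get("AIRFLOW_PASSWORD") or None
    token = env.get("AIRFLOW_AUTH_TOKEN") or None

    # Assumption: exactly one of basic auth or token auth must be configured.
    has_basic = username is not None and password is not None
    if has_basic == (token is not None):
        raise ValueError(
            "Set either AIRFLOW_USERNAME/AIRFLOW_PASSWORD or AIRFLOW_AUTH_TOKEN"
        )

    # Assumption: the example values from the docs are used as defaults.
    verify = env.get("AIRFLOW_VERIFY_SSL", "true").strip().lower()
    return AirflowConfig(
        base_url=base_url,
        username=username,
        password=password,
        auth_token=token,
        timeout_seconds=float(env.get("AIRFLOW_TIMEOUT_SECONDS", "15")),
        verify_ssl=verify in {"1", "true", "yes"},
        max_results=int(env.get("AIRFLOW_MAX_RESULTS", "50")),
    )
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise in unit tests without touching the process environment.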

Setup Example

Start Airflow locally:
docker run -p 8080:8080 apache/airflow:2.8.1 standalone
Create a failing DAG in the Airflow DAGs folder:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def fail_task():
    raise Exception("Intentional failure")

with DAG(
    dag_id="test_fail_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="fail_task",
        python_callable=fail_task,
    )
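Trigger the DAG. New DAGs are paused by default, so unpause it first; otherwise the triggered run stays queued:
airflow dags unpause test_fail_dag
airflow dags trigger test_fail_dag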

Investigation Flow

Run the investigation CLI:
python -m app.cli investigate
Provide the alert payload:
{
  "source": "airflow",
  "message": "Airflow DAG test_fail_dag failed",
  "metadata": {
    "dag_id": "test_fail_dag"
  }
}
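
A minimal sketch of how the DAG identifier could be pulled out of a payload shaped like the one above. The `extract_dag_id` helper is hypothetical. Whether OpenSRE strictly requires `metadata.dag_id` or can also infer it from `message` is not stated here, so the strict check is an assumption.

```python
import json


def extract_dag_id(raw_alert: str) -> str:
    """Return metadata.dag_id from an Airflow alert payload (JSON string)."""
    alert = json.loads(raw_alert)
    if alert.get("source") != "airflow":
        raise ValueError("not an Airflow alert")
    # Assumption: the DAG is identified by metadata.dag_id only.
    dag_id = (alert.get("metadata") or {}).get("dag_id")
    if not isinstance(dag_id, str) or not dag_id:
        raise ValueError("alert metadata must include a non-empty dag_id")
    return dag_id
```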

Capabilities

Capability          Description
List DAG runs       Fetch execution history
Get task instances  Inspect task-level failures
Detect failures     Identify recent failing runs
RCA support         Provide structured evidence for root cause analysis
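
To make these capabilities concrete, here is a sketch against the Airflow 2.x stable REST API, which exposes DAG runs and task instances under `/api/v1`. The helper names are hypothetical and this is not necessarily how OpenSRE's tools are implemented. Using `AIRFLOW_MAX_RESULTS` as the `limit` value is an assumption, and basic auth only works if the corresponding API auth backend is enabled on the Airflow instance.

```python
import json
import urllib.parse
import urllib.request

API_PREFIX = "/api/v1"  # Airflow 2.x stable REST API


def dag_runs_url(base_url: str, dag_id: str, limit: int = 50) -> str:
    # "List DAG runs": the response body carries the runs under "dag_runs".
    dag = urllib.parse.quote(dag_id, safe="")
    return f"{base_url.rstrip('/')}{API_PREFIX}/dags/{dag}/dagRuns?limit={int(limit)}"


def task_instances_url(base_url: str, dag_id: str, dag_run_id: str) -> str:
    # "Get task instances": the response body carries them under "task_instances".
    # Run IDs contain ":" and "+", so they must be percent-encoded.
    dag = urllib.parse.quote(dag_id, safe="")
    run = urllib.parse.quote(dag_run_id, safe="")
    return f"{base_url.rstrip('/')}{API_PREFIX}/dags/{dag}/dagRuns/{run}/taskInstances"


def failed_only(items: list[dict]) -> list[dict]:
    # "Detect failures": keep only entries whose state is "failed".
    return [item for item in items if item.get("state") == "failed"]


def get_json(url: str, auth_header: str, timeout: float = 15.0) -> dict:
    # auth_header is a full Authorization value, e.g. "Basic ..." or "Bearer ...".
    request = urllib.request.Request(
        url, headers={"Authorization": auth_header, "Accept": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

With a reachable instance, `failed_only(get_json(dag_runs_url(base, dag_id), auth)["dag_runs"])` would list the failing runs, and the same filter applies to the `task_instances` list of each run.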

Planner Behavior

When source = airflow, the planner:
  • Prioritizes Airflow-related actions
  • Seeds Airflow tools into the action space
However:
  • Tool selection is LLM-driven
  • Exact ordering may vary between runs
This design avoids hard-coded routing and keeps the system extensible.
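
One way to picture "seeding without hard-coded routing" is a function that only reorders and extends the candidate list, leaving the final choice to the planner. The tool names and the `seed_action_space` function are hypothetical; this is an illustration of the idea, not the actual planner code.

```python
# Hypothetical tool names, used only for illustration.
AIRFLOW_TOOLS = ["airflow_list_dag_runs", "airflow_get_task_instances"]


def seed_action_space(source: str, candidates: list[str]) -> list[str]:
    """Put Airflow tools first for Airflow alerts; never remove other tools."""
    if source != "airflow":
        return list(candidates)
    rest = [name for name in candidates if name not in AIRFLOW_TOOLS]
    # Airflow tools lead the list, but every other tool stays selectable.
    return AIRFLOW_TOOLS + rest
```

Because nothing is removed from the list, the LLM can still pick a non-Airflow tool when the evidence points elsewhere, which matches the variation in ordering noted above.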

Error Handling

  • Per-run failures are isolated: one failing request does not stop the remaining runs from being processed
  • Network/API errors are handled defensively
  • Partial evidence is preserved whenever possible
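
The isolation pattern described above might look like the following sketch. The `collect_task_evidence` function and its return shape are hypothetical. The broad `except` is deliberate in this illustration; real code would likely narrow it to network and API errors.

```python
from typing import Callable, Iterable


def collect_task_evidence(
    run_ids: Iterable[str], fetch: Callable[[str], list]
) -> dict:
    """Fetch evidence per run, recording failures instead of aborting."""
    evidence: dict[str, list] = {}
    errors: dict[str, str] = {}
    for run_id in run_ids:
        try:
            evidence[run_id] = fetch(run_id)
        except Exception as exc:  # broad on purpose in this sketch
            # Record the failure and continue with the remaining runs.
            errors[run_id] = f"{type(exc).__name__}: {exc}"
    return {"evidence": evidence, "errors": errors, "partial": bool(errors)}
```

The `partial` flag lets downstream RCA generation state that some runs could not be inspected rather than silently working from incomplete data.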

Testing

E2E Tests

python -m pytest tests/e2e/airflow/test_orchestrator.py -v
Expected output:
test_airflow_investigation_e2e PASSED

Routing Tests

python -m pytest tests/nodes/plan_actions/test_airflow_routing.py -v

Limitations

  • Planner routing is probabilistic (LLM-based)
  • Requires a reachable Airflow instance
  • No CI-backed Airflow instance by default (local validation required)
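
Since the tests depend on a reachable instance, a small pre-flight check can help, for example as a skip condition in local test runs. The sketch below probes the webserver's `/health` endpoint in Airflow 2.x. The `airflow_is_reachable` name and the injectable `opener` parameter are assumptions added for illustration and testability.

```python
import urllib.request


def airflow_is_reachable(
    base_url: str, timeout: float = 5.0, opener=urllib.request.urlopen
) -> bool:
    """Return True if the Airflow webserver answers /health with HTTP 200."""
    url = f"{base_url.rstrip('/')}/health"
    try:
        with opener(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError, refused connections, and timeouts are all OSError.
        return False
```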

Design Notes

  • Integration follows the same contract as other sources (Datadog, Grafana, etc.)
  • Uses env-based configuration for simplicity
  • Avoids introducing hard overrides in planning logic
  • Focuses on evidence-driven investigation, not static rules

Future Work

  • Stronger tool routing guarantees
  • CI-backed disposable Airflow instance for e2e tests
  • Deeper DAG dependency analysis
  • Richer RCA explanations