Testing Airflow code

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

Three levels of testing

Dag testing pyramid

  • Integrity: Does the Dag load without errors?
  • Unit: Does the business logic produce correct results?
  • Integration: Does the full Dag run end-to-end?

Integrity tests with DagBag

from airflow.models import DagBag

dag_bag = DagBag(include_examples=False)

def test_no_import_errors():
    assert len(dag_bag.import_errors) == 0

def test_dag_loaded():
    assert "daily_etl" in dag_bag.dags
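DagBag does the importing for you. To show what an integrity test is actually checking, here is a stdlib-only sketch that imports every .py file in a folder and records the exception each one raises. `collect_import_errors` is a hypothetical helper that only approximates `dag_bag.import_errors`; it is not Airflow's parser, which does more than a plain import.

```python
import importlib.util
from pathlib import Path


def collect_import_errors(dag_folder):
    """Import each .py file in dag_folder and map its path to an error string.

    Hypothetical helper: a rough stand-in for DagBag.import_errors.
    """
    errors = {}
    for path in sorted(Path(dag_folder).glob("*.py")):
        spec = importlib.util.spec_from_file_location(f"_check_{path.stem}", str(path))
        module = importlib.util.module_from_spec(spec)
        try:
            # Runs the file's top-level code, which is where import errors surface
            spec.loader.exec_module(module)
        except Exception as exc:
            errors[str(path)] = f"{type(exc).__name__}: {exc}"
    return errors
```

Inside a test, `assert collect_import_errors("dags") == {}` plays the same role as `test_no_import_errors`.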

Why Dags break on import

ModuleNotFoundError

  • ModuleNotFoundError: missing provider package or wrong import path
  • NameError: renamed variable, function, or typo
  • ImportError: circular imports between Dag files
  • Integrity tests surface these errors before the Dag reaches the scheduler
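Each error above is raised while Python executes a file's top-level code, which is what loading a Dag does. The sketch below makes this concrete with a hypothetical `import_error_type` helper that runs a source string the way an import would and reports the exception class. The source strings stand in for Dag files and are invented for illustration. A circular import needs two files, so it is not reproduced here.

```python
def import_error_type(source):
    """Execute module-level source in a fresh namespace, like an import.

    Hypothetical helper: returns the exception class name, or None if the
    "file" loads cleanly.
    """
    try:
        exec(compile(source, "<dag_file>", "exec"), {})
    except Exception as exc:
        return type(exc).__name__
    return None


# Missing provider package or wrong import path
print(import_error_type("import not_a_real_provider_pkg_xyz"))  # ModuleNotFoundError
# Typo in a name used at module level
print(import_error_type("dag_id = daily_etl_idd"))  # NameError
# Typo inside a function body is NOT hit at import time
print(import_error_type("def f():\n    return undefined_name\n"))  # None
```

The last case marks the limit of integrity testing: a typo inside a task function only fails when the task runs, which is what the unit and integration levels are for.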

Unit testing task functions

In the Dag file:

from airflow.sdk import task

# Plain function: importable and testable without running Airflow
def clean_record(record):
    return {
        "name": record["name"].strip(),
        "email": record["email"].lower(),
    }

@task
def transform(records):
    return [clean_record(r) for r in records]
  • Extract business logic into plain functions that the task calls

In the test file:

from dags.data_cleaning import (
    clean_record,
)

def test_strips_whitespace():
    result = clean_record(
        {"name": "  Alice  ",
         "email": "alice@example.com"}
    )
    assert result["name"] == "Alice"
  • Focus unit tests on business logic
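Because `clean_record` is a plain function, its edge cases can be covered without starting Airflow. A few more pytest-style tests, as a sketch: the function is repeated inline rather than imported from `dags.data_cleaning` so the block runs on its own, and the test names and inputs are illustrative.

```python
def clean_record(record):
    # Same function as in the Dag file, repeated so this sketch runs alone
    return {
        "name": record["name"].strip(),
        "email": record["email"].lower(),
    }


def test_lowercases_email():
    result = clean_record({"name": "Bob", "email": "Bob@Example.COM"})
    assert result["email"] == "bob@example.com"


def test_keeps_only_expected_keys():
    # Extra input fields are not copied into the cleaned record
    result = clean_record({"name": "Bob", "email": "b@x.io", "age": 42})
    assert set(result) == {"name", "email"}


def test_missing_email_raises():
    # Documents current behavior: a missing field is an error, not a default
    try:
        clean_record({"name": "Carol"})
    except KeyError:
        return
    raise AssertionError("expected KeyError for missing 'email'")
```

pytest collects these functions by their `test_` prefix; plain asserts are all it needs.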

Integration tests with dag.test()

import json
from pathlib import Path

from airflow.models import DagBag
from pendulum import datetime

dag_bag = DagBag(include_examples=False)

def test_etl_pipeline():
    dag = dag_bag.get_dag("etl_output")
    assert dag is not None
    # Run all tasks in-process for a fixed logical date
    dag.test(logical_date=datetime(2026, 1, 15))
    # Validate the file the Dag wrote
    output = Path("/tmp/etl_results.json")
    assert output.exists()
    results = json.loads(output.read_text())
    assert len(results) == 2
  • Run the actual Dag with controlled input and validate the output
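One risk with a fixed output path such as /tmp/etl_results.json is that a file left over from an earlier run makes `output.exists()` pass even if the Dag wrote nothing. A minimal sketch of the validation half, factored into a hypothetical `run_and_validate` helper that deletes stale output first; `run` is any zero-argument callable, for example `lambda: dag.test(logical_date=datetime(2026, 1, 15))`.

```python
import json
from pathlib import Path


def run_and_validate(run, output, expected_count):
    """Run the pipeline callable, then check the JSON file it should write.

    Hypothetical helper. Deleting the file first ensures the existence check
    reflects this run, not a previous one.
    """
    output = Path(output)
    output.unlink(missing_ok=True)  # missing_ok requires Python 3.8+
    run()
    assert output.exists(), f"{output} was not written"
    results = json.loads(output.read_text())
    assert len(results) == expected_count, (
        f"expected {expected_count} records, got {len(results)}"
    )
    return results
```

In the test above, the last five lines could then become `run_and_validate(lambda: dag.test(logical_date=datetime(2026, 1, 15)), "/tmp/etl_results.json", 2)`.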

Testing in CI

CI pipeline

  • Integrity + unit: every commit (fast)
  • Integration: pull requests or nightly (slower)
  • No Dag reaches production without passing all three
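The split above can be expressed as a small policy function that a CI job calls to decide which tests to run. This is a sketch under assumptions: the tests/integrity, tests/unit and tests/integration directories are a hypothetical layout, and the event names mirror common CI triggers (GitHub Actions, for example, exposes push, pull_request and schedule through GITHUB_EVENT_NAME) rather than any one product's API.

```python
# Hypothetical test layout, one directory per level of the pyramid
FAST_TIERS = ["tests/integrity", "tests/unit"]
SLOW_TIERS = ["tests/integration"]


def pytest_command(event):
    """Return the pytest command line for a CI trigger.

    'push' runs only the fast tiers; 'pull_request' and 'schedule' (nightly)
    add the slower integration tier.
    """
    if event == "push":
        paths = FAST_TIERS
    elif event in ("pull_request", "schedule"):
        paths = FAST_TIERS + SLOW_TIERS
    else:
        raise ValueError(f"unknown CI event: {event!r}")
    return ["python", "-m", "pytest", *paths]
```

A CI step could pass the result to `subprocess.run(..., check=True)` so that any failing tier fails the job. Because pull requests run all three tiers, a Dag cannot be merged without passing each of them.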

Let's practice!
