Robust Dags with failure handling and retries

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

Why tasks fail

 

Task connection issues

 

  • API timeouts and rate limits
  • Brief network interruptions and domain name resolution delays
  • Multiple tasks competing for the same resource

Retries

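from datetime import timedelta

import requests
from airflow.sdk import task  # Airflow 3; on Airflow 2 use airflow.decorators

@task(
    retries=3,                         # retry up to 3 times after the first failure
    retry_delay=timedelta(minutes=2),  # wait 2 minutes between attempts
)
def fetch_weather():
    # A timeout keeps a hung connection from blocking the task indefinitely
    response = requests.get("https://api.weather.com/forecast", timeout=10)
    response.raise_for_status()  # turn HTTP 4xx/5xx into an exception so the task fails
    return response.json()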

 

  • Task retries up to 3 times on failure
  • Waits 2 minutes between each attempt
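
To make the retry semantics concrete, here is a minimal plain-Python sketch of what "retries=3 with a fixed delay" means: one initial attempt plus up to three retries. It is not how Airflow implements retries (the scheduler reschedules the task instead of sleeping in a loop); `run_with_retries` and its parameters are hypothetical names used only for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    delay_seconds: float = 120.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` more times on any exception."""
    attempts = retries + 1  # the first run is not counted as a retry
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # retries exhausted: the failure becomes final
            sleep(delay_seconds)  # fixed wait before the next attempt
    raise RuntimeError("retries must be >= 0")
```

Passing `sleep` as a parameter lets the waiting be replaced with a stub, which keeps the sketch easy to try without actually pausing.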

Exponential backoff

 

@task(
    retries=3,
    retry_delay=timedelta(minutes=2),
    retry_exponential_backoff=2.0,
)
def fetch_weather():
    ...


  • Delay doubles after each retry
  • Gives external services time to recover
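
As a rough illustration of "delay doubles after each retry", the sketch below computes an idealized wait schedule from a base delay. It is a simplification under stated assumptions: Airflow's real calculation also adds jitter and can be capped with `max_retry_delay`, so actual waits will differ somewhat. `backoff_schedule` is a hypothetical helper, not an Airflow function.

```python
from datetime import timedelta
from typing import List, Optional


def backoff_schedule(
    retry_delay: timedelta,
    retries: int,
    factor: float = 2.0,
    max_delay: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the idealized wait before each retry (no jitter)."""
    delays = []
    for n in range(retries):
        delay = retry_delay * (factor ** n)  # base, base*2, base*4, ...
        if max_delay is not None:
            delay = min(delay, max_delay)  # optional cap, in the spirit of max_retry_delay
        delays.append(delay)
    return delays


schedule = backoff_schedule(timedelta(minutes=2), retries=3)
print([str(d) for d in schedule])
```

With a 2-minute base delay and 3 retries, the idealized waits are 2, 4, and 8 minutes.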

on_failure_callback

def alert_on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["ti"].task_id
    print(f"ALERT: {task_id} in {dag_id} failed!")

@task(on_failure_callback=alert_on_failure)
def fetch_weather():
    ...

 

  • context dict: dag, ti, exception, log URL
  • Route to Slack, PagerDuty, or any alerting tool
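
A callback can pull more than the IDs out of the context. The sketch below builds an alert message that also includes the exception and a log link. It assumes the task instance exposes a `log_url` attribute and that `exception` is present in the context; the `fake_context` object is a stand-in built for illustration so the function can be tried without a running Airflow instance, and `build_alert` is a hypothetical name.

```python
from types import SimpleNamespace


def build_alert(context: dict) -> str:
    """Format a failure alert from an Airflow-style callback context."""
    ti = context["ti"]
    dag_id = context["dag"].dag_id
    exc = context.get("exception")  # may be absent, so use .get
    log_url = getattr(ti, "log_url", "n/a")  # assumption: ti exposes log_url
    return f"ALERT: {ti.task_id} in {dag_id} failed: {exc!r} (logs: {log_url})"


# Stand-in context for local experimentation (not produced by Airflow)
fake_context = {
    "dag": SimpleNamespace(dag_id="weather_pipeline"),
    "ti": SimpleNamespace(task_id="fetch_weather", log_url="http://localhost:8080/log"),
    "exception": TimeoutError("API timed out"),
}

print(build_alert(fake_context))
```

The returned string could then be handed to whatever client sends messages to Slack, PagerDuty, or another alerting tool.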

Callbacks at Dag vs task level

Dag-level (catch-all)

@dag(
    on_failure_callback=alert_team,
)
def my_pipeline():
    ...

Task-level (specific)

@task(
    on_failure_callback=page_oncall,
)
def critical_step():
    ...

 

  • Dag-level callback runs when the Dag run fails; task-level callback runs when that task fails
  • Use both for layered alerting

max_consecutive_failed_dag_runs

@dag(max_consecutive_failed_dag_runs=3)
def monitoring_pipeline():
    ...

After 3 consecutive failed Dag runs, Airflow pauses the Dag automatically

  • Stops alert fatigue from persistent failures
  • Dag resumes when manually unpaused
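
The auto-pause rule can be sketched as a check over the most recent run states: pause only when the last N runs all failed, so a single success resets the streak. This is an illustrative model, not Airflow's internal code; `should_auto_pause` and the string states are hypothetical, and treating a limit of 0 as "disabled" is an assumption of the sketch.

```python
from typing import Sequence


def should_auto_pause(run_states: Sequence[str], max_consecutive_failed: int) -> bool:
    """Return True when the most recent runs are all failures."""
    if max_consecutive_failed <= 0:
        return False  # assumption: 0 means the feature is disabled
    recent = run_states[-max_consecutive_failed:]
    # Need a full window of runs, and every one of them must have failed
    return len(recent) == max_consecutive_failed and all(s == "failed" for s in recent)


history = ["success", "failed", "failed", "failed"]
print(should_auto_pause(history, max_consecutive_failed=3))
```

In this example the last three runs all failed, so the check returns True.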

Putting it all together

@dag(
    max_consecutive_failed_dag_runs=3,
    on_failure_callback=alert_team,
)
def weather_pipeline():

    @task(
        retries=3,
        retry_delay=timedelta(minutes=2),
        retry_exponential_backoff=True,
        on_failure_callback=page_oncall,
    )
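    def fetch_weather():
        ...

    fetch_weather()  # call the task inside the Dag function to register it

weather_pipeline()  # call the Dag function so Airflow can discover the Dag

  • Retries + backoff handle transient failures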
  • Callbacks alert the right people
  • Auto-pause stops the noise

Let's practice!
