Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

@task(
    retries=3,
    retry_delay=timedelta(minutes=2),
)
def fetch_weather():
    """Fetch the weather forecast; Airflow retries it up to 3 times, 2 minutes apart.

    Returns:
        The decoded JSON body of the forecast response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status, which
            fails this attempt so the retry policy above takes over.
        requests.Timeout: if the API does not respond within the timeout.
    """
    # Without a timeout, requests may wait indefinitely on a stalled
    # connection: the attempt would never fail, so the retries configured
    # above would never be used.
    response = requests.get("https://api.weather.com/forecast", timeout=30)
    response.raise_for_status()
    return response.json()
@task(
    retry_exponential_backoff=2.0,
    retry_delay=timedelta(seconds=120),
    retries=3,
)
def fetch_weather():
    """Placeholder task: 3 retries, starting 120 s apart, with exponential backoff.

    NOTE(review): the combined weather_pipeline example passes
    ``retry_exponential_backoff=True`` while this one passes ``2.0``;
    confirm the installed Airflow version accepts a float value here.
    """

def alert_on_failure(context):
    """Print an alert line naming the failed task and the DAG it belongs to.

    Args:
        context: failure-callback context mapping; only ``context["dag"].dag_id``
            and ``context["ti"].task_id`` are read (in that order).
    """
    failed_dag = context["dag"].dag_id
    failed_task = context["ti"].task_id
    message = f"ALERT: {failed_task} in {failed_dag} failed!"
    print(message)
@task(
    on_failure_callback=alert_on_failure,
)
def fetch_weather():
    """Placeholder task; Airflow invokes ``alert_on_failure`` with the task context if it fails."""
dag, ti, exception, log URL
Dag-level (catch-all)
@dag(on_failure_callback=alert_team)
def my_pipeline():
    """DAG-level (catch-all) handling: ``alert_team`` is invoked when a run of this DAG fails."""
Task-level (specific)
@task(on_failure_callback=page_oncall)
def critical_step():
    """Task-level (specific) handling: ``page_oncall`` is invoked when this task fails."""
@dag(
    max_consecutive_failed_dag_runs=3,
)
def monitoring_pipeline():
    """Placeholder DAG capped at 3 consecutive failed runs.

    Per the Airflow DAG docs, once that many consecutive runs have failed the
    scheduler stops scheduling (pauses) the DAG.
    """

@dag(
    max_consecutive_failed_dag_runs=3,
    on_failure_callback=alert_team,
)
def weather_pipeline():
    """Weather DAG combining DAG-level and task-level failure handling.

    - DAG level: ``alert_team`` is invoked when a DAG run fails, and the DAG is
      paused after 3 consecutive failed runs.
    - Task level: ``fetch_weather`` is retried 3 times, starting 2 minutes
      apart with exponential backoff; if the final attempt still fails,
      ``page_oncall`` is invoked.

    Note: as with any TaskFlow DAG, the module must also call
    ``weather_pipeline()`` at top level for Airflow to register the DAG.
    """

    @task(
        retries=3,
        retry_delay=timedelta(minutes=2),
        retry_exponential_backoff=True,
        on_failure_callback=page_oncall,
    )
    def fetch_weather():
        ...

    # TaskFlow adds a task to the DAG only when the decorated function is
    # called inside the @dag body; defining it alone leaves the DAG empty.
    fetch_weather()
Building Data Pipelines with Airflow