Introduction to Apache Airflow in Python
Mike Metzger
Data Engineer

on_failure_callback - Dag run / task failson_success_callback- Dag run / task succeedson_retry_callbackon_skipped_callbackon_execute_callback
context dictionary automaticallycontext contains info about the Dag run / taskcontext["dag"].dag_id - Name of Dagcontext["task_instance"].task_id - Name of taskcontext["logical_date"] - Date of the Dag runcontextdef alert_on_failure(context):dag_id = context["dag"].dag_id task_id = context["task_instance"].task_id print(f"Task {task_id} in DAG {dag_id} has failed.")
def alert_on_failure(context): dag_id = context["dag"].dag_id task_id = context["task_instance"].task_id print(f"Task {task_id} in Dag {dag_id} has failed.")@dag(on_failure_callback=alert_on_failure) def sales_etl_dag(): @task() def data_import_task(): raise ValueError("Simulated failure")# Task data_import_task in Dag sales_etl_dag has failed.
SmtpNotifier - Sends email alertsSlackNotifier - Posts messages to a Slack channel
airflow.providers.smtp.notifications.smtp libraryfrom_email and to attributessubject, html_content, etc@dag(dag_id=`sales_etl_dag`,
on_failure_callback=SmtpNotifier(
to='[email protected]',
from_email='[email protected]',
subject='Dag sales_etl_dag has failed'
)
)

Introduction to Apache Airflow in Python