Monitoring, Alerting, and Callbacks

Introduction to Apache Airflow in Python

Mike Metzger

Data Engineer

Dag Lifecycle

 

  • Every Dag run and task goes through a sequence of states
    • Queued
    • Running
    • Final state: Success or Failed (tasks can also end as Skipped)
  • Airflow tracks every state and each transition between states
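
The lifecycle above behaves like a small state machine. The sketch below models the Dag run path in plain Python. The TRANSITIONS table and the advance and is_final functions are illustrative assumptions, not Airflow internals, and the model deliberately leaves out real-world paths such as clearing a failed run so that it is queued again.

```python
# Simplified model of a Dag run's lifecycle (illustrative, not Airflow internals)
TRANSITIONS = {
    "queued": {"running"},
    "running": {"success", "failed"},
    "success": set(),  # final states have no outgoing transitions
    "failed": set(),
}


def advance(current, new):
    """Return the new state, rejecting transitions the model does not allow."""
    if new not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new


def is_final(state):
    return not TRANSITIONS[state]


history = ["queued"]
for next_state in ("running", "success"):
    history.append(advance(history[-1], next_state))

print(" -> ".join(history))  # queued -> running -> success
```

Trying to jump straight from queued to success raises ValueError, which mirrors the rule that a run must pass through running first.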

Diagram of the Dag run lifecycle from queued to running to a final state

Callbacks

  • Functions attached to a Dag run or task that Airflow calls automatically
  • Callbacks for specific state transitions
    • on_failure_callback - Dag run / task fails
    • on_success_callback - Dag run / task succeeds
  • Task-specific callbacks
    • on_retry_callback - task is set up for a retry
    • on_skipped_callback - task is skipped
    • on_execute_callback - task is about to execute
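
To make the trigger points concrete, here is a toy model in plain Python (not Airflow code) of when each callback fires for a single task. run_with_callbacks and SkipTask are hypothetical names; SkipTask stands in for Airflow's AirflowSkipException, and real Airflow also handles timeouts, upstream failures, and other states this sketch ignores.

```python
class SkipTask(Exception):
    """Hypothetical stand-in for Airflow's AirflowSkipException."""


def run_with_callbacks(func, context, *, retries=0,
                       on_execute_callback=None, on_success_callback=None,
                       on_failure_callback=None, on_retry_callback=None,
                       on_skipped_callback=None):
    """Toy model: run func, firing callbacks at the transitions listed above."""

    def fire(callback):
        # Every callback receives the same context dictionary
        if callback is not None:
            callback(context)

    for attempt in range(retries + 1):
        fire(on_execute_callback)          # right before each try
        try:
            func()
        except SkipTask:
            fire(on_skipped_callback)
            return "skipped"
        except Exception:
            if attempt < retries:
                fire(on_retry_callback)    # a retry is still available
                continue
            fire(on_failure_callback)      # retries exhausted
            return "failed"
        fire(on_success_callback)
        return "success"


calls = []

def record(name):
    return lambda context: calls.append(name)

def data_import_task():
    raise ValueError("Simulated failure")

state = run_with_callbacks(
    data_import_task, {}, retries=1,
    on_execute_callback=record("execute"),
    on_retry_callback=record("retry"),
    on_failure_callback=record("failure"),
)
print(state, calls)  # failed ['execute', 'retry', 'execute', 'failure']
```

With one retry allowed, the failure callback fires only after the second attempt fails; the first failure triggers the retry callback instead.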

Illustration of Airflow callbacks triggered on state transitions

Callback context

  • Airflow automatically passes a context dictionary to the callback
  • The context contains information about the Dag run / task
    • context["dag"].dag_id - Name of the Dag
    • context["task_instance"].task_id - Name of the task
    • context["logical_date"] - Logical date of the Dag run
  • The callback function must accept context as its argument
def alert_on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} in Dag {dag_id} has failed.")
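
Because a callback is an ordinary function of context, you can exercise it without running Airflow by passing a stand-in dictionary. In this sketch, build_failure_message is a hypothetical helper, the SimpleNamespace objects are assumptions that imitate only the attributes the callback reads, and the logical date is read defensively in case it is missing or None.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def build_failure_message(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    # Read the logical date defensively in case it is missing or None
    logical_date = context.get("logical_date")
    when = logical_date.strftime("%Y-%m-%d") if logical_date else "unknown date"
    return f"Task {task_id} in Dag {dag_id} ({when}) has failed."


def alert_on_failure(context):
    print(build_failure_message(context))


# Stand-in context: only the keys the callback reads, with SimpleNamespace
# objects imitating the Dag and task instance attributes
fake_context = {
    "dag": SimpleNamespace(dag_id="sales_etl_dag"),
    "task_instance": SimpleNamespace(task_id="data_import_task"),
    "logical_date": datetime(2024, 1, 15, tzinfo=timezone.utc),
}

alert_on_failure(fake_context)
# Task data_import_task in Dag sales_etl_dag (2024-01-15) has failed.
```

Splitting message building from printing keeps the formatting logic easy to check in a unit test.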

Callback example

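# Airflow 3 import; on Airflow 2 use: from airflow.decorators import dag, task
from airflow.sdk import dag, task

def alert_on_failure(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} in Dag {dag_id} has failed.")

@dag(on_failure_callback=alert_on_failure)
def sales_etl_dag():
    @task()
    def data_import_task():
        raise ValueError("Simulated failure")

    data_import_task()

sales_etl_dag()
# Output when the Dag run fails:
# Task data_import_task in Dag sales_etl_dag has failed.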

Notifiers

  • Prebuilt, reusable callable objects that can be passed as callbacks
  • Send alerts to external systems
  • Several notifiers available:
    • SmtpNotifier - Sends email alerts
    • SlackNotifier - Posts messages to a Slack channel
    • App-specific notifiers - PagerDuty, Opsgenie
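
Notifiers are objects rather than plain functions: you configure one up front (recipient, message) and Airflow later calls it with the context. Airflow's own notifiers subclass BaseNotifier and implement a notify(context) method. The sketch below only mimics that shape in plain Python, with a hypothetical MessageNotifier class and a list standing in for the external system, so it runs without Airflow installed.

```python
from types import SimpleNamespace


class MessageNotifier:
    """Toy notifier: a configurable, callable object usable as a callback."""

    def __init__(self, template, send):
        self.template = template  # e.g. "Dag {dag_id}: task {task_id} failed"
        self.send = send          # any function that delivers a string

    def notify(self, context):
        # Build the message from the callback context, then hand it off
        text = self.template.format(
            dag_id=context["dag"].dag_id,
            task_id=context["task_instance"].task_id,
        )
        self.send(text)

    def __call__(self, context):
        # Callbacks are invoked as callback(context), so an instance
        # can be passed wherever a callback function is expected
        self.notify(context)


sent = []  # stand-in for Slack, email, PagerDuty, ...
notifier = MessageNotifier("Dag {dag_id}: task {task_id} failed", sent.append)
notifier({
    "dag": SimpleNamespace(dag_id="sales_etl_dag"),
    "task_instance": SimpleNamespace(task_id="data_import_task"),
})
print(sent)
```

Swapping sent.append for a real delivery function changes where the alert goes without touching the callback wiring.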

Illustration of Airflow notifiers sending alerts to external systems

SmtpNotifier

  • Sends an email when the callback fires
  • In the airflow.providers.smtp.notifications.smtp module (SMTP provider package)
  • Requires the from_email and to arguments
  • Optional arguments include subject and html_content
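    from airflow.providers.smtp.notifications.smtp import SmtpNotifier
    from airflow.sdk import dag  # Airflow 3; Airflow 2: airflow.decorators

    @dag(dag_id='sales_etl_dag',
         on_failure_callback=SmtpNotifier(
             to='[email protected]',
             from_email='[email protected]',
             subject='Dag sales_etl_dag has failed'
         )
    )
    def sales_etl_dag():
        ...  # tasks go here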
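
SmtpNotifier builds and sends the email for you. To show what the sender, recipient, subject, and HTML body amount to, this sketch assembles a comparable message with Python's standard email package. build_alert_email and the example.com addresses are hypothetical, and this is not SmtpNotifier's actual implementation.

```python
from email.message import EmailMessage
from types import SimpleNamespace


def build_alert_email(context, from_email, to):
    """Assemble a failure alert with the same fields passed to SmtpNotifier."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id

    msg = EmailMessage()
    msg["From"] = from_email
    msg["To"] = to
    msg["Subject"] = f"Dag {dag_id} has failed"
    # Counterpart of html_content: an HTML body naming the failed task
    msg.set_content(f"<p>Task <b>{task_id}</b> in Dag {dag_id} failed.</p>",
                    subtype="html")
    return msg


# Hypothetical context and addresses, for illustration only
context = {
    "dag": SimpleNamespace(dag_id="sales_etl_dag"),
    "task_instance": SimpleNamespace(task_id="data_import_task"),
}
msg = build_alert_email(context, "airflow@example.com", "oncall@example.com")
print(msg["Subject"])
# Sending would need an SMTP server, e.g. smtplib.SMTP(host).send_message(msg)
```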
    
Audit Log

  • Use the Audit Log to monitor Airflow
  • Records the sequence of events on the Airflow instance
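
When reviewing the Audit Log, a common question is which kinds of events occurred and how often. Assuming the entries are available as records with an event type and a Dag ID (the field names and sample values below are hypothetical, loosely modeled on what the Audit Log page lists), a plain-Python summary might look like this:

```python
from collections import Counter


def summarize_events(entries, dag_id=None):
    """Count audit log entries by event type, optionally for one Dag."""
    return Counter(
        entry["event"]
        for entry in entries
        if dag_id is None or entry.get("dag_id") == dag_id
    )


# Hypothetical sample entries; real entries carry more fields
entries = [
    {"event": "trigger", "dag_id": "sales_etl_dag", "owner": "admin"},
    {"event": "clear", "dag_id": "sales_etl_dag", "owner": "admin"},
    {"event": "trigger", "dag_id": "sales_etl_dag", "owner": "admin"},
    {"event": "login", "dag_id": None, "owner": "admin"},
]

print(summarize_events(entries, dag_id="sales_etl_dag"))
```

Filtering by dag_id narrows the summary to one pipeline; omitting it counts every event on the instance.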

Airflow audit log page listing system events by type

Let's practice!
