Triggers and Fault Tolerance

Introduction to Apache Airflow in Python

Mike Metzger

Data Engineer

Building robust pipelines

  • Triggers - control when tasks can run
    • Define tasks that execute under specific conditions
    • Default trigger rule - all_success (every upstream task succeeded)

Illustration of triggers controlling when tasks can run

  • Fault tolerance - enables recovery within a task
    • Retry an individual failed task

Illustration of fault tolerance retrying a failed task

Trigger rules

  • Check the state of previous (upstream) tasks before a task starts
  • Let you define whether a task should proceed
  • By default, all previous tasks must finish successfully before continuing
  • Trigger rules let you change that

Illustration of trigger rules checking upstream task states

Trigger rule uses

  • Notification tasks
  • Cleanup tasks
  • Conditional execution

Illustration of trigger rule use cases such as notification and cleanup tasks
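The notification and cleanup use cases can be illustrated with a small pure-Python model of a single Dag run in which one upstream task fails. This is a simplified sketch, not Airflow's API: the RULES dict, run_downstream(), and all task names are hypothetical, and the model assumes every upstream task has already finished. The rule names mirror Airflow's all_success, all_done, and one_failed.

```python
# Simplified model of trigger rules gating downstream tasks.
# NOT Airflow code: RULES, run_downstream() and the task names are hypothetical.
# Assumes every upstream task has already finished.

RULES = {
    "all_success": lambda states: all(s == "success" for s in states),
    "all_done": lambda states: True,  # all upstream tasks finished, outcome ignored
    "one_failed": lambda states: any(s == "failed" for s in states),
}


def run_downstream(upstream_states, downstream):
    """Return the names of downstream tasks whose trigger rule is met."""
    return [
        name
        for name, rule in downstream.items()
        if RULES[rule](upstream_states)
    ]


# One Dag run: the extract task failed, the validate task succeeded.
upstream = ["failed", "success"]
downstream = {
    "load_warehouse": "all_success",      # normal path, needs every upstream success
    "cleanup_temp_files": "all_done",     # cleanup task runs regardless of outcome
    "send_failure_alert": "one_failed",   # notification task
}

ran = run_downstream(upstream, downstream)
print(ran)  # ['cleanup_temp_files', 'send_failure_alert']
```

With the default all_success rule the load task is held back, while the cleanup and alert tasks still run.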

Key trigger rules

  • all_success - Every prior task finished successfully, no skips
  • all_failed - Every prior task failed
  • all_done - Every prior task completed, regardless of outcome
  • one_failed - At least one prior task failed (does not wait for the others to finish)
  • one_success - At least one prior task succeeded (does not wait for the others to finish)
  • none_failed - All prior tasks finished successfully or skipped
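A slightly fuller model shows how the six rules above differ, including the detail that one_failed and one_success can fire before every upstream task has finished. This is a hedged sketch of the documented behavior, not Airflow's implementation: rule_met() is a hypothetical helper, "running" stands in for any unfinished state, and Airflow-specific states such as upstream_failed are ignored.

```python
# Simplified model of the six key trigger rules.
# NOT Airflow's implementation: rule_met() is hypothetical, "running" stands in
# for any unfinished state, and states such as upstream_failed are ignored.

DONE = {"success", "failed", "skipped"}


def rule_met(rule: str, states: list[str]) -> bool:
    """Return True if a task with `rule` may start, given its upstream states."""
    if rule == "one_failed":
        return "failed" in states    # fires without waiting for the rest
    if rule == "one_success":
        return "success" in states   # fires without waiting for the rest
    if not all(s in DONE for s in states):
        return False                 # every other rule waits for all upstream tasks
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "all_failed":
        return all(s == "failed" for s in states)
    if rule == "all_done":
        return True                  # finished, whatever the outcome
    if rule == "none_failed":
        return all(s in ("success", "skipped") for s in states)
    raise ValueError(f"unknown trigger rule: {rule}")


print(rule_met("all_success", ["success", "skipped"]))  # False
print(rule_met("none_failed", ["success", "skipped"]))  # True
print(rule_met("one_failed", ["failed", "running"]))    # True
print(rule_met("all_done", ["failed", "running"]))      # False
```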

Trigger rule implementation

  • from airflow.utils.trigger_rule import TriggerRule
  • Trigger rules are set with the trigger_rule argument of the @task decorator
  • trigger_rule=TriggerRule.<MEMBER>, where <MEMBER> is an enum value such as ALL_SUCCESS or ALL_DONE
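from airflow.sdk import task  # Airflow 3 import; Airflow 2 uses airflow.decorators
from airflow.utils.trigger_rule import TriggerRule

@task(trigger_rule=TriggerRule.ALL_SUCCESS)
def run_if_everything_succeeds():
    print('All previous tasks succeeded!')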

Fault tolerance attributes

  • retries - task parameter defining how many times Airflow should retry a failed task before marking it as failed
  • retry_delay - task parameter accepting a timedelta defining the wait time between retry attempts
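from datetime import timedelta

@task(
    trigger_rule=TriggerRule.ONE_SUCCESS,
    retries=3,                         # retry up to 3 times after the first failure
    retry_delay=timedelta(minutes=5),  # wait 5 minutes between attempts
)
def run_if_any_succeeds():  # hypothetical task name for illustration
    print('At least one previous task succeeded!')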
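The effect of retries and retry_delay can be sketched as a plain retry loop. This is a simplified model, not how the Airflow scheduler actually reschedules tasks: run_with_retries() and flaky_extract() are hypothetical, and the sleep function is passed in so the example records waits instead of pausing for five minutes. With retries=3, a task gets up to four attempts in total.

```python
from datetime import timedelta


def run_with_retries(task_fn, retries, retry_delay, sleep):
    """Run task_fn, retrying on exception; return (result, attempts used).

    Simplified model of retries/retry_delay, NOT Airflow code:
    the task gets retries + 1 attempts and waits retry_delay between them.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task_fn(), attempt
        except Exception:
            if attempt > retries:
                raise  # out of retries: the task is marked as failed
            sleep(retry_delay.total_seconds())  # wait before the next attempt


# Hypothetical flaky task: fails twice (e.g. a network blip), then succeeds.
calls = {"n": 0}


def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network error")
    return "data"


waits = []  # record requested waits instead of really sleeping
result, attempts = run_with_retries(
    flaky_extract, retries=3, retry_delay=timedelta(minutes=5), sleep=waits.append
)
print(result, attempts, waits)  # data 3 [300.0, 300.0]
```

A task that keeps failing raises after its last retry, which corresponds to Airflow marking it as failed.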

TriggerDagRunOperator

  • Allows one Dag to kick off another Dag
  • Promotes reuse of code and Dags
  • from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
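  • trigger_dag_id - Must match the dag_id of the Dag to trigger
  • wait_for_completion - Block downstream tasks until the triggered Dag run completes
  • poke_interval - How often, in seconds, to check the triggered Dag run's state while waiting
  • conf - Dictionary of data to pass to the child Dag run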
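The interplay of wait_for_completion and poke_interval can be modeled as a polling loop. This sketch is a simplification, not the operator's source: wait_for_child() and the child-state iterator are hypothetical, and waits are recorded rather than slept. With poke_interval=30, the triggering task checks the child Dag run's state every 30 seconds until the run reaches a terminal state.

```python
# Simplified model of wait_for_completion=True with a poke_interval.
# NOT Airflow code: wait_for_child() and the child states are hypothetical.

TERMINAL_STATES = {"success", "failed"}


def wait_for_child(get_child_state, poke_interval, sleep):
    """Poll the child run every poke_interval seconds; return (state, polls)."""
    polls = 0
    while True:
        polls += 1
        state = get_child_state()
        if state in TERMINAL_STATES:
            return state, polls
        sleep(poke_interval)  # child still in progress: wait, then check again


# Hypothetical child Dag run that reports queued, then running, then success.
child_states = iter(["queued", "running", "success"])
slept = []  # record waits instead of really sleeping

final_state, polls = wait_for_child(lambda: next(child_states), 30, slept.append)
print(final_state, polls, sum(slept))  # success 3 60
```

A shorter poke_interval notices completion sooner at the cost of more frequent status checks.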

TriggerDagRun Example

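from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

trigger_child = TriggerDagRunOperator(
    task_id="trigger_child_pipeline",
    trigger_dag_id="child_pipeline_dag",  # must match the child Dag's dag_id
    wait_for_completion=True,             # block downstream tasks until the child run finishes
    poke_interval=30,                     # check the child run's state every 30 seconds
    conf={                                # data passed to the child Dag run
        "source": "s3://my-bucket/raw/",
    },
)

# task1 and cleanup are tasks defined elsewhere in the same Dag
task1 >> trigger_child    # Run first task, then kick off child Dag
trigger_child >> cleanup  # Run a cleanup task after child Dag completes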

Let's practice!
