Trigger e tolleranza ai guasti

Introduzione ad Apache Airflow in Python

Mike Metzger

Data Engineer

Creare pipeline robuste

  • Trigger: controllano quando possono partire i task
    • Definiscono task che eseguono sotto condizioni specifiche
    • Trigger predefinito: tutti con successo

Illustrazione dei trigger che controllano quando possono essere eseguiti i task

  • Tolleranza ai guasti: abilita il ripristino all'interno di un task
    • Ritenta il singolo task

Illustrazione della tolleranza ai guasti che ritenta un task fallito

Introduzione ad Apache Airflow in Python

Trigger rule

  • Controlla lo stato dei task precedenti prima che un task inizi
  • Ti permette di decidere se i task devono procedere
  • Per impostazione predefinita, tutti i task precedenti devono finire con successo prima di continuare
  • Le trigger rule permettono di cambiare questo comportamento

Illustrazione delle trigger rule che controllano gli stati dei task a monte

Introduzione ad Apache Airflow in Python

Usi delle trigger rule

  • Task di notifica
  • Task di cleanup
  • Esecuzione condizionale

Illustrazione di casi d'uso delle trigger rule come task di notifica e cleanup

Introduzione ad Apache Airflow in Python

Trigger rule principali

  • all_success - Tutti i task precedenti sono riusciti, nessuno saltato
  • all_failed - Tutti i task precedenti sono falliti
  • all_done - Tutti i task precedenti sono completati, indipendentemente dall'esito
  • one_failed - Almeno un task è fallito
  • one_success - Almeno un task è riuscito
  • none_failed - Tutti i task precedenti riusciti o saltati
Introduzione ad Apache Airflow in Python

Implementazione della trigger rule

  • from airflow.utils.trigger_rule import TriggerRule
  • Le regole di trigger si applicano al decorator @task
  • trigger_rule=TriggerRule.<TriggerRule enum>
@task(trigger_rule=TriggerRule.ALL_SUCCESS)
def run_if_everything_succeeds:
  print('All previous tasks succeeded!')
Introduzione ad Apache Airflow in Python

Attributi di tolleranza ai guasti

  • retries: parametro che definisce quante volte Airflow ritenta un task fallito prima di segnarlo come fallito
  • retry_delay: parametro che accetta un timedelta e definisce l'attesa tra i tentativi
@task(
  trigger_rule=TriggerRule.ONE_SUCCESS,
  retries=3,
  retry_delay=timedelta(minutes=5)
)
Introduzione ad Apache Airflow in Python

TriggerDagRunOperator

  • Permette a un DAG di avviarne un altro
  • Fornisce codice e DAG
  • from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
  • trigger_dag_id - Deve corrispondere a dag_id
  • wait_for_completion - Blocca i task successivi finché non termina
  • poke_interval - Frequenza di controllo
  • conf - Dati da passare al DAG figlio
Introduzione ad Apache Airflow in Python

Esempio di TriggerDagRun

trigger_child = TriggerDagRunOperator(
        task_id="trigger_child_pipeline",
        trigger_dag_id="child_pipeline_dag",   
        wait_for_completion=True,              
        poke_interval=30,                      
        conf={                                 
            "source": "s3://my-bucket/raw/",
        },
    )

task1 >> trigger_child    # Esegui il primo task, poi avvia il DAG figlio
trigger_child >> cleanup  # Esegui cleanup dopo il completamento del DAG figlio
Introduzione ad Apache Airflow in Python

Passiamo alla pratica !

Introduzione ad Apache Airflow in Python

Preparing Video For Download...