Introduzione ad Apache Airflow in Python
Mike Metzger
Data Engineer




all_success - Tutti i task precedenti sono riusciti, nessuno saltatoall_failed - Tutti i task precedenti sono fallitiall_done - Tutti i task precedenti sono completati, indipendentemente dall'esitoone_failed - Almeno un task è fallitoone_success - Almeno un task è riuscitonone_failed - Tutti i task precedenti riusciti o saltatifrom airflow.utils.trigger_rule import TriggerRule@tasktrigger_rule=TriggerRule.<TriggerRule enum>@task(trigger_rule=TriggerRule.ALL_SUCCESS)
def run_if_everything_succeeds:
print('All previous tasks succeeded!')
retries: parametro che definisce quante volte Airflow ritenta un task fallito prima di segnarlo come fallitoretry_delay: parametro che accetta un timedelta e definisce l'attesa tra i tentativi@task(
trigger_rule=TriggerRule.ONE_SUCCESS,
retries=3,
retry_delay=timedelta(minutes=5)
)
from airflow.providers.standard.operators.trigger_dagrun
import TriggerDagRunOperatortrigger_dag_id - Deve corrispondere a dag_idwait_for_completion - Blocca i task successivi finché non terminapoke_interval - Frequenza di controlloconf - Dati da passare al DAG figliotrigger_child = TriggerDagRunOperator(
task_id="trigger_child_pipeline",
trigger_dag_id="child_pipeline_dag",
wait_for_completion=True,
poke_interval=30,
conf={
"source": "s3://my-bucket/raw/",
},
)
task1 >> trigger_child # Esegui il primo task, poi avvia il DAG figlio
trigger_child >> cleanup # Esegui cleanup dopo il completamento del DAG figlio
Introduzione ad Apache Airflow in Python