Operadores de Airflow

Introducción a Apache Airflow en Python

Mike Metzger

Data Engineer

Operadores

  • Representan una sola tarea en un flujo de trabajo
  • Se ejecutan de forma independiente (normalmente)
  • Generalmente no comparten información
  • Hay varios operadores para distintas tareas
@dag(
  dag_id="Example_Dag"
)
def example_dag():
  @task
  def task1():
    return "The result from task1"

  task1()

example_dag()
Introducción a Apache Airflow en Python

@task (PythonOperator)

  • Ejecuta una función de Python
  • Cualquier función de Python puede decorarse con @task
  • Puede pasar datos entre funciones/tareas
from airflow.sdk import task

@task def printme(): print("This goes in the logs!")
printme()
Introducción a Apache Airflow en Python

Argumentos de @task

  • Puedes pasar argumentos a la tarea/función como en cualquier función de Python
@task
def printme(name: str):
    print(f"Hi {name} - This goes in the logs!")

printme(name='DataCamp')
# Añade: # Hi DataCamp - This goes in the logs! # a los logs de Airflow
Introducción a Apache Airflow en Python

@task.bash (BashOperator)

@task.bash
def bash_example():
  return "echo 'Example!'"

bash_example()
@task.bash
def run_cleanup():
  return "runcleanup.sh"

run_cleanup()
  • Ejecuta un comando o script de Bash
  • Ejecuta el comando en un directorio temporal
  • Permite definir variables de entorno para el comando
Introducción a Apache Airflow en Python

Dependencias entre tareas

  • Cada DAG tiene un conjunto de tareas
  • Las dependencias marcan el orden de ejecución
  • Hay varios métodos para definir dependencias

 

DAG de Airflow con tareas conectadas mostrando su orden de dependencia

Introducción a Apache Airflow en Python

Sintaxis bitshift

>> y <<

task1() >> task2()

# task1 termina antes de que empiece task2
task1() >> task2() >> task3()
# task1 termina antes de task2 y task2 antes de task3
task1() >> task3() task2() >> task3()
# task1 y task2 pueden correr a la vez, pero ambas deben terminar antes de task3
Introducción a Apache Airflow en Python

Ejemplo de sintaxis bitshift

# Descargar ventas antes de conciliar
download_sales_data() >> reconcile()

# Descargar inventario antes de conciliar
download_inventory_data() >> reconcile()

 

  • Ambas deben terminar antes de conciliar; el orden no se especifica
Introducción a Apache Airflow en Python

¡Vamos a practicar!

Introducción a Apache Airflow en Python

Preparing Video For Download...