Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

xcom_push() and xcom_pull()
@task return value becomes an XComArg
@task
def get_config():
    return {"name": "daily_etl"}

@task
def run_pipeline(config):
    print(config["name"])

config = get_config()  # XComArg
run_pipeline(config)   # dependency + data
get_timestamp = BashOperator(
    task_id="get_timestamp",
    bash_command="date +%Y-%m-%d",
)

@task
def log_timestamp(ts_value):
    print(f"Timestamp: {ts_value}")

log_timestamp(get_timestamp.output)


AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD: store an XCom in object storage only if its size reaches the threshold (in bytes)
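
The threshold rule can be illustrated with a small, Airflow-free sketch. It is a simplified model rather than the provider's actual code: the helper name choose_xcom_storage is hypothetical, the size is measured on a plain JSON encoding, and treating a size exactly equal to the threshold as "reached" is an assumption. The special values follow the documented setting: -1 keeps every XCom in the metadata database, 0 sends every XCom to object storage.

```python
import json


def choose_xcom_storage(value, threshold_bytes):
    """Return where a value would land under the threshold rule.

    Hypothetical helper for illustration only; it does not call Airflow.
    """
    # Approximate the XCom size by its JSON-encoded length in bytes.
    size = len(json.dumps(value).encode("utf-8"))

    # A negative threshold means: always keep XComs in the database.
    if threshold_bytes < 0:
        return "database"

    # Assumption: "threshold is reached" means size >= threshold.
    if size >= threshold_bytes:
        return "object_storage"
    return "database"


small = {"name": "daily_etl"}    # a few dozen bytes
large = {"payload": "x" * 5000}  # a little over 5000 bytes

print(choose_xcom_storage(small, 1024))
print(choose_xcom_storage(large, 1024))
```

With a threshold of 1024 bytes, the small config dictionary stays in the database while the larger payload goes to object storage, which is the split the setting is meant to produce.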