Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer
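from airflow.sdk import task  # Airflow 3; on Airflow 2.9+: from airflow.decorators import task

@task.bash
def export_data():
    # The returned string is the bash command; Airflow renders {{ ds }} before running it
    return "cp /data/sales.csv /export/sales_{{ ds }}.csv"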
{{ ds }} renders the logical date in YYYY-MM-DD format. Jinja templating works in an operator's templated fields (e.g. bash_command, sql).
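To make the rendering concrete, here is a minimal sketch that uses neither Airflow nor Jinja. It only mimics the `{{ ds }}` substitution by formatting a logical date as YYYY-MM-DD and splicing it into the command string. The helper `render_ds` is hypothetical, and the example date (March 31st, year chosen arbitrarily) is illustrative only.

```python
from datetime import datetime, timezone


def render_ds(template: str, logical_date: datetime) -> str:
    """Hypothetical stand-in for Jinja rendering: replace {{ ds }} with YYYY-MM-DD."""
    ds = logical_date.strftime("%Y-%m-%d")  # same YYYY-MM-DD shape as Airflow's ds
    return template.replace("{{ ds }}", ds)


command = "cp /data/sales.csv /export/sales_{{ ds }}.csv"
rendered = render_ds(command, datetime(2025, 3, 31, tzinfo=timezone.utc))
print(rendered)  # cp /data/sales.csv /export/sales_2025-03-31.csv
```

Because the target file name depends only on the logical date, re-running the task for the same date overwrites the same file instead of producing a new one.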
Run 1 (March 31st):
Run 2 (re-run March 31st):

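from airflow.providers.common.sql.operators.sql import (
    SQLExecuteQueryOperator,
    SQLInsertRowsOperator,  # requires a recent apache-airflow-providers-common-sql release
)

# Read the rows staged for this run's logical date (explicit columns match `columns` below)
staged_rows = SQLExecuteQueryOperator(
    task_id="get_staged_rows",
    conn_id="duckdb_default",
    sql="SELECT date, product, amount FROM staging WHERE date = '{{ ds }}'",
)

# Delete this date's rows first, so a re-run replaces data instead of duplicating it
load = SQLInsertRowsOperator(
    task_id="load_sales",
    conn_id="duckdb_default",
    table_name="sales",
    columns=["date", "product", "amount"],
    preoperator="DELETE FROM sales WHERE date = '{{ ds }}';",
    rows=staged_rows.output,
)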
Alternatives: UPSERT or MERGE approaches
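As a sketch of the UPSERT alternative, the example below again uses sqlite3 as a stand-in, with SQLite's `INSERT ... ON CONFLICT ... DO UPDATE` clause; MERGE is not shown. The primary key on `(date, product)` is a hypothetical choice: an upsert needs some key to detect that a row already exists. The helper `upsert_rows` and the row values are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The upsert needs a key to detect conflicts; (date, product) is a hypothetical choice
conn.execute(
    "CREATE TABLE sales (date TEXT, product TEXT, amount REAL, "
    "PRIMARY KEY (date, product))"
)

UPSERT = """
INSERT INTO sales (date, product, amount) VALUES (?, ?, ?)
ON CONFLICT (date, product) DO UPDATE SET amount = excluded.amount
"""


def upsert_rows(conn, rows):
    with conn:  # commit the whole batch at once
        conn.executemany(UPSERT, rows)


upsert_rows(conn, [("2025-03-31", "widget", 10.0), ("2025-03-31", "gadget", 5.0)])  # Run 1
upsert_rows(conn, [("2025-03-31", "widget", 12.0), ("2025-03-31", "gadget", 5.0)])  # Run 2
result = conn.execute("SELECT date, product, amount FROM sales ORDER BY product").fetchall()
print(result)  # [('2025-03-31', 'gadget', 5.0), ('2025-03-31', 'widget', 12.0)]
```

Unlike DELETE-then-INSERT, an upsert only inserts or updates: a row that disappears from staging on a re-run stays in the target table unless it is removed separately.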

