Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Run a single SQL statement through the Airflow connection "duckdb_analytics".
# conn_id is the ID of a connection configured in Airflow (UI, env var or
# secrets backend), so credentials/paths stay out of the DAG code.
SQLExecuteQueryOperator(
    task_id="load_sales",
    conn_id="duckdb_analytics",
    sql="INSERT INTO sales SELECT * FROM staging",
)
conn_id in operators
from airflow.sdk import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# Daily DAG that aggregates raw_orders into daily_summary using inline SQL.
# NOTE: Airflow only registers the DAG once sales_pipeline() is called at
# module level; that call is omitted from this snippet.
@dag(schedule="@daily")
def sales_pipeline():
    # Runs the SQL through the "duckdb_analytics" Airflow connection.
    # NOTE(review): the INSERT has no date filter, so every run re-aggregates
    # all of raw_orders and, unless daily_summary has a unique key, appends
    # duplicate summary rows on re-runs.
    aggregate = SQLExecuteQueryOperator(
        task_id="aggregate_daily_sales",
        conn_id="duckdb_analytics",
        sql="""
        INSERT INTO daily_summary (order_date, total_orders, total_revenue)
        SELECT order_date, COUNT(*), SUM(revenue)
        FROM raw_orders GROUP BY order_date
        """,
    )
# Daily DAG that runs the SQL stored in aggregate_sales.sql instead of an
# inline string. template_searchpath adds a directory to Jinja's search path,
# so a sql value ending in ".sql" is loaded from there and rendered as a
# template (e.g. {{ ds }} is filled in) before execution.
# NOTE: Airflow only registers the DAG once sales_pipeline() is called at
# module level; that call is omitted from this snippet.
@dag(
    schedule="@daily",
    template_searchpath="/path/to/include/sql",  # placeholder: point at your project's SQL folder
)
def sales_pipeline():
    aggregate = SQLExecuteQueryOperator(
        task_id="aggregate_daily_sales",
        conn_id="duckdb_analytics",
        sql="aggregate_sales.sql",  # file name, resolved via template_searchpath
    )
template_searchpath on @dag · sql · dags/ folder 💡
-- Idempotent daily load: first remove any summary rows already written for
-- the run's logical date, then re-aggregate only that date from raw_orders.
-- Re-running the task for the same date therefore replaces, rather than
-- duplicates, that day's rows.
-- The ds placeholders are rendered by Airflow's Jinja templating (YYYY-MM-DD).
DELETE FROM daily_summary WHERE order_date = '{{ ds }}';
INSERT INTO daily_summary (order_date, total_orders, total_revenue)
SELECT order_date, COUNT(*), SUM(revenue)
FROM raw_orders
WHERE order_date = '{{ ds }}'
GROUP BY order_date;
{{ ds }} renders the logical date (YYYY-MM-DD).
params (Jinja rendering)
# WARNING (anti-pattern shown for contrast): Jinja substitutes params values
# into the SQL text before the query reaches the database. With untrusted
# input such as user_input, a value containing a quote can change the query
# (SQL injection). Use `parameters` (driver-level binding) for such values.
SQLExecuteQueryOperator(
sql="""SELECT * FROM orders
WHERE product = '{{ params.product }}'""",
params={"product": user_input},  # rendered into the SQL string, not bound
)
parameters (DB-level binding)
# Safe variant: the SQL text and the value are sent separately and the
# database driver binds the value, so user_input cannot alter the query's
# structure. A triple-quoted string is required for a multi-line literal.
# $product is DuckDB's named-parameter placeholder (assumes the
# duckdb_analytics connection); other drivers use their own paramstyle,
# e.g. %(product)s or :product.
SQLExecuteQueryOperator(
    sql="""SELECT * FROM orders
    WHERE product = $product""",
    parameters={"product": user_input},
)
$$
$$
$$

Building Data Pipelines with Airflow