Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

# SQLColumnCheckOperator runs per-column data-quality checks against a table.
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLExecuteQueryOperator,
)

check_columns = SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="duckdb_analytics",
    table="daily_summary",
    column_mapping={
        # Smallest value in total_revenue must be greater than 0.
        "total_revenue": {"min": {"greater_than": 0}},
        # total_orders must contain no NULL values.
        "total_orders": {"null_check": {"equal_to": 0}},
    },
)
# Per-column checks for SQLColumnCheckOperator: each keyword is a column name,
# each value maps a check name to a {comparison: threshold} condition.
column_mapping = dict(
    # Minimum revenue value must be greater than 0.
    total_revenue={"min": {"greater_than": 0}},
    # Zero NULLs allowed in the order count.
    total_orders={"null_check": {"equal_to": 0}},
    # The number of distinct order_date values must be greater than 0.
    order_date={"distinct_check": {"greater_than": 0}},
)
# Column check reference:
#   min: validates the minimum value in a column
#   null_check: counts null values (assert equal_to: 0 for no nulls)
#   distinct_check: counts distinct values
# Available checks: null_check, unique_check, distinct_check, min, max
# Available comparisons: equal_to, greater_than, geq_to, less_than, leq_to

# SQLTableCheckOperator runs table-level checks expressed as SQL.
from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

check_table = SQLTableCheckOperator(
    task_id="check_table",
    conn_id="duckdb_analytics",
    table="daily_summary",
    checks={
        # The table must contain at least one row.
        "row_count_check": {
            "check_statement": "COUNT(*) > 0",
        },
    },
)
# SQLTableCheckOperator check_statement: any SQL expression that evaluates
# to true or false.

# NOTE(review): on Airflow 3 the preferred import path for `dag` is
# `airflow.sdk`; `airflow.decorators` is kept here for 2.x compatibility.
from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLExecuteQueryOperator,
    SQLTableCheckOperator,
)


# NOTE(review): no start_date is set. Airflow 2.x requires one for a scheduled
# DAG, while Airflow 3 makes it optional — confirm the target version.
@dag(schedule="@daily", template_searchpath="/path/to/sql")
def sales_pipeline():
    """Load the daily sales summary, then validate it with SQL checks.

    Task order: ``load_daily_sales`` -> ``check_columns`` -> ``check_table``.
    All tasks use the ``duckdb_analytics`` connection. The two check tasks
    validate the ``daily_summary`` table, which presumably is populated by
    ``aggregate_sales.sql`` — confirm against the SQL file.
    """
    # "aggregate_sales.sql" is resolved via template_searchpath and rendered
    # as a template before it is executed.
    load = SQLExecuteQueryOperator(
        task_id="load_daily_sales",
        conn_id="duckdb_analytics",
        sql="aggregate_sales.sql",
    )

    check_columns = SQLColumnCheckOperator(
        task_id="check_columns",
        conn_id="duckdb_analytics",
        table="daily_summary",
        column_mapping={
            "total_revenue": {"min": {"greater_than": 0}},
            "total_orders": {"null_check": {"equal_to": 0}},
        },
    )

    check_table = SQLTableCheckOperator(
        task_id="check_table",
        conn_id="duckdb_analytics",
        table="daily_summary",
        checks={
            "row_count_check": {"check_statement": "COUNT(*) > 0"},
        },
    )

    # The checks run only after the load task has finished.
    load >> check_columns >> check_table


# Calling the @dag-decorated function is what actually creates the DAG.
sales_pipeline()
SQLColumnCheckOperator
# Column-level data-quality checks on the daily_summary table.
check_columns = SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="duckdb_analytics",
    table="daily_summary",
    column_mapping={
        # Minimum value of total_revenue must be greater than 0.
        "total_revenue": {"min": {"greater_than": 0}},
        # total_orders must have zero NULL values.
        "total_orders": {"null_check": {"equal_to": 0}},
    },
)
SQLTableCheckOperator
# Table-level data-quality check on the daily_summary table.
check_table = SQLTableCheckOperator(
    task_id="check_table",
    conn_id="duckdb_analytics",
    table="daily_summary",
    checks={
        # Named check whose SQL expression must evaluate to true:
        # the table must not be empty.
        "row_count_check": {"check_statement": "COUNT(*) > 0"},
    },
)
$$

Building Data Pipelines with Airflow