Data quality checks

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

Why quality gates matter

  • A pipeline without checks is a pipeline you can't trust
  • A single upstream change can introduce nulls or negative values
  • Every downstream dashboard then shows wrong numbers
  • Quality gates catch problems at the source

Bad data flowing downstream


SQLColumnCheckOperator

from airflow.providers.common.sql.operators.sql import (
    SQLExecuteQueryOperator,
    SQLColumnCheckOperator,
)

check_columns = SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="duckdb_analytics",
    table="daily_summary",
    column_mapping={
        "total_revenue": {"min": {"greater_than": 0}},
        "total_orders": {"null_check": {"equal_to": 0}},
    },
)
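To see what these two rules assert, the sketch below computes the same two quantities by hand against an in-memory SQLite table that stands in for the duckdb_analytics connection. The sample rows and the query are assumptions for illustration only; SQLColumnCheckOperator builds its own SQL internally.

```python
import sqlite3

# Stand-in for the duckdb_analytics connection: an in-memory SQLite database
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_summary (order_date TEXT, total_revenue REAL, total_orders INTEGER)"
)
conn.executemany(
    "INSERT INTO daily_summary VALUES (?, ?, ?)",
    [("2024-01-01", 1250.0, 31), ("2024-01-02", 980.5, 27)],  # made-up sample rows
)

# Compute the two values the column_mapping refers to
min_revenue, null_orders = conn.execute(
    """
    SELECT MIN(total_revenue),
           SUM(CASE WHEN total_orders IS NULL THEN 1 ELSE 0 END)
    FROM daily_summary
    """
).fetchone()

# Apply the comparators: minimum revenue > 0, null count == 0
revenue_ok = min_revenue > 0
orders_ok = null_orders == 0
print(revenue_ok, orders_ok)  # True True
```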

Column check types

column_mapping={
    "total_revenue": {"min": {"greater_than": 0}},
    "total_orders": {"null_check": {"equal_to": 0}},
    "order_date": {"distinct_check": {"greater_than": 0}},
}
  • min: validates the minimum value in a column
  • null_check: counts null values (assert equal_to: 0 for no nulls)
  • distinct_check: counts distinct values
  • Available checks: null_check, unique_check, distinct_check, min, max
  • Available comparators: equal_to, greater_than, geq_to, less_than, leq_to
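The check types and comparators compose in a regular way: each check reduces a column to one number, and each comparator tests that number against a threshold. A minimal sketch of that evaluation, assuming SQLite in place of DuckDB; run_column_checks, CHECK_SQL, and COMPARATORS are hypothetical names, and the SQL is an approximation rather than the provider's generated query.

```python
import operator
import sqlite3

# Hypothetical check name -> SQL aggregate (an approximation, not the provider's SQL)
CHECK_SQL = {
    "null_check": "SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END)",
    "distinct_check": "COUNT(DISTINCT {col})",
    "unique_check": "COUNT({col}) - COUNT(DISTINCT {col})",  # duplicate non-null values
    "min": "MIN({col})",
    "max": "MAX({col})",
}

# Comparator names from the slide mapped to Python operators
COMPARATORS = {
    "equal_to": operator.eq,
    "greater_than": operator.gt,
    "geq_to": operator.ge,
    "less_than": operator.lt,
    "leq_to": operator.le,
}


def run_column_checks(conn, table, column_mapping):
    """Return {(column, check): passed} for every rule in column_mapping."""
    results = {}
    for col, checks in column_mapping.items():
        for check, rule in checks.items():
            query = f"SELECT {CHECK_SQL[check].format(col=col)} FROM {table}"
            value = conn.execute(query).fetchone()[0]
            # A NULL result (e.g. MIN over an empty table) counts as a failure;
            # otherwise every comparator in the rule must hold
            results[(col, check)] = value is not None and all(
                COMPARATORS[name](value, threshold)
                for name, threshold in rule.items()
            )
    return results


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_summary (order_date TEXT, total_revenue REAL, total_orders INTEGER)"
)
# Made-up rows: one negative revenue and one missing order count
conn.executemany(
    "INSERT INTO daily_summary VALUES (?, ?, ?)",
    [("2024-01-01", 1250.0, 31), ("2024-01-02", -5.0, None)],
)

column_mapping = {
    "total_revenue": {"min": {"greater_than": 0}},
    "total_orders": {"null_check": {"equal_to": 0}},
    "order_date": {"distinct_check": {"greater_than": 0}},
}
results = run_column_checks(conn, "daily_summary", column_mapping)
print(results)
```

With these sample rows the min and null_check rules fail, while distinct_check passes.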

SQLTableCheckOperator

from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

check_table = SQLTableCheckOperator(
    task_id="check_table",
    conn_id="duckdb_analytics",
    table="daily_summary",
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) > 0",
        },
    },
)
  • Validates properties of the entire table
  • check_statement: any SQL expression that evaluates to true or false
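A check_statement can be evaluated by wrapping it in a CASE expression and requiring it to be true. The sketch below does this against SQLite and takes the MIN over the per-row results, so a row-level statement such as total_revenue > 0 must hold for every row, while an aggregate such as COUNT(*) > 0 yields a single result. run_table_checks and the revenue bounds are hypothetical, and the query shape is an assumption rather than the operator's exact SQL.

```python
import sqlite3


def run_table_checks(conn, table, checks):
    """Return {check_name: passed} for a checks dict shaped like the operator's."""
    results = {}
    for name, spec in checks.items():
        # 1 when the statement is true, 0 otherwise; MIN() means every row must pass
        query = (
            "SELECT MIN(ok) FROM ("
            f"SELECT CASE WHEN {spec['check_statement']} THEN 1 ELSE 0 END AS ok "
            f"FROM {table}) AS sq"
        )
        results[name] = conn.execute(query).fetchone()[0] == 1
    return results


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_summary (order_date TEXT, total_revenue REAL)")

checks = {
    "row_count_check": {"check_statement": "COUNT(*) > 0"},
    # Hypothetical bounds check on the daily total
    "revenue_bounds_check": {"check_statement": "SUM(total_revenue) BETWEEN 0 AND 1000000"},
}

empty_results = run_table_checks(conn, "daily_summary", checks)
print(empty_results)  # empty table: both checks fail

conn.execute("INSERT INTO daily_summary VALUES ('2024-01-01', 1250.0)")
loaded_results = run_table_checks(conn, "daily_summary", checks)
print(loaded_results)  # one valid row: both checks pass
```

On an empty table SUM returns NULL, and a NULL condition falls through to ELSE 0, so the bounds check fails instead of passing silently.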

Chaining quality checks

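from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLExecuteQueryOperator,
    SQLTableCheckOperator,
)


@dag(schedule="@daily", template_searchpath="/path/to/sql")
def sales_pipeline():
    # Build daily_summary from the SQL file found via template_searchpath
    load = SQLExecuteQueryOperator(
        task_id="load_daily_sales",
        conn_id="duckdb_analytics",
        sql="aggregate_sales.sql",
    )

    # Arguments (...) as defined on the previous slides
    check_columns = SQLColumnCheckOperator(...)
    check_table = SQLTableCheckOperator(...)

    # Load first, then run both checks in sequence
    load >> check_columns >> check_table


sales_pipeline()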
  • Load first, then validate
  • If any check fails, its task fails and everything downstream of it is skipped
  • Downstream tasks never run on bad data
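In Airflow, a failed check task puts every task downstream of it into the upstream_failed state under the default all_success trigger rule, so nothing after the gate runs. The plain-Python sketch below imitates that fail-fast behaviour; run_chain, DataQualityError, and the publish task are hypothetical stand-ins, not Airflow APIs.

```python
class DataQualityError(Exception):
    """Raised by a hypothetical check task when a rule fails."""


def run_chain(tasks):
    """Run (name, callable) pairs in order; stop at the first failed check."""
    completed = []
    for name, task in tasks:
        try:
            task()
        except DataQualityError as err:
            print(f"{name} failed: {err}; downstream tasks skipped")
            return completed, name
        completed.append(name)
    return completed, None


rows = []


def load():
    # Made-up load that lets one negative revenue value through
    rows.extend([{"total_revenue": 120.0}, {"total_revenue": -3.0}])


def check_columns():
    if min(r["total_revenue"] for r in rows) <= 0:
        raise DataQualityError("min(total_revenue) must be greater than 0")


def check_table():
    if not rows:
        raise DataQualityError("daily_summary has no rows")


def publish():
    print("refreshing dashboards")  # hypothetical downstream consumer


completed, failed = run_chain(
    [("load", load), ("check_columns", check_columns),
     ("check_table", check_table), ("publish", publish)]
)
print(completed, failed)  # ['load'] check_columns
```

The bad rows still land in the table during load; the gate's job is to keep the tasks after it, such as publish, from acting on them.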

Column vs table checks

SQLColumnCheckOperator

check_columns = SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="duckdb_analytics",
    table="daily_summary",
    column_mapping={
        "total_revenue": {"min": {"greater_than": 0}},
        "total_orders": {"null_check": {"equal_to": 0}},
    },
)
  • Are there nulls?
  • Are values within range?

SQLTableCheckOperator

check_table = SQLTableCheckOperator(
    task_id="check_table",
    conn_id="duckdb_analytics",
    table="daily_summary",
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) > 0",
        },
    },
)
  • Does the table have rows?
  • Is the total within expected bounds?

Monitoring quality at scale with Astro


  • Astronomer's Astro Observe helps you monitor pipeline health
  • SLA tracking with alerts before deadlines
  • Pipeline lineage across Dags and tables
  • Data quality monitoring at scale
  • AI-based root cause analysis

Astro Observe lineage


Let's practice!
