Partition-aware scheduling with Asset Partitions

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

When ds is not enough

 

  • {{ ds }} ties each run to a date, which works for time-scheduled Dags
  • But asset-triggered Dags have no usable ds: it is set to None
  • The downstream Dag has no way to know which date was just updated
  • Asset Partitions propagate a partition_key from upstream to downstream

Upstream scheduled run triggers downstream asset-scheduled run where ds is None

CronPartitionTimetable

from airflow.sdk import dag, CronPartitionTimetable

@dag(schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"))
def sales_pipeline():
    ...

 

  • Every scheduled run gets a partition_key automatically
  • Manual runs are not partitioned unless a key is provided
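
To make the two bullets above concrete, here is a minimal sketch in plain Python of how a run might end up with (or without) a key. The helper partition_key_for_run is hypothetical, and the ISO-style key format is an assumption borrowed from the timestamp example later in these slides. It is not how Airflow implements this internally.

```python
from datetime import datetime
from typing import Optional


def partition_key_for_run(
    scheduled_at: Optional[datetime] = None,
    manual_key: Optional[str] = None,
) -> Optional[str]:
    """Hypothetical helper: decide which partition key a run carries."""
    if manual_key is not None:
        # A manual run is partitioned only when a key is passed explicitly.
        return manual_key
    if scheduled_at is not None:
        # A scheduled run gets a key derived from its cron tick (assumed format).
        return scheduled_at.strftime("%Y-%m-%dT%H:%M:%S")
    # Manual run without a key: not partitioned.
    return None


scheduled_key = partition_key_for_run(scheduled_at=datetime(2026, 4, 23))
manual_without_key = partition_key_for_run()
manual_with_key = partition_key_for_run(manual_key="2026-04-23T00:00:00")
print(scheduled_key, manual_without_key, manual_with_key)
```

The point of the sketch is only the branching: scheduled runs always carry a key, while manual runs carry one only if it is supplied.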

Accessing partition keys

In Python tasks:

from airflow.sdk import task

@task
def process_data(dag_run=None):
    # Airflow injects the current Dag run when the task executes
    partition_key = dag_run.partition_key
    print(f"Processing: {partition_key}")

In SQL templates:

DELETE FROM daily_summary
WHERE order_date = '{{ dag_run.partition_key[:10] }}';

 

  • Available in every task within a partitioned run
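
The [:10] slice in the SQL template relies on the key being an ISO-style timestamp, and it would fail on a run whose key is None (for example, a manual run triggered without a key). The sketch below shows one way to guard against that. The helper partition_date is hypothetical, and SimpleNamespace objects stand in for Airflow's dag_run purely for illustration.

```python
from types import SimpleNamespace


def partition_date(dag_run) -> str:
    """Hypothetical helper: return the YYYY-MM-DD prefix of the run's partition key."""
    key = dag_run.partition_key
    if key is None:
        # Unpartitioned run (for example, a manual run triggered without a key).
        raise ValueError("This run has no partition key")
    return key[:10]


# Stand-ins for Airflow's dag_run object, for illustration only.
partitioned_run = SimpleNamespace(partition_key="2026-04-23T00:00:00")
unpartitioned_run = SimpleNamespace(partition_key=None)

order_date = partition_date(partitioned_run)
sql = f"DELETE FROM daily_summary WHERE order_date = '{order_date}';"
print(sql)

try:
    partition_date(unpartitioned_run)
except ValueError as exc:
    error_message = str(exc)
    print(error_message)
```

Failing loudly on a missing key is a design choice: a DELETE scoped to the wrong date (or to no date) is harder to recover from than a failed task.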

Partitioned asset events

from airflow.sdk import dag, task, Asset, CronPartitionTimetable

daily_sales = Asset("daily_sales")

@dag(schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"))
def sales_pipeline():
    @task(outlets=[daily_sales])
    def load_data(**context):
        ...

 

  • Task outlet + CronPartitionTimetable = partitioned asset event
  • The event carries the same partition key as the Dag run
  • Signals which partition is ready, not just that data updated
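
As a mental model for the bullets above, the following sketch contrasts a plain asset event with a partitioned one. The AssetEvent dataclass and emit_event function are hypothetical stand-ins written for this example. They are not Airflow classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AssetEvent:
    """Hypothetical stand-in for an asset event; not Airflow's class."""
    asset_name: str
    partition_key: Optional[str] = None


def emit_event(asset_name: str, run_partition_key: Optional[str]) -> AssetEvent:
    # The event copies the key of the run that produced it.
    return AssetEvent(asset_name=asset_name, partition_key=run_partition_key)


plain_event = emit_event("daily_sales", None)
partitioned_event = emit_event("daily_sales", "2026-04-23T00:00:00")

# A plain event only says "daily_sales changed";
# a partitioned event also says which slice is ready.
print(plain_event.partition_key)
print(partitioned_event.partition_key)
```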

PartitionedAssetTimetable

from airflow.sdk import dag, task, Asset, PartitionedAssetTimetable

daily_sales = Asset("daily_sales")

@dag(schedule=PartitionedAssetTimetable(assets=daily_sales))
def sales_report():
    @task
    def generate_report(dag_run=None):
        partition_key = dag_run.partition_key
        print(f"Report for: {partition_key}")

 

  • Triggers only on partitioned asset events
  • Downstream run inherits the partition key
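
The triggering rule in the bullets above can be pictured as a simple filter. This is a toy model under stated assumptions: plan_downstream_runs is a hypothetical function, events are represented as (asset name, partition key) tuples, and the real scheduler logic is not shown in these slides.

```python
from typing import Optional


def plan_downstream_runs(events: list[tuple[str, Optional[str]]]) -> list[dict]:
    """Hypothetical scheduler step: one downstream run per partitioned event."""
    runs = []
    for asset_name, partition_key in events:
        if partition_key is None:
            # Unpartitioned events do not trigger the partition-aware Dag.
            continue
        # The downstream run inherits the key carried by the event.
        runs.append({"dag_id": "sales_report", "partition_key": partition_key})
    return runs


events = [
    ("daily_sales", "2026-04-22T00:00:00"),
    ("daily_sales", None),
    ("daily_sales", "2026-04-23T00:00:00"),
]
planned = plan_downstream_runs(events)
print(planned)
```

Of the three events, only the two that carry a key produce a downstream run, and each run receives exactly the key from its event.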

StartOfDayMapper

from airflow.sdk import (
    dag, Asset, PartitionedAssetTimetable,
    StartOfDayMapper,
)

daily_sales = Asset("daily_sales")

@dag(schedule=PartitionedAssetTimetable(
    assets=daily_sales,
    partition_mapper_config={
        daily_sales: StartOfDayMapper()
    },
))
def sales_report():
    ...

 

Before mapping: 2026-04-23T00:00:00

After StartOfDayMapper: 2026-04-23

  • Temporal keys: StartOfHourMapper, StartOfWeekMapper, StartOfMonthMapper
  • Non-temporal keys: AllowedKeyMapper for regions, departments
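
To illustrate what a mapper does to a key, here is a plain-Python sketch of the idea. The functions start_of_day, start_of_month, and allowed_key are hypothetical stand-ins, not the Airflow mapper classes. The start-of-day output matches the before/after example above, while the start-of-month output format and the region names are assumptions.

```python
from datetime import datetime


def start_of_day(key: str) -> str:
    """Map an ISO timestamp key to its calendar date (as in the slide example)."""
    return datetime.fromisoformat(key).strftime("%Y-%m-%d")


def start_of_month(key: str) -> str:
    """Assumed output format: the first day of the key's month."""
    return datetime.fromisoformat(key).replace(day=1).strftime("%Y-%m-%d")


def allowed_key(key: str, allowed: set[str]) -> str:
    """Pass a non-temporal key through only if it is in the allowed set."""
    if key not in allowed:
        raise ValueError(f"Unexpected partition key: {key}")
    return key


day_key = start_of_day("2026-04-23T00:00:00")
month_key = start_of_month("2026-04-23T00:00:00")
region_key = allowed_key("emea", {"emea", "apac", "amer"})
print(day_key, month_key, region_key)
```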

The full flow

Full partition flow

  • Upstream: CronPartitionTimetable attaches partition key to each run
  • Outlet task emits a partitioned asset event with that key
  • StartOfDayMapper normalizes the timestamp to a date
  • Downstream: PartitionedAssetTimetable triggers and inherits the mapped key

Let's practice!
