Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer
{{ ds }} ties each run to a date, which works for time-scheduled Dags.
Otherwise, ds is set to None: it's simply unavailable.
Asset partitions carry a partition_key from upstream to downstream.
from airflow.sdk import dag, CronPartitionTimetable

# "0 0 * * *" = every day at 00:00 in the given timezone
@dag(schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"))
def sales_pipeline():
    ...

sales_pipeline()
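To get a feel for which keys a daily "0 0 * * *" schedule produces, here is a plain-Python sketch that models one partition per UTC day, stamped at that day's midnight. It assumes, based on the "Before mapping" example later in this deck, that keys look like 2026-04-23T00:00:00. It does not use or reproduce CronPartitionTimetable itself, and the helper name daily_partition_keys is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def daily_partition_keys(start: datetime, days: int) -> list[str]:
    """Hypothetical helper: one key per UTC day, stamped at midnight.

    Assumes keys are naive ISO-8601 timestamps such as "2026-04-23T00:00:00".
    """
    # Truncate the start to midnight UTC of its day, where "0 0 * * *" fires.
    first = start.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    keys = []
    for offset in range(days):
        tick = first + timedelta(days=offset)
        # Drop tzinfo so the string matches the format shown in the deck.
        keys.append(tick.replace(tzinfo=None).isoformat())
    return keys


for key in daily_partition_keys(datetime(2026, 4, 21, 15, 30, tzinfo=timezone.utc), 3):
    print(key)
```

Every run on the same schedule therefore gets a distinct, date-aligned key, which is what the later steps rely on.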
Each run receives a partition_key automatically.
In Python tasks:
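from airflow.sdk import task

@task
def process_data(dag_run=None):
    # Airflow injects the current DagRun because the parameter is named dag_run
    partition_key = dag_run.partition_key
    print(f"Processing: {partition_key}")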
In SQL templates:
DELETE FROM daily_summary
WHERE order_date = '{{ dag_run.partition_key[:10] }}';
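The [:10] slice keeps only the YYYY-MM-DD part of a timestamp-style key, so the DELETE targets exactly one day. Assuming this DELETE is the first half of a delete-then-insert reload (the slide does not show the INSERT), the sketch below uses Python's built-in sqlite3 to show why that makes reruns of the same partition idempotent. The orders table, its columns, and the reload_partition helper are hypothetical.

```python
import sqlite3


def reload_partition(conn: sqlite3.Connection, partition_key: str) -> None:
    """Hypothetical delete-then-insert reload for one daily partition."""
    # Same idea as '{{ dag_run.partition_key[:10] }}': keep only YYYY-MM-DD.
    day = partition_key[:10]
    with conn:  # one transaction: both statements commit or neither does
        conn.execute("DELETE FROM daily_summary WHERE order_date = ?", (day,))
        conn.execute(
            """
            INSERT INTO daily_summary (order_date, total)
            SELECT order_date, SUM(amount) FROM orders
            WHERE order_date = ?
            GROUP BY order_date
            """,
            (day,),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_date TEXT, amount REAL)")
conn.execute("CREATE TABLE daily_summary (order_date TEXT, total REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2026-04-23", 10.0), ("2026-04-23", 5.0), ("2026-04-24", 7.0)],
)

# Running the same partition twice leaves exactly one summary row for that day.
reload_partition(conn, "2026-04-23T00:00:00")
reload_partition(conn, "2026-04-23T00:00:00")
print(conn.execute("SELECT * FROM daily_summary").fetchall())  # → [('2026-04-23', 15.0)]
```

Parameter binding stands in for the Jinja substitution here; the effect is the same: a rerun for one key only ever touches that key's rows.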
from airflow.sdk import dag, task, Asset, CronPartitionTimetable

daily_sales = Asset("daily_sales")

@dag(schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"))
def sales_pipeline():
    # On success, this task emits an asset event for daily_sales
    @task(outlets=[daily_sales])
    def load_data(**context):
        ...

    load_data()

sales_pipeline()
from airflow.sdk import dag, task, Asset, PartitionedAssetTimetable

daily_sales = Asset("daily_sales")

# Triggered by updates to daily_sales; the run carries the upstream partition_key
@dag(schedule=PartitionedAssetTimetable(assets=daily_sales))
def sales_report():
    @task
    def generate_report(dag_run=None):
        partition_key = dag_run.partition_key
        print(f"Report for: {partition_key}")

    generate_report()

sales_report()
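from airflow.sdk import (
    dag, Asset, PartitionedAssetTimetable,
    StartOfDayMapper,
)

daily_sales = Asset("daily_sales")

@dag(schedule=PartitionedAssetTimetable(
    assets=daily_sales,
    # Normalize each upstream key for this asset to its calendar date
    partition_mapper_config={
        daily_sales: StartOfDayMapper()
    },
))
def sales_report():
    ...

sales_report()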
Before mapping: 2026-04-23T00:00:00
After StartOfDayMapper: 2026-04-23
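The mapping above amounts to truncating a timestamp key to its calendar date. The plain-Python function below reproduces just that before/after pair; it illustrates the idea and is not StartOfDayMapper's actual implementation. The name start_of_day_key is hypothetical.

```python
from datetime import datetime


def start_of_day_key(partition_key: str) -> str:
    """Hypothetical stand-in for what StartOfDayMapper does to a key."""
    # Parse the ISO-8601 timestamp key, e.g. "2026-04-23T00:00:00".
    ts = datetime.fromisoformat(partition_key)
    # Keep only the calendar date, e.g. "2026-04-23".
    return ts.date().isoformat()


print(start_of_day_key("2026-04-23T00:00:00"))  # → 2026-04-23
print(start_of_day_key("2026-04-23T17:45:00"))  # → 2026-04-23
```

Every timestamp on the same day maps to the same key, so the downstream Dag sees a plain date regardless of the time part.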
Other time-based mappers: StartOfHourMapper, StartOfWeekMapper, StartOfMonthMapper
AllowedKeyMapper for non-time keys such as regions or departments
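The other time-based mappers listed here presumably apply the same truncation at a different grain. As a hedged plain-Python sketch, the helpers below truncate a timestamp to the start of its hour, week, or month. The function names, the output format, and the assumption that weeks start on Monday are all illustrative and may not match what Airflow's mappers emit.

```python
from datetime import datetime, timedelta


def start_of_hour(ts: datetime) -> datetime:
    # Zero out everything below the hour.
    return ts.replace(minute=0, second=0, microsecond=0)


def start_of_week(ts: datetime) -> datetime:
    # Assumption: weeks start on Monday (datetime.weekday() == 0).
    day = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return day - timedelta(days=day.weekday())


def start_of_month(ts: datetime) -> datetime:
    # First day of the same month at midnight.
    return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)


ts = datetime.fromisoformat("2026-04-23T17:45:00")  # a Thursday
print(start_of_hour(ts).isoformat())   # → 2026-04-23T17:00:00
print(start_of_week(ts).isoformat())   # → 2026-04-20T00:00:00
print(start_of_month(ts).isoformat())  # → 2026-04-01T00:00:00
```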
CronPartitionTimetable attaches a partition key to each run.
StartOfDayMapper normalizes the timestamp to a date.
PartitionedAssetTimetable triggers the downstream Dag, which inherits the mapped key.
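These three steps can be modelled end to end in plain Python. This is a toy simulation of the data flow, not how the Airflow scheduler works internally: the Run dataclass and the start_of_day and trigger_downstream functions are hypothetical, and keys use the format from the StartOfDayMapper example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable


@dataclass
class Run:
    dag_id: str
    partition_key: str


def start_of_day(key: str) -> str:
    # Same truncation as the StartOfDayMapper example: timestamp -> date.
    return datetime.fromisoformat(key).date().isoformat()


def trigger_downstream(upstream: Run, dag_id: str, mapper: Callable[[str], str]) -> Run:
    # The downstream run inherits the upstream key after it passes through the mapper.
    return Run(dag_id=dag_id, partition_key=mapper(upstream.partition_key))


# 1. A cron-driven upstream run carries a timestamp key.
upstream = Run("sales_pipeline", "2026-04-23T00:00:00")
# 2. and 3. Its asset update triggers the downstream Dag with the mapped key.
downstream = trigger_downstream(upstream, "sales_report", start_of_day)
print(downstream)  # → Run(dag_id='sales_report', partition_key='2026-04-23')
```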