Data-aware scheduling with Assets

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

The timing problem

 

  • Pipeline A loads raw data from an API
  • Pipeline B builds a dashboard from that data
  • If both run on cron, B might run before A finishes
  • Adding a time buffer is fragile
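Why the buffer is fragile can be sketched with plain date arithmetic. The start times, the 30-minute buffer, and the helper b_reads_complete_data are hypothetical values chosen for illustration, not part of Airflow:

```python
from datetime import datetime, timedelta

# Hypothetical cron schedule: A starts at 02:00, B starts 30 minutes later.
a_start = datetime(2026, 1, 1, 2, 0)
b_start = a_start + timedelta(minutes=30)


def b_reads_complete_data(a_runtime: timedelta) -> bool:
    """Return True if pipeline A has finished by the time pipeline B starts."""
    return a_start + a_runtime <= b_start


# A normal day: A takes 20 minutes, so the buffer is enough.
print(b_reads_complete_data(timedelta(minutes=20)))  # True

# A slow API day: A takes 45 minutes, so B runs on incomplete data.
print(b_reads_complete_data(timedelta(minutes=45)))  # False
```

The buffer only holds while A's runtime stays below it, which a fixed cron offset cannot guarantee.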

What is an Asset?

from airflow.sdk import Asset

sales_data = Asset("s3://bucket/sales/daily.parquet")

  • A reference to a piece of data, identified by a unique name
  • A URI can be attached to the asset when it represents a concrete data entity
  • Could be a file, a database table, or any other piece of data

Producers and consumers

  • Tasks can update assets, which creates an asset event
  • Dags can be triggered on asset updates with an asset schedule
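The producer/consumer decoupling can be sketched as a toy model in plain Python. This is not Airflow's implementation: the registry, schedule_on, and task_succeeded are hypothetical names used only to show how an asset event links the two sides:

```python
from collections import defaultdict

# Toy registry (an assumption for illustration): asset name -> consumer Dag names.
consumers = defaultdict(list)
triggered = []  # consumer runs that were started, in order


def schedule_on(asset_name, dag_name):
    """Register a consumer Dag that is scheduled on an asset."""
    consumers[asset_name].append(dag_name)


def task_succeeded(outlets):
    """A producer task finished: each outlet asset gets an event,
    and every Dag scheduled on that asset is triggered."""
    for asset_name in outlets:
        for dag_name in consumers[asset_name]:
            triggered.append(dag_name)


schedule_on("s3://bucket/sales/daily.parquet", "build_dashboard")
task_succeeded(outlets=["s3://bucket/sales/daily.parquet"])
print(triggered)  # ['build_dashboard']
```

The producer only names the asset it updates; it never refers to build_dashboard directly.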

 

Asset flow

Producer: signaling with outlets

from airflow.sdk import Asset, task

sales_data = Asset("s3://bucket/sales/daily.parquet")

@task(outlets=[sales_data])
def write_sales():
    # Write data to S3
    ...

  • On success, Airflow records that the asset was updated
  • The producer does not need to know about any consumer

Consumer: scheduling on an asset

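from airflow.sdk import Asset, dag

sales_data = Asset("s3://bucket/sales/daily.parquet")

@dag(schedule=[sales_data])
def build_dashboard():
    ...

build_dashboard()  # call the decorated function so Airflow registers the Dag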

  • No cron expression, just the asset reference
  • Airflow triggers the consumer automatically after the update

Conditional scheduling

from airflow.sdk import Asset, dag

sales = Asset("s3://bucket/sales.parquet")
inventory = Asset("s3://bucket/inventory.parquet")

# Wait for BOTH assets to update
@dag(schedule=(sales & inventory))
def full_report():
    ...

# Trigger when ANY asset updates
@dag(schedule=(sales | inventory))
def quick_refresh():
    ...

full_report()
quick_refresh()
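The difference between & and | can be sketched as a toy model in plain Python. It is a simplified assumption of the behavior, not Airflow's scheduler code, and ConditionalSchedule is a hypothetical class: an AND schedule waits until every asset has been updated since the last triggered run, while an OR schedule fires on any single update.

```python
class ConditionalSchedule:
    """Toy model of a Dag scheduled on several assets with AND or OR logic."""

    def __init__(self, assets, require_all):
        self.assets = set(assets)
        self.require_all = require_all
        self.pending = set()  # assets updated since the last triggered run

    def on_update(self, asset):
        """Record an asset event and return True if it triggers a run."""
        if asset not in self.assets:
            return False
        self.pending.add(asset)
        if self.require_all:
            ready = self.pending == self.assets  # AND: every asset updated
        else:
            ready = True  # OR: any single update is enough
        if ready:
            self.pending.clear()  # start waiting for fresh updates
        return ready


full_report = ConditionalSchedule({"sales", "inventory"}, require_all=True)
quick_refresh = ConditionalSchedule({"sales", "inventory"}, require_all=False)

print(full_report.on_update("sales"))      # False, still waiting for inventory
print(full_report.on_update("inventory"))  # True, both have updated
print(quick_refresh.on_update("sales"))    # True, any update triggers
```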

Verifying assets with the CLI

$ airflow assets list
name                                  | group    | uri
s3://data-lake/sales/daily.csv        | asset    | s3://data-lake/sales/daily.csv

$ airflow assets details --name "s3://data-lake/sales/daily.csv"
property_name    | property_value
=================+=====================
name             | s3://data-lake/sales/daily.csv
                ...
updated_at       | 2026-04-14T10:00:50.820041Z

  • airflow assets list shows all registered assets
  • airflow assets details shows metadata, including updated_at

Let's practice!
