Dynamic Task Mapping

Building Data Pipelines with Airflow

Volker Janz

Senior Developer Advocate at Astronomer

The problem with hardcoded tasks

@task
def process_file_1():
    transform("/data/file_1.csv")

@task
def process_file_2():
    transform("/data/file_2.csv")

@task
def process_file_3():
    transform("/data/file_3.csv")

Dynamic Task Mapping example


.expand()

@task
def fetch_files():
    return ["/data/file_1.csv", "/data/file_2.csv", "/data/file_3.csv"]

@task(max_active_tis_per_dagrun=2)  # control max parallelism
def process_file(path: str):
    print(f"processing file: {path}")

files = fetch_files()
process_file.expand(path=files)
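
As a rough intuition for what .expand() does, here is a plain-Python analogy that runs without Airflow: one call per list element, with at most two running at once. The thread pool stands in for the scheduler and `max_workers=2` stands in for `max_active_tis_per_dagrun=2`. Both stand-ins are illustrative assumptions, not a description of how Airflow is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_files() -> list[str]:
    # Stand-in for the upstream task; the list could have any length at runtime.
    return ["/data/file_1.csv", "/data/file_2.csv", "/data/file_3.csv"]


def process_file(path: str) -> str:
    # Stand-in for one mapped task instance.
    return f"processing file: {path}"


files = fetch_files()

# One call per element, at most two running concurrently.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(process_file, files))

# The position in the list plays the role of the index of a mapped instance.
for map_index, line in enumerate(results):
    print(map_index, line)
```

The number of calls is never written down anywhere; it follows from the length of the list that `fetch_files()` returns.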

Scale at runtime


Mapped tasks in the UI

 

  • Mapped instances appear as indexed instances under a single task
  • Click into each one for individual logs and status
  • Dag stays readable even with hundreds of instances



.partial()

@task
def process_file(path: str, output_dir: str):
    transform(path, output_dir)

files = get_files()
process_file.partial(output_dir="/out").expand(path=files)
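
The idea of fixing one argument and varying another resembles `functools.partial` from the Python standard library. The sketch below is only an analogy that runs without Airflow; the body of `process_file` is a hypothetical stand-in for `transform(path, output_dir)`.

```python
from functools import partial


def process_file(path: str, output_dir: str) -> str:
    # Hypothetical stand-in for transform(path, output_dir).
    return f"{path} -> {output_dir}"


files = ["/data/file_1.csv", "/data/file_2.csv", "/data/file_3.csv"]

# Fix the shared argument once...
process_to_out = partial(process_file, output_dir="/out")

# ...then vary only `path`, one call per file.
results = [process_to_out(path=f) for f in files]
print(results)
```

Every call receives the same `output_dir`, while `path` changes from one call to the next.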

Partial and expand


The cross-product problem

files = get_files()           # ["/data/a.csv", "/data/b.csv", "/data/c.csv"]
destinations = get_targets()  # ["s3://out/a", "s3://out/b", "s3://out/c"]

# Cross product: 3 x 3 = 9 instances (not what we want!)
process.expand(path=files, dest=destinations)
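
To see where the nine instances come from, the same combination can be reproduced in plain Python with `itertools.product`. This is an illustration of the counting only and does not use Airflow.

```python
from itertools import product

files = ["/data/a.csv", "/data/b.csv", "/data/c.csv"]
destinations = ["s3://out/a", "s3://out/b", "s3://out/c"]

# Every file is combined with every destination.
combinations = [
    {"path": path, "dest": dest} for path, dest in product(files, destinations)
]

print(len(combinations))
for kwargs in combinations[:4]:
    print(kwargs)
```

The list includes mismatched pairs such as `/data/a.csv` with `s3://out/b`, which is why the cross product is the wrong tool when each file has exactly one destination.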

Zip lists


.zip()

files = get_files()           # ["/data/a.csv", "/data/b.csv", "/data/c.csv"]
destinations = get_targets()  # ["s3://out/a", "s3://out/b", "s3://out/c"]

# Paired: 3 instances (a->a, b->b, c->c)
process.expand_kwargs(files.zip(destinations))

 

  • zip() matches items one-to-one by position
  • expand_kwargs unpacks each pair into keyword arguments
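
The pairing can be sketched in plain Python with the built-in `zip`. The sketch runs without Airflow, and it assumes each pair is first turned into a dictionary of keyword arguments before the call; `process` here is a hypothetical stand-in for the mapped task.

```python
files = ["/data/a.csv", "/data/b.csv", "/data/c.csv"]
destinations = ["s3://out/a", "s3://out/b", "s3://out/c"]


def process(path: str, dest: str) -> str:
    # Hypothetical stand-in for the mapped task.
    return f"{path} -> {dest}"


# zip matches items one-to-one by position.
pairs = list(zip(files, destinations))

# Assumption for this sketch: each pair becomes a dict of keyword arguments.
kwargs_list = [{"path": path, "dest": dest} for path, dest in pairs]

# One call per pair, with the dict unpacked into keyword arguments.
results = [process(**kwargs) for kwargs in kwargs_list]
print(results)
```

Three inputs give three calls, one per position. Python's built-in `zip` stops at the shortest input, so lists of unequal length would silently drop the extra items.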

When to use what

 

Pattern      Use case
expand()     Map over a single list
partial()    Fix shared parameters across instances
zip()        Pair multiple lists by position

 

  • These patterns help you build scalable Dags
  • When the data changes, the task count adjusts automatically at runtime

Let's practice!
