Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

from airflow.sdk import dag, task, task_group

@task_group(
    group_id="ingest_orders",
    default_args={"retries": 3},
)
def process_orders():
    @task
    def extract_orders():
        return [{"id": 1, "amount": 99.99}]

    @task
    def transform_orders(orders):
        return [{"id": o["id"], "total": o["amount"] * 1.08} for o in orders]

    return transform_orders(extract_orders())
group_id sets a custom identifier; it defaults to the function name.
default_args applies to all tasks in the group, which avoids repeating configuration code.

Without task groups

With task groups
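
The effect of default_args in the process_orders group can be pictured as a layered merge. The sketch below is plain Python, not Airflow's implementation: resolve_task_args and the DAG-level values are hypothetical, and it assumes the usual precedence in which group defaults override DAG defaults and explicit task arguments override both.

```python
def resolve_task_args(dag_defaults, group_defaults, task_kwargs):
    """Model the assumed precedence: DAG defaults < group defaults < task arguments."""
    return {**dag_defaults, **group_defaults, **task_kwargs}


# Hypothetical DAG-level defaults, for illustration only.
dag_defaults = {"retries": 1, "owner": "data-team"}
# Group-level defaults, as in the process_orders example.
group_defaults = {"retries": 3}

# A task with no explicit arguments picks up the group's retries.
extract_args = resolve_task_args(dag_defaults, group_defaults, {})
# A task that sets retries itself keeps its own value.
transform_args = resolve_task_args(dag_defaults, group_defaults, {"retries": 5})

print(extract_args)    # {'retries': 3, 'owner': 'data-team'}
print(transform_args)  # {'retries': 5, 'owner': 'data-team'}
```

The point of the group-level setting is that extract_orders and transform_orders both get retries=3 without either task declaring it.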

@task_group(group_display_name="Process All")
def process_all():
    @task_group(group_display_name="Ingest Orders")
    def orders():
        return transform(extract())

    @task_group(group_display_name="Process Returns")
    def returns():
        return transform(extract())

    return {
        "orders": orders(),
        "returns": returns(),
    }
group_display_name sets a human-readable label in the UI (it even supports emojis).

@task_group
def process_source(source_name, source_path):
    @task
    def extract():
        return read_data(source_path)

    @task
    def transform(data):
        return clean_data(data)

    return transform(extract())

# Reuse the same pattern for different sources
orders = process_source("orders", "/data/orders.csv")
returns = process_source("returns", "/data/returns.csv")
events = process_source("events", "/data/events.csv")
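
The reuse pattern above can also be driven from a mapping of sources rather than one call per line. The following is a minimal plain-Python sketch of that idea, without Airflow: FAKE_FILES, the SOURCES mapping, and the bodies of read_data and clean_data are hypothetical stand-ins, since the slides do not show how those helpers are implemented.

```python
# Hypothetical in-memory stand-in for the CSV files named on the slide.
FAKE_FILES = {
    "/data/orders.csv": [" 1001 ", "1002", ""],
    "/data/returns.csv": ["2001", "  "],
    "/data/events.csv": [],
}


def read_data(source_path):
    # Assumed behavior: return the raw rows for a path.
    return FAKE_FILES[source_path]


def clean_data(rows):
    # Assumed behavior: strip whitespace and drop empty rows.
    return [row.strip() for row in rows if row.strip()]


def process_source(source_name, source_path):
    # Same extract -> transform shape as the task group, as ordinary functions.
    return clean_data(read_data(source_path))


SOURCES = {
    "orders": "/data/orders.csv",
    "returns": "/data/returns.csv",
    "events": "/data/events.csv",
}

# One definition, applied to every source.
results = {name: process_source(name, path) for name, path in SOURCES.items()}
print(results)  # {'orders': ['1001', '1002'], 'returns': ['2001'], 'events': []}
```

Adding a fourth source then means adding one entry to the mapping, which is the same benefit the parameterized task group gives inside a DAG.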

group_id for clear programmatic names, group_display_name for UI labels.
default_args to share config like retries across all tasks in a group.
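
To make the difference between programmatic IDs and UI labels concrete, here is a small plain-Python sketch of how nested group IDs are typically composed into dotted task IDs. It assumes Airflow's default behavior of prefixing task IDs with their group IDs, and that group IDs default to the function names from the process_all example; qualified_task_ids and the layout mapping are hypothetical helpers, not part of Airflow.

```python
def qualified_task_ids(tree, prefix=""):
    """Flatten a nested {group_id: children} mapping into dotted task IDs."""
    ids = []
    for name, children in tree.items():
        full = f"{prefix}.{name}" if prefix else name
        if children is None:
            # Leaf entry: a task.
            ids.append(full)
        else:
            # Nested entry: a task group, so recurse with the longer prefix.
            ids.extend(qualified_task_ids(children, full))
    return ids


# Group IDs taken from the function names in the process_all example.
layout = {
    "process_all": {
        "orders": {"extract": None, "transform": None},
        "returns": {"extract": None, "transform": None},
    }
}

task_ids = qualified_task_ids(layout)
for task_id in task_ids:
    print(task_id)
# process_all.orders.extract
# process_all.orders.transform
# process_all.returns.extract
# process_all.returns.transform
```

Labels such as "Ingest Orders" only change what the UI shows; under this assumption the IDs used in code and logs stay tied to group_id.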