Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer

$$
$$

$$

from airflow.sdk import dag, task


@dag
def star_wars_dag():
    """Fetch a Star Wars character from SWAPI and echo their name with bash."""

    @task
    def get_star_wars_person():
        """Return the JSON record for SWAPI person #1."""
        import requests

        # Without a timeout, requests can block forever on a stalled connection.
        response = requests.get("https://swapi.dev/api/people/1/", timeout=30)
        # Fail this task on HTTP errors instead of handing an error payload
        # to the downstream task (which would then fail with a KeyError).
        response.raise_for_status()
        return response.json()

    @task.bash
    def print_name(person):
        """Return the bash command that echoes the person's name."""
        import shlex

        # Shell-quote the value so names containing quotes or metacharacters
        # cannot break (or inject into) the generated command.
        return f"echo {shlex.quote(person['name'])}"

    # Passing the upstream result sets the get_star_wars_person >> print_name dependency.
    person = get_star_wars_person()
    print_name(person)


star_wars_dag()
$$

$$

$$
from airflow.sdk import Asset, dag


# No automatic runs: trigger manually (default)
@dag(schedule=None)
def my_pipeline(): ...


# Time-based: runs every day at 6 AM (UTC unless a timezone is configured)
@dag(schedule="0 6 * * *")
def daily_pipeline(): ...


# Data-aware: runs when an Asset updates
@dag(schedule=[Asset("my_asset")])
def downstream_pipeline(): ...
Classic operators
# Classic style: instantiate an operator object, pointing it at a plain callable...
extract = PythonOperator(
    task_id="extract",
    python_callable=extract_fn,
)
# ...then declare the ordering explicitly with the bitshift operator.
extract >> transform
TaskFlow API
# TaskFlow style: the decorated function becomes a task. Passing its return
# value to transform moves the data (via XCom) and sets extract >> transform.
@task
def extract():
    """Return a small summary payload for downstream tasks."""
    return {"users": 150}


data = extract()
transform(data)
$$

Dags are plain `.py` files: run one with `python3 filename.py`, where `dag.test()` runs the full Dag in one process.

Building Data Pipelines with Airflow