Building Data Pipelines with Airflow
Volker Janz
Senior Developer Advocate at Astronomer
$$
$$

$$
| Operator | Use case |
|---|---|
ApprovalOperator |
Binary approve/reject |
HITLBranchOperator |
Choose which tasks run next |
HITLOperator |
Custom options + form |
HITLEntryOperator |
Pure form input, no options |
$$
from airflow.providers.standard.operators.hitl import HITLEntryOperator from airflow.sdk import Paramreview = HITLEntryOperator( task_id="human_review", subject="Review AI draft response", body="{{ ti.xcom_pull(task_ids='generate_draft') }}", params={ "feedback": Param("", type="string"), "urgency": Param("p3", type="string"), }, )
@task def send_response(hitl_output): feedback = hitl_output["params_input"]["feedback"] urgency = hitl_output["params_input"]["urgency"] print(f"Sending response (urgency: {urgency}): {feedback}")draft = generate_draft() chain(draft, review) send_response(review.output)
params_inputchosen_optionsfrom airflow.providers.standard.operators.hitl import HITLBranchOperatorroute = HITLBranchOperator( task_id="route_complaint", subject="Route customer complaint", body="A customer reported an issue. Choose the responsible team.", options=["Billing Issue", "General Inquiry", "Technical Issue"], options_mapping={ "Billing Issue": "handle_billing", "Technical Issue": "handle_technical", "General Inquiry": "handle_general", }, defaults=["General Inquiry"], )

Building Data Pipelines with Airflow