Airflow

Designing Forecasting Pipelines for Production

Rami Krispin

Senior Manager, Data Science and Engineering

Airflow pipeline

Pipeline DAG

Build supporting functions

Steps:
  • Build supporting functions
    • Supporting modules
    • Callable functions
import etl.eia_etl as ee
import etl.forecast_utils as fu
import etl.dags_functions as df


def check_updates_api(path, var):
    # Run the update check and return its status to the caller
    check = df.CheckUpdates()
    check.check_updates(path=path, var=var)
    return check.status

Setting the pipeline

Steps

  • Build supporting functions
    • Supporting modules
    • Callable functions
  • Set the DAG
  • Test the DAG
  • Deploy to production

Airflow DAG

Airflow DAG Task

Setting the DAG

Airflow DAG

Testing the DAG

Airflow DAG Task

Building a prototype

Best practices

  • Prototype

Pipeline Prototype Notebook

Set the DAG structure

Functions in Airflow DAG

Add the components

Airflow DAG

  • Allows pipeline testing

Simulate scenario

Airflow DAG
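
One way to simulate a scenario is to swap a real step for a placeholder whose outcome is set by an argument, so each branch of the DAG can be exercised on demand. The sketch below is a minimal illustration in plain Python. The function names and the `simulate_valid` flag are hypothetical; only the task IDs `data_valid` and `data_invalid` come from the pipeline.

```python
def validation_stub(simulate_valid):
    """Hypothetical placeholder for the real validation step.

    It does not inspect any data; it simply reports the outcome it is told to.
    """
    return bool(simulate_valid)


def choose_validation_branch(is_valid):
    """Return the task_id of the branch to follow, as a branch callable would."""
    return "data_valid" if is_valid else "data_invalid"


# Walk through both scenarios without touching real data
for scenario in (True, False):
    outcome = validation_stub(simulate_valid=scenario)
    print(f"validation passed: {outcome} -> next task: {choose_validation_branch(outcome)}")
```

Once both paths behave as expected, the placeholder can be replaced with the real validation logic.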

Clear messages

Airflow Log
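
Airflow captures messages written through Python's standard `logging` module in the task log, so a callable can state what happened and why. The following is a minimal sketch, not the course's implementation: the logger name, the function `report_no_updates`, and its `last_update` parameter are assumptions made for illustration.

```python
import logging

# Hypothetical logger name; Airflow collects standard logging output per task
logger = logging.getLogger("pipeline")


def report_no_updates(last_update, save=False):
    """Log a message that says what was found and what the pipeline does next."""
    message = (
        f"No new data available on the API (last update: {last_update}); "
        f"skipping the data refresh (save={save})."
    )
    logger.info(message)
    return message
```

A message like this tells the reader of the log which condition was hit and which action followed, without having to open the code.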

Airflow operators

Airflow Python Operator

Airflow PythonOperator

  • Python Operator
  • Branch Python Operator

Airflow Python Operator

Airflow BranchPythonOperator

  • Python Operator
  • Branch Python Operator

Airflow Python Operator
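
The callable behind a BranchPythonOperator returns the `task_id` of the task to run next, and the value returned by an upstream PythonOperator callable is available through XCom. The sketch below shows that pattern in plain Python under stated assumptions: the upstream status is assumed to be a boolean, `update_status_sketch` is a hypothetical stand-in for the pipeline's own callable, and `FakeTaskInstance` exists only so the logic can be tried outside Airflow.

```python
def update_status_sketch(ti):
    """Pick the next task based on the value returned by the 'check_api' task.

    Airflow passes the task instance as `ti` when the callable declares it.
    Assumption: the upstream task returned a truthy value when updates exist.
    """
    status = ti.xcom_pull(task_ids="check_api")
    return "data_refresh" if status else "no_updates"


class FakeTaskInstance:
    """Test double that mimics only the xcom_pull call used above."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, task_ids):
        return self._values.get(task_ids)


# Try both outcomes without a running scheduler
print(update_status_sketch(FakeTaskInstance({"check_api": True})))
print(update_status_sketch(FakeTaskInstance({"check_api": False})))
```

Keeping the decision in a small function like this makes the branching rule easy to test before it is wired into the DAG.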

Importing modules

# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator 
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules
import etl.eia_etl as ee
import etl.callable as cl

# Other required libraries
from datetime import datetime, timedelta
import json
import pointblank as pb

Setting DAG default arguments

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

Setting the DAG

with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline for ETL process',
    schedule='@daily',
    tags=["python", "etl", "forecast"]
) as dag:

    check_api = PythonOperator(
        task_id='check_api',
        python_callable=cl.check_updates_api,
        op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"}
    )

Setting the DAG

check_status = BranchPythonOperator(
  task_id='check_status',
  python_callable=cl.update_status,
  provide_context=True)

data_refresh = BranchPythonOperator(
  task_id='data_refresh',
  python_callable=cl.data_refresh,
  provide_context=True,
  op_kwargs={"var": api_key_var})

no_updates = PythonOperator(
  task_id='no_updates',
  python_callable=cl.no_updates,
  provide_context=True,
  op_kwargs={"save": save})

Setting the DAG

check_api >> check_status >> [data_refresh, no_updates]
data_refresh >> [data_validation, data_failure] 
data_validation >> check_validation >> [data_valid, data_invalid]
data_valid >> forecast_refresh >> forecast_score
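
The `>>` statements above define a directed acyclic graph: each task on the left must finish before the tasks on its right can start. As a rough sketch outside Airflow, the same dependencies can be written as a plain mapping and checked with the standard library's `graphlib`. The mapping below only restates the four lines above; the variable names are illustrative.

```python
from graphlib import TopologicalSorter

# Each task mapped to the tasks that run after it, mirroring the >> lines
downstream = {
    "check_api": ["check_status"],
    "check_status": ["data_refresh", "no_updates"],
    "data_refresh": ["data_validation", "data_failure"],
    "data_validation": ["check_validation"],
    "check_validation": ["data_valid", "data_invalid"],
    "data_valid": ["forecast_refresh"],
    "forecast_refresh": ["forecast_score"],
}

sorter = TopologicalSorter()
for task, children in downstream.items():
    for child in children:
        sorter.add(child, task)  # child depends on task

# static_order raises CycleError if the dependencies contain a loop
order = list(sorter.static_order())
print(order)
```

This gives one valid execution order for the full graph. At run time the branch operators follow only one path at each fork, so not every task in the list runs on a given day.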

Setting the DAG

Airflow UI

Let's practice!
