Designing Forecasting Pipelines for Production
Rami Krispin
Senior Manager, Data Science and Engineering

import etl.eia_etl as ee
import etl.forecast_utils as fu
import etl.dags_functions as df
# Callable for the check_api task: checks whether new data is available
# from the API and returns the refresh status
def check_updates_api(path, var):
    check = df.CheckUpdates()
    check.check_updates(path=path,
                        var=var)
    return check.status
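
A quick local smoke test of this callable, run outside Airflow, can mirror the op_kwargs that the check_api task passes in the DAG below; it assumes ./settings/settings.json exists locally and holds an EIA_API_KEY entry:

if __name__ == "__main__":
    # Call the function exactly as the PythonOperator does, using the DAG's op_kwargs
    status = check_updates_api(path="./settings/settings.json",
                               var="EIA_API_KEY")
    print(f"New data available: {status}")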













# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules
import etl.eia_etl as ee
import etl.callable as cl

# Other required libraries
from datetime import datetime, timedelta
import json
import pointblank as pb
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline for ETL process',
    schedule='@daily',
    tags=["python", "etl", "forecast"]
) as dag:

    check_api = PythonOperator(
        task_id='check_api',
        python_callable=cl.check_updates_api,
        op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"}
    )
    check_status = BranchPythonOperator(
        task_id='check_status',
        python_callable=cl.update_status,
        provide_context=True
    )

    data_refresh = BranchPythonOperator(
        task_id='data_refresh',
        python_callable=cl.data_refresh,
        provide_context=True,
        op_kwargs={"var": api_key_var}
    )

    no_updates = PythonOperator(
        task_id='no_updates',
        python_callable=cl.no_updates,
        provide_context=True,
        op_kwargs={"save": save}
    )
    # Set the task dependencies to define the pipeline's flow
    check_api >> check_status >> [data_refresh, no_updates]
    data_refresh >> [data_validation, data_failure]
    data_validation >> check_validation >> [data_valid, data_invalid]
    data_valid >> forecast_refresh >> forecast_score
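
The check_status and data_refresh tasks are BranchPythonOperator tasks, so their callables return the task_id of the branch to follow. The real implementations live in etl.callable; a minimal, hypothetical sketch of update_status, assuming the check_api task pushes its return value to XCom (the PythonOperator default), might look like:

def update_status(**context):
    # Pull the status returned by the upstream check_api task (pushed to XCom by default)
    status = context["ti"].xcom_pull(task_ids="check_api")
    # Branch: refresh the data when new observations are available,
    # otherwise end the run through the no_updates task
    return "data_refresh" if status else "no_updates"

Similarly, pointblank is imported above but its use is not shown on these slides; the data_validation step could wrap a check along these lines (the column names and rules here are assumptions, not the pipeline's actual validation logic):

import pandas as pd
import pointblank as pb

def validate_series(data: pd.DataFrame) -> bool:
    # Hypothetical rules: no missing observations and strictly positive values
    validation = (
        pb.Validate(data=data)
        .col_vals_not_null(columns="value")
        .col_vals_gt(columns="value", value=0)
        .interrogate()
    )
    # True when every validation step passed; a branch task such as
    # check_validation can route on this flag
    return validation.all_passed()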
