Airflow

Forecasting-pijplijnen ontwerpen voor productie

Rami Krispin

Senior Manager, Data Science and Engineering

Airflow-pipeline

Pipeline-DAG

Forecasting-pijplijnen ontwerpen voor productie

Hulpfuncties bouwen

Stappen:
  • Hulpfuncties bouwen
    • Ondersteunende modules
    • Aanroepbare functies
import etl.eia_etl as ee

import etl.forecast_utils as fu
import etl.dags_functions as df
def check_updates_api(path, var):
    check = df.CheckUpdates()
    check.check_updates(path = path, 
                        var = var)
    return check.status
Forecasting-pijplijnen ontwerpen voor productie

De pipeline instellen

Stappen

  • Hulpfuncties bouwen
    • Ondersteunende modules
    • Aanroepbare functies
  • De DAG instellen
  • De DAG testen
  • Productie-deploy

Airflow DAG

Airflow DAG-taak

Forecasting-pijplijnen ontwerpen voor productie

De DAG instellen

Airflow DAG

Forecasting-pijplijnen ontwerpen voor productie

De DAG testen

Airflow DAG-taak

Forecasting-pijplijnen ontwerpen voor productie

Een prototype bouwen

Best practices

  • Prototype

Prototype-notebook voor pipeline

Forecasting-pijplijnen ontwerpen voor productie

Stel de DAG-structuur in

Functies in Airflow-DAG

Forecasting-pijplijnen ontwerpen voor productie

Voeg de onderdelen toe

Airflow DAG

  • Maakt pipeline-testen mogelijk
Forecasting-pijplijnen ontwerpen voor productie

Scenario simuleren

Airflow DAG

Forecasting-pijplijnen ontwerpen voor productie

Duidelijke meldingen

Airflow-log

Forecasting-pijplijnen ontwerpen voor productie

Airflow-operators

$$

$$ Airflow Python-operator

Forecasting-pijplijnen ontwerpen voor productie

Airflow PythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python-operator

Forecasting-pijplijnen ontwerpen voor productie

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python-operator

Forecasting-pijplijnen ontwerpen voor productie

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python-operator

Forecasting-pijplijnen ontwerpen voor productie

Modules importeren

# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator 
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules import etl.eia_etl as ee import etl.callable as cl
# Other required libraries from datetime import datetime, timedelta import json import pointblank as pb
Forecasting-pijplijnen ontwerpen voor productie

Standaardargumenten voor de DAG instellen

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Forecasting-pijplijnen ontwerpen voor productie

De DAG instellen

with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline for ETL process',
    schedule='@daily',
    tags = ["python", "etl", "forecast"]
) as dag:

check_api = PythonOperator(
task_id='check_api',
python_callable=cl.check_updates_api,
op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"} )
Forecasting-pijplijnen ontwerpen voor productie

De DAG instellen

check_status = BranchPythonOperator(
  task_id='check_status',
  python_callable=cl.update_status,
  provide_context=True)

data_refresh = BranchPythonOperator( task_id='data_refresh', python_callable=cl.data_refresh, provide_context=True, op_kwargs={"var": api_key_var})
no_updates = PythonOperator( task_id='no_updates', python_callable=cl.no_updates, provide_context=True, op_kwargs={"save": save})
Forecasting-pijplijnen ontwerpen voor productie

De DAG instellen

check_api >> check_status >> [data_refresh, no_updates]
data_refresh >> [data_validation, data_failure] 
data_validation >> check_validation >> [data_valid, data_invalid]
data_valid >> forecast_refresh >> forecast_score
Forecasting-pijplijnen ontwerpen voor productie

De DAG instellen

Airflow-UI

Forecasting-pijplijnen ontwerpen voor productie

Laten we oefenen!

Forecasting-pijplijnen ontwerpen voor productie

Preparing Video For Download...