Airflow

Progettare pipeline di forecasting per la produzione

Rami Krispin

Senior Manager, Data Science and Engineering

Pipeline Airflow

DAG della pipeline

Progettare pipeline di forecasting per la produzione

Crea funzioni di supporto

Passi:
  • Crea funzioni di supporto
    • Moduli di supporto
    • Funzioni richiamabili
import etl.eia_etl as ee

import etl.forecast_utils as fu
import etl.dags_functions as df
def check_updates_api(path, var):
    check = df.CheckUpdates()
    check.check_updates(path = path, 
                        var = var)
    return check.status
Progettare pipeline di forecasting per la produzione

Impostare la pipeline

Passi

  • Crea funzioni di supporto
    • Moduli di supporto
    • Funzioni richiamabili
  • Imposta il DAG
  • Testa il DAG
  • Distribuisci in produzione

DAG di Airflow

Attività DAG di Airflow

Progettare pipeline di forecasting per la produzione

Impostare il DAG

DAG di Airflow

Progettare pipeline di forecasting per la produzione

Test del DAG

Attività DAG di Airflow

Progettare pipeline di forecasting per la produzione

Creare un prototipo

Buone pratiche

  • Prototipare

Notebook del prototipo di pipeline

Progettare pipeline di forecasting per la produzione

Definire la struttura del DAG

Funzioni nel DAG di Airflow

Progettare pipeline di forecasting per la produzione

Aggiungere i componenti

DAG di Airflow

  • Consente di testare la pipeline
Progettare pipeline di forecasting per la produzione

Simulare uno scenario

DAG di Airflow

Progettare pipeline di forecasting per la produzione

Messaggi chiari

Log di Airflow

Progettare pipeline di forecasting per la produzione

Operatori Airflow

$$

$$ Operatore Python di Airflow

Progettare pipeline di forecasting per la produzione

Airflow PythonOperator

  • PythonOperator
  • BranchPythonOperator

Operatore Python di Airflow

Progettare pipeline di forecasting per la produzione

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Operatore Python di Airflow

Progettare pipeline di forecasting per la produzione

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Operatore Python di Airflow

Progettare pipeline di forecasting per la produzione

Importare i moduli

# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator 
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules import etl.eia_etl as ee import etl.callable as cl
# Other required libraries from datetime import datetime, timedelta import json import pointblank as pb
Progettare pipeline di forecasting per la produzione

Impostare gli argomenti di default del DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Progettare pipeline di forecasting per la produzione

Impostare il DAG

with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline per ETL',
    schedule='@daily',
    tags = ["python", "etl", "forecast"]
) as dag:

check_api = PythonOperator(
task_id='check_api',
python_callable=cl.check_updates_api,
op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"} )
Progettare pipeline di forecasting per la produzione

Impostare il DAG

check_status = BranchPythonOperator(
  task_id='check_status',
  python_callable=cl.update_status,
  provide_context=True)

data_refresh = BranchPythonOperator( task_id='data_refresh', python_callable=cl.data_refresh, provide_context=True, op_kwargs={"var": api_key_var})
no_updates = PythonOperator( task_id='no_updates', python_callable=cl.no_updates, provide_context=True, op_kwargs={"save": save})
Progettare pipeline di forecasting per la produzione

Impostare il DAG

check_api >> check_status >> [data_refresh, no_updates]
data_refresh >> [data_validation, data_failure] 
data_validation >> check_validation >> [data_valid, data_invalid]
data_valid >> forecast_refresh >> forecast_score
Progettare pipeline di forecasting per la produzione

Impostare il DAG

Interfaccia Airflow

Progettare pipeline di forecasting per la produzione

Ayo berlatih!

Progettare pipeline di forecasting per la produzione

Preparing Video For Download...