Airflow

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Rami Krispin

Senior Manager, Data Science and Engineering

Airflow boru hattı

Boru Hattı DAG'ı

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Destekleyici fonksiyonları oluşturma

Adımlar:
  • Destekleyici fonksiyonları oluşturun
    • Destek modülleri
    • Çağrılabilir (callable) fonksiyonlar
import etl.eia_etl as ee

import etl.forecast_utils as fu
import etl.dags_functions as df
def check_updates_api(path, var):
    check = df.CheckUpdates()
    check.check_updates(path = path, 
                        var = var)
    return check.status
Üretim için Tahmin (Forecasting) Hatları Tasarlama

Boru hattını ayarlama

Adımlar

  • Destekleyici fonksiyonları oluşturun
    • Destek modülleri
    • Çağrılabilir (callable) fonksiyonlar
  • DAG'ı ayarlayın
  • DAG'ı test edin
  • Canlıya alın

Airflow DAG

Airflow DAG Görevi

Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı ayarlama

Airflow DAG

Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı test etme

Airflow DAG Görevi

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Prototip oluşturma

En iyi uygulamalar

  • Prototip oluşturun

Boru Hattı Prototip Defteri

Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG yapısını ayarlayın

Airflow DAG'daki Fonksiyonlar

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Bileşenleri ekleyin

Airflow DAG

  • Boru hattı testini sağlar
Üretim için Tahmin (Forecasting) Hatları Tasarlama

Senaryo simülasyonu

Airflow DAG

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Açık iletiler

Airflow Günlüğü

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Airflow operatörleri

$$

$$ Airflow Python Operator

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Airflow PythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python Operator

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python Operator

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Airflow Python Operator

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Modülleri içe aktarma

# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator 
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules import etl.eia_etl as ee import etl.callable as cl
# Other required libraries from datetime import datetime, timedelta import json import pointblank as pb
Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG varsayılan argümanlarını ayarlama

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı ayarlama

with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline for ETL process',
    schedule='@daily',
    tags = ["python", "etl", "forecast"]
) as dag:

check_api = PythonOperator(
task_id='check_api',
python_callable=cl.check_updates_api,
op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"} )
Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı ayarlama

check_status = BranchPythonOperator(
  task_id='check_status',
  python_callable=cl.update_status,
  provide_context=True)

data_refresh = BranchPythonOperator( task_id='data_refresh', python_callable=cl.data_refresh, provide_context=True, op_kwargs={"var": api_key_var})
no_updates = PythonOperator( task_id='no_updates', python_callable=cl.no_updates, provide_context=True, op_kwargs={"save": save})
Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı ayarlama

check_api >> check_status >> [data_refresh, no_updates]
data_refresh >> [data_validation, data_failure] 
data_validation >> check_validation >> [data_valid, data_invalid]
data_valid >> forecast_refresh >> forecast_score
Üretim için Tahmin (Forecasting) Hatları Tasarlama

DAG'ı ayarlama

Airflow Arayüzü

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Hadi pratik yapalım!

Üretim için Tahmin (Forecasting) Hatları Tasarlama

Preparing Video For Download...