Airflow

Merancang Pipeline Peramalan untuk Produksi

Rami Krispin

Senior Manager, Data Science and Engineering

Pipeline Airflow

DAG Pipeline

Merancang Pipeline Peramalan untuk Produksi

Bangun fungsi pendukung

Langkah:
  • Bangun fungsi pendukung
    • Modul pendukung
    • Fungsi callable
import etl.eia_etl as ee

import etl.forecast_utils as fu
import etl.dags_functions as df
def check_updates_api(path, var):
    check = df.CheckUpdates()
    check.check_updates(path = path, 
                        var = var)
    return check.status
Merancang Pipeline Peramalan untuk Produksi

Menyiapkan pipeline

Langkah

  • Bangun fungsi pendukung
    • Modul pendukung
    • Fungsi callable
  • Atur DAG
  • Uji DAG
  • Deploy ke produksi

DAG Airflow

Tugas DAG Airflow

Merancang Pipeline Peramalan untuk Produksi

Atur DAG

DAG Airflow

Merancang Pipeline Peramalan untuk Produksi

Menguji DAG

Tugas DAG Airflow

Merancang Pipeline Peramalan untuk Produksi

Membangun prototipe

Praktik terbaik

  • Buat prototipe

Notebook Prototipe Pipeline

Merancang Pipeline Peramalan untuk Produksi

Atur struktur DAG

Fungsi dalam DAG Airflow

Merancang Pipeline Peramalan untuk Produksi

Tambahkan komponen

DAG Airflow

  • Memungkinkan pengujian pipeline
Merancang Pipeline Peramalan untuk Produksi

Simulasikan skenario

DAG Airflow

Merancang Pipeline Peramalan untuk Produksi

Pesan yang jelas

Log Airflow

Merancang Pipeline Peramalan untuk Produksi

Operator Airflow

$$

$$ Operator Python Airflow

Merancang Pipeline Peramalan untuk Produksi

Airflow PythonOperator

  • PythonOperator
  • BranchPythonOperator

Operator Python Airflow

Merancang Pipeline Peramalan untuk Produksi

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Operator Python Airflow

Merancang Pipeline Peramalan untuk Produksi

Airflow BranchPythonOperator

  • PythonOperator
  • BranchPythonOperator

Operator Python Airflow

Merancang Pipeline Peramalan untuk Produksi

Mengimpor modul

# Airflow modules
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator 
from airflow.providers.standard.operators.python import BranchPythonOperator

# Pipeline supporting modules import etl.eia_etl as ee import etl.callable as cl
# Other required libraries from datetime import datetime, timedelta import json import pointblank as pb
Merancang Pipeline Peramalan untuk Produksi

Menetapkan argumen default DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 3, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
Merancang Pipeline Peramalan untuk Produksi

Menetapkan DAG

with DAG(
    'data_pipeline',
    default_args=default_args,
    description='Data pipeline for ETL process',
    schedule='@daily',
    tags = ["python", "etl", "forecast"]
) as dag:

check_api = PythonOperator(
task_id='check_api',
python_callable=cl.check_updates_api,
op_kwargs={"path": "./settings/settings.json", "var": "EIA_API_KEY"} )
Merancang Pipeline Peramalan untuk Produksi

Menetapkan DAG

check_status = BranchPythonOperator(
  task_id='check_status',
  python_callable=cl.update_status,
  provide_context=True)

data_refresh = BranchPythonOperator( task_id='data_refresh', python_callable=cl.data_refresh, provide_context=True, op_kwargs={"var": api_key_var})
no_updates = PythonOperator( task_id='no_updates', python_callable=cl.no_updates, provide_context=True, op_kwargs={"save": save})
Merancang Pipeline Peramalan untuk Produksi

Menetapkan DAG

check_api >> check_status >> [data_refresh, no_updates]
data_refresh >> [data_validation, data_failure] 
data_validation >> check_validation >> [data_valid, data_invalid]
data_valid >> forecast_refresh >> forecast_score
Merancang Pipeline Peramalan untuk Produksi

Menetapkan DAG

UI Airflow

Merancang Pipeline Peramalan untuk Produksi

Ayo berlatih!

Merancang Pipeline Peramalan untuk Produksi

Preparing Video For Download...