Hepsini bir araya getirme

Data Engineering'e Giriş

Vincent Vankrunkelsven

Data Engineer @ DataCamp

ETL fonksiyonu

def extract_table_to_df(tablename, db_engine):
  return pd.read_sql("SELECT * FROM {}".format(tablename), db_engine)

def split_columns_transform(df, column, pat, suffixes): # Converts column into str and splits it on pat...
def load_df_into_dwh(film_df, tablename, schema, db_engine): return pd.to_sql(tablename, db_engine, schema=schema, if_exists="replace")
db_engines = { ... } # Needs to be configured def etl(): # Extract film_df = extract_table_to_df("film", db_engines["store"]) # Transform film_df = split_columns_transform(film_df, "rental_rate", ".", ["_dollar", "_cents"]) # Load load_df_into_dwh(film_df, "film", "store", db_engines["dwh"])
Data Engineering'e Giriş

Airflow tazeleme

Airflow logosu

 

  • İş akışı zamanlayıcı
  • Python
  • DAG'ler

Bir DAG'in kavramsal örneği

  • Görevler operatörlerle tanımlanır (örn. BashOperator)
Data Engineering'e Giriş

Airflow'da DAG ile zamanlama

from airflow.models import DAG

dag = DAG(dag_id="sample",
          ...,
          schedule_interval="0 0 * * *")

# cron
# .------------------------- dakika          (0 - 59)
# | .----------------------- saat            (0 - 23)
# | | .--------------------- ayın günü       (1 - 31)
# | | | .------------------- ay              (1 - 12)
# | | | | .----------------- haftanın günü   (0 - 6)
# * * * * * <komut>

# Örnek
0 * * * * # Her saat, 0. dakikada

bkz. https://crontab.guru

Data Engineering'e Giriş

DAG tanım dosyası

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="etl_pipeline",
          schedule_interval="0 0 * * *")

etl_task = PythonOperator(task_id="etl_task", python_callable=etl, dag=dag)
etl_task.set_upstream(wait_for_this_task)
Data Engineering'e Giriş

DAG tanım dosyası

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

...

etl_task.set_upstream(wait_for_this_task)

~/airflow/dags/ içinde etl_dag.py olarak kaydedildi

Data Engineering'e Giriş

Airflow Arayüzü

Airflow arayüzü ekran görüntüsü

Data Engineering'e Giriş

Hadi pratik yapalım!

Data Engineering'e Giriş

Preparing Video For Download...