ETL and ELT in Python
Jake Roach
Data Engineer
Data pipelines should be monitored for changes to data and failures in execution
import logging
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)
path = "raw_file.csv"  # example value, matching the output shown below
# Create different types of logs
logging.debug(f"Variable has value {path}")
logging.info("Data has been transformed and will now be loaded.")

DEBUG: Variable has value raw_file.csv
INFO: Data has been transformed and will now be loaded.
import logging
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG)
# Create different types of logs
logging.warning("Unexpected number of rows detected.")
logging.error(f"{ke} arose in execution.")

WARNING: Unexpected number of rows detected.
ERROR: KeyError arose in execution.
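As a rough illustration of monitoring for changes to data, the warning above could come from a small check that runs between pipeline steps. The sketch below is only one possible shape: the helper name check_row_count and its expected_rows parameter are hypothetical and do not come from the slides.

```python
import logging

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)


def check_row_count(rows, expected_rows):
    """Hypothetical helper: log whether the row count matches expectations."""
    actual_rows = len(rows)
    if actual_rows != expected_rows:
        # Data changed in a way we did not expect, so flag it at WARNING level
        logging.warning(
            f"Unexpected number of rows detected: expected {expected_rows}, got {actual_rows}."
        )
        return False
    # Routine confirmation goes to INFO
    logging.info(f"Row count check passed with {actual_rows} rows.")
    return True


ok = check_row_count(["a", "b", "c"], expected_rows=3)
short = check_row_count(["a"], expected_rows=3)
```

Returning a boolean as well as logging lets the calling code decide whether to continue, retry, or stop the pipeline.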
try:
    # Execute some code here
    ...
except:
    # Logging about failures that occurred
    # Logic to execute upon exception
    ...
Pass the specific exception in the except clause
try:
    # Try to filter by price_change
    clean_stock_data = transform(raw_stock_data)
    logging.info("Successfully filtered DataFrame by 'price_change'")
except KeyError as ke:
    # Handle the error, create new column, transform
    logging.warning(f"{ke}: Cannot filter DataFrame by 'price_change'")
    raw_stock_data["price_change"] = raw_stock_data["close"] - raw_stock_data["open"]
    clean_stock_data = transform(raw_stock_data)
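The snippet above relies on a transform function and a raw_stock_data DataFrame that are defined elsewhere. To make the pattern runnable on its own, here is a minimal sketch that swaps the DataFrame for a plain list of dictionaries, so it needs only the standard library. The body of transform, the wrapper transform_with_fallback, and the sample values are all assumptions made for illustration.

```python
import logging

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)


def transform(rows):
    # Hypothetical transform: keep rows whose price went up.
    # Raises KeyError if a row has no "price_change" key.
    return [row for row in rows if row["price_change"] > 0]


def transform_with_fallback(rows):
    try:
        # Try to filter by price_change
        clean = transform(rows)
        logging.info("Successfully filtered rows by 'price_change'")
    except KeyError as ke:
        # Only the expected failure is handled; other exceptions still propagate
        logging.warning(f"{ke}: Cannot filter rows by 'price_change'")
        # Derive the missing field, then retry the transform
        rows = [{**row, "price_change": row["close"] - row["open"]} for row in rows]
        clean = transform(rows)
    return clean


# Made-up sample data without a "price_change" field
raw_stock_data = [
    {"open": 10.0, "close": 12.5},
    {"open": 8.0, "close": 7.0},
]
clean_stock_data = transform_with_fallback(raw_stock_data)
```

Because the except clause names KeyError, a different problem (for example a TypeError from a non-numeric price) would not be silently absorbed by the fallback.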