Logs

Designing Forecasting Pipelines for Production

Rami Krispin

Senior Manager, Data Science and Engineering

Pipeline logs

Logging

  • Metadata
  • Validation tests
  • Status
  • Performance

Functionality

  • Observability
  • Debugging
  • Role-based decisions

Setting logs

  • Set the log schema
  • Handle scenarios:
    • No new data is available
    • Validation outcome
  • Append the log
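
The three steps above can be sketched in plain Python. This is only an illustration, not the course's `ee.Log` class: the field names in `LOG_FIELDS`, the helpers `new_entry`, `append_log`, and `read_log`, and the choice of a CSV file are all assumptions made for the example.

```python
import csv
import os
import tempfile
from datetime import datetime, timezone

# Hypothetical log schema: one row per pipeline run.
LOG_FIELDS = ["time", "status", "update", "comments"]


def new_entry(status, update, comments=""):
    """Build one log row that follows LOG_FIELDS."""
    return {
        "time": datetime.now(timezone.utc).isoformat(),
        "status": status,
        "update": update,
        "comments": comments,
    }


def append_log(path, entry):
    """Append a row to the CSV log, writing the header on first use."""
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=LOG_FIELDS)
        if write_header:
            writer.writeheader()
        writer.writerow(entry)


def read_log(path):
    """Read the whole log back as a list of dicts (all values are strings)."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


log_path = os.path.join(tempfile.mkdtemp(), "etl_log.csv")

# Scenario 1: no new data is available, so the data is left unchanged.
append_log(log_path, new_entry(True, False, "No new data is available"))

# Scenario 2: new data arrived but failed validation.
append_log(log_path, new_entry(False, False, "Validation failed"))

rows = read_log(log_path)
```

Fixing the schema up front means every run, successful or not, leaves a row with the same columns, which keeps the log easy to filter later.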

The ETL log process

Check API Task

import etl.eia_etl as ee

log = ee.Log()
facets = {'respondent': 'US48', 'type': 'D'}
log.create_log(facets = facets)

Log Schema

The ETL log process

Check API Task

meta = ee.get_metadata(api_key = api_key,
    api_path = api_meta_path,
    meta_path = log_path,
    facets = facets,
    offset = 22, window = 336)

API Status

The ETL log process

Data Status Task

meta = ee.get_metadata(api_key = api_key,
    api_path = api_meta_path,
    meta_path = log_path,
    facets = facets,
    offset = 22, window = 336)

API Status
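
As a rough illustration of what a data status check might compute, the sketch below compares the last locally stored timestamp with the latest timestamp reported by the API. It is not the implementation of `ee.get_metadata`. The slide does not explain `offset` and `window`, so the sketch assumes `offset` is a number of hours held back from the latest API timestamp, assumes hourly data, and leaves `window` out. `DataStatus` and `check_data_status` are hypothetical names whose fields mirror the `meta.start`, `meta.api_end_offset`, and `meta.updates_available` attributes used on the slides.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class DataStatus:
    start: datetime            # first timestamp to request
    api_end_offset: datetime   # last timestamp to request
    updates_available: bool


def check_data_status(local_end, api_end, offset_hours=22):
    """Compare the last locally stored timestamp with the API's latest one."""
    # Assumption: hold back the most recent hours, which may still be revised.
    api_end_offset = api_end - timedelta(hours=offset_hours)
    # Assumption: hourly data, so the next missing point is one hour later.
    start = local_end + timedelta(hours=1)
    return DataStatus(start, api_end_offset, api_end_offset >= start)


# The API is two days ahead of the local data: a refresh is worthwhile.
status = check_data_status(
    local_end=datetime(2024, 1, 1, 0),
    api_end=datetime(2024, 1, 3, 0),
)

# The API is only 12 hours ahead, less than the offset: nothing to pull yet.
stale = check_data_status(
    local_end=datetime(2024, 1, 1, 0),
    api_end=datetime(2024, 1, 1, 12),
)
```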

The ETL log process

Validation Task

if not meta.updates_available:
  log.no_updates()

API Status

The ETL log process

Data Refresh Task

get = ea.eia_get(api_key=api_key, 
    api_path= api_data_path, 
    data = "value", facets = facets, 
    start = meta.start, 
    end = meta.api_end_offset)

Data Output

The ETL log process

Data Refresh Failure Task

log.failure()

The ETL log process

Data Validation Task

import pointblank as pb  # assumed: pb refers to the pointblank package

# Set up the validation object for the newly pulled data
test = ee.Validation(data = get.data,
    tbl_name = "get request",
    label = "validation",
    parameters = get.parameters, initial = False,
    warning = 0.10, error = 0, critical = 0)

# Expected columns and dtypes of the refreshed data
schema_refresh = pb.Schema(
    columns=[
        ("index", "datetime64[ns]"),
        ("respondent", "object"),
        ("respondent-name", "object"),
        ("type", "object"),
        ("type-name", "object"),
        ("value", "int64"),
        ("value-units", "object")
    ])

test.add_schema(schema = schema_refresh)
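
To make the idea of a schema test concrete, here is a small standalone sketch that checks rows against an expected set of columns and types. It does not use or reproduce `ee.Validation` or `pb.Schema`. Plain Python types stand in for the dtype strings on the slide, only a few of the columns are included, and `EXPECTED` and `check_schema` are hypothetical names.

```python
from datetime import datetime

# Expected column -> Python type; a stand-in for the dtype strings on the slide.
EXPECTED = {
    "index": datetime,
    "respondent": str,
    "type": str,
    "value": int,
}


def check_schema(rows, expected):
    """Return a list of human-readable schema problems (empty means pass)."""
    problems = []
    for i, row in enumerate(rows):
        # First make sure the column names match exactly.
        if set(row) != set(expected):
            problems.append(f"row {i}: columns {sorted(row)} != {sorted(expected)}")
            continue
        # Then check each value against its expected type.
        for col, typ in expected.items():
            if not isinstance(row[col], typ):
                found = type(row[col]).__name__
                problems.append(f"row {i}: {col} is {found}, expected {typ.__name__}")
    return problems


good = [{"index": datetime(2024, 1, 1), "respondent": "US48", "type": "D", "value": 100}]
bad = [{"index": datetime(2024, 1, 1), "respondent": "US48", "type": "D", "value": "100"}]

good_problems = check_schema(good, EXPECTED)
bad_problems = check_schema(bad, EXPECTED)
```

A non-empty problem list is the kind of outcome that would route the pipeline to the validation failure task instead of the append step.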

The ETL log process

Data Validation Task

Data Validation Summary

The ETL log process

Validation Failure Task

log.failure()

The ETL log process

Append the Data Task

if log.log["status"]:
  print("Appending the data")
  df = ee.AppendData()
  df.append_data(data_path = data_path, 
    new_data = get.data,save = True,
    schema = schema_append, 
    parameters = get.parameters)
  print(df.validation)
  if df.status and df.save:
    log.log["update"] = True
  else:
    log.log["update"] = False
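
The append step can be illustrated without the course module. The sketch below is an assumption about the general pattern, not the behavior of `ee.AppendData`: it appends only rows whose key is not already stored, then records in a hypothetical log dictionary whether anything was added. `append_rows` and the sample rows are made up for the example.

```python
def append_rows(existing, new_rows, key="index"):
    """Append rows whose key is not already present; return (data, n_added)."""
    seen = {row[key] for row in existing}
    added = []
    for row in new_rows:
        if row[key] not in seen:
            seen.add(row[key])
            added.append(row)
    # Keep the combined data ordered by the key (a timestamp in practice).
    combined = sorted(existing + added, key=lambda row: row[key])
    return combined, len(added)


existing = [{"index": 1, "value": 10}, {"index": 2, "value": 12}]
new_rows = [{"index": 2, "value": 12}, {"index": 3, "value": 15}]

log = {"status": True, "update": False}  # hypothetical log entry
if log["status"]:                        # append only data that passed validation
    data, n_added = append_rows(existing, new_rows)
    log["update"] = n_added > 0
```

Skipping keys that already exist makes the step safe to rerun: a repeated pull of the same window does not duplicate rows.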

The ETL log process

ETL Log Table

The forecast log process

Refresh the Forecast Task

The forecast log process

Score the Forecast Task
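
The slide does not say which metric is used to score the forecast. As one possible illustration, the sketch below scores a forecast against actual values with the mean absolute percentage error (MAPE) and stores the result in a log entry. The metric choice, the `mape` helper, the sample numbers, and the fields of `forecast_log_entry` are all assumptions.

```python
def mape(actual, forecast):
    """Mean absolute percentage error; assumes no actual value is zero."""
    if len(actual) != len(forecast) or not actual:
        raise ValueError("actual and forecast must be non-empty and equal length")
    return sum(abs((a - f) / a) for a, f in zip(actual, forecast)) / len(actual)


actual = [100.0, 200.0, 400.0]
forecast = [110.0, 180.0, 400.0]

score = mape(actual, forecast)

# Hypothetical forecast log entry: mark the forecast as scored and keep the metric.
forecast_log_entry = {"label": "2024-01-01", "mape": round(score, 4), "scored": True}
```

Scoring can only run once the actual values for the forecast horizon have arrived, which is why a flag such as `scored` is useful in the forecast log.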

The forecast log process

Forecast Log Table

Time for practice!
