Designing Forecasting Pipelines for Production
Rami Krispin
Senior Manager, Data Science and Engineering

# Preview the first rows of bkt_df; the printed table follows.
print(bkt_df.head())
     unique_id    ds                     cutoff                 y            lightGBM         
0    1            2024-04-22 00:00:00    2024-04-21 23:00:00    421082.60    421089.155837    
1    1            2024-04-22 01:00:00    2024-04-21 23:00:00    429728.30    425700.453391    
2    1            2024-04-22 02:00:00    2024-04-21 23:00:00    430690.96    424382.613668    
3    1            2024-04-22 03:00:00    2024-04-21 23:00:00    420094.58    409967.877157    
4    1            2024-04-22 04:00:00    2024-04-21 23:00:00    403292.36    393175.446116    
# Preview the lightGBM column next to its "-lo-95" / "-hi-95" columns.
print(bkt_df[["ds", "lightGBM", "lightGBM-lo-95", "lightGBM-hi-95"]].head())
     ds                     lightGBM         lightGBM-lo-95   lightGBM-hi-95
0    2024-04-14 00:00:00    422716.385199    421022.905138    424409.865260
1    2024-04-14 01:00:00    422439.422659    417651.607031    427227.238288
2    2024-04-14 02:00:00    417209.926483    407277.565240    427142.287725
3    2024-04-14 03:00:00    405820.047603    392670.364652    418969.730553
4    2024-04-14 04:00:00    386520.594124    372017.701036    401023.487212
# Number the distinct cutoff values found in bkt_df starting at 1, so that
# every cutoff can be referred to by a compact partition id.
cutoff = bkt_df["cutoff"].unique()
partitions_mapping = pd.DataFrame({"cutoff": cutoff})
partitions_mapping["partition"] = range(1, len(cutoff) + 1)
print(partitions_mapping)
     cutoff                 partition
0    2024-04-13 23:00:00    1
1    2024-04-14 23:00:00    2
2    2024-04-15 23:00:00    3
3    2024-04-16 23:00:00    4
4    2024-04-17 23:00:00    5
# Short labels used as column names, paired position-by-position with the
# corresponding regressor names.
model_label = ["lightGBM", "xgboost", "linear_regression", "lasso", "ridge"]
model_name = ["LGBMRegressor", "XGBRegressor", "LinearRegression", "Lasso", "Ridge"]

# One row per (label, name) pair.
models_mapping = pd.DataFrame(
    list(zip(model_label, model_name)),
    columns=["model_label", "model_name"],
)
print(models_mapping)
        model_label        model_name
0       lightGBM           LGBMRegressor
1       xgboost            XGBRegressor
2       linear_regression  LinearRegression
3       lasso              Lasso
4       ridge              Ridge
# Reshape bkt_df from wide to long: every model column and every
# "-lo-95" / "-hi-95" column becomes a (model_label, value) row, while the
# identifying columns and the actual value y are repeated on each row.
lower_columns = [f"{model}-lo-95" for model in model_label]
upper_columns = [f"{model}-hi-95" for model in model_label]
bkt_long = bkt_df.melt(
    id_vars=["unique_id", "ds", "cutoff", "y"],
    value_vars=model_label + lower_columns + upper_columns,
    var_name="model_label",
    value_name="value",
)
print(bkt_long.head())
  unique_id      ds                 cutoff          y    model_label    value
0    1  2024-11-19 00:00:00  2024-11-18 23:00:00  477465  lightGBM  478914.014832
1    1  2024-11-19 01:00:00  2024-11-18 23:00:00  475805  lightGBM  482088.981788
2    1  2024-11-19 02:00:00  2024-11-18 23:00:00  469719  lightGBM  477138.303561
3    1  2024-11-19 03:00:00  2024-11-18 23:00:00  458311  lightGBM  466026.700362
4    1  2024-11-19 04:00:00  2024-11-18 23:00:00  441835  lightGBM  446428.477909
def split_model_confidence(model_name):
    """Split a melted column label into a base label and a value type.

    Parameters
    ----------
    model_name : str
        A label such as "lightGBM", "lightGBM-lo-95" or "lightGBM-hi-95".

    Returns
    -------
    tuple of (str, str)
        The label with any "-lo-95" / "-hi-95" marker removed, and
        "lower", "upper" or "forecast" respectively.
    """
    if "-lo-95" in model_name:
        return model_name.replace("-lo-95", ""), "lower"
    if "-hi-95" in model_name:
        return model_name.replace("-hi-95", ""), "upper"
    # No interval marker: the column holds the point forecast itself.
    return model_name, "forecast"


# Overwrite model_label with the base label and record the value type
# ("forecast", "lower" or "upper") in a new "type" column.
bkt_long["model_label"], bkt_long["type"] = zip(
    *bkt_long["model_label"].map(split_model_confidence)
)
# Attach the partition number that corresponds to each row's cutoff.
bkt_long = bkt_long.merge(partitions_mapping, how="left", on=["cutoff"])

# Spread the "type" values into separate columns, giving one row per
# unique_id / ds / model_label / partition / y, then attach model_name
# from models_mapping.
bkt = (
    bkt_long
    .pivot(
        index=["unique_id", "ds", "model_label", "partition", "y"],
        columns="type",
        values="value",
    )
    .reset_index()
    .merge(models_mapping, how="left", on=["model_label"])
)

def mape(y, yhat):
    """Return the mean absolute percentage error as a fraction (not x100).

    Parameters
    ----------
    y : array-like
        Actual values; must support element-wise arithmetic (the caller in
        this file passes a pandas Series). Values equal to zero make the
        corresponding ratio undefined.
    yhat : array-like
        Forecast values aligned with ``y``.

    Returns
    -------
    float
        Mean of ``|y - yhat| / |y|``.

    Notes
    -----
    ``mean`` is not defined in this block and must already be in scope
    (presumably ``statistics.mean`` or ``numpy.mean`` -- TODO confirm the
    import).
    """
    # Divide by abs(y), not y: with a signed denominator a negative actual
    # yields a negative term, so errors cancel instead of accumulating.
    return mean(abs(y - yhat) / abs(y))
def rmse(y, yhat):
    """Return the root mean squared error between actuals and forecasts.

    ``y`` and ``yhat`` must support element-wise subtraction and powers.
    ``mean`` is taken from the enclosing scope.
    """
    residuals = y - yhat
    mean_squared_error = mean(residuals ** 2)
    return mean_squared_error ** 0.5
def coverage(y, lower, upper):
    """Return the share of actuals that fall inside [lower, upper].

    Both bounds are inclusive. The inputs must support element-wise
    comparison and ``&``; the result is the count of hits divided by
    ``len(y)``.
    """
    within_interval = (y >= lower) & (y <= upper)
    hits = sum(within_interval)
    return hits / len(y)
Helper function:
def score(df):
    """Compute the accuracy metrics for one slice of backtesting results.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the columns "y", "forecast", "lower" and "upper".

    Returns
    -------
    pandas.Series
        Values of mape, rmse and coverage, indexed by those three names in
        that order.
    """
    actual = df["y"]
    predicted = df["forecast"]
    metrics = {
        "mape": mape(y=actual, yhat=predicted),
        "rmse": rmse(y=actual, yhat=predicted),
        "coverage": coverage(y=actual, lower=df["lower"], upper=df["upper"]),
    }
    return pd.Series(metrics)
Score the models:
# Score each (series, model, partition) group of bkt separately and
# flatten the group keys back into regular columns.
group_keys = ["unique_id", "model_label", "model_name", "partition"]
scored_columns = group_keys + ["y", "forecast", "lower", "upper"]
grouped = bkt.groupby(group_keys)[scored_columns]
score_df = grouped.apply(score).reset_index()
print(score_df.head())
     unique_id   model     partition    mape        rmse            coverage
0    1           lasso     1            0.050913    29315.983715    0.486111
1    1           lasso     2            0.037723    19034.393950    0.763889
2    1           lasso     3            0.017668    9768.991810     0.986111
3    1           lasso     4            0.014224    7839.292592     1.000000
4    1           lasso     5            0.023679    13550.628885    0.847222


import datetime

import mlflow

# Experiment name and the artifact location passed to MLflow when the
# experiment is created.
experiment_name = "ml_forecast"
mlflow_path = "file:///mlruns"

# Settings recorded as experiment tags. h, step_size, partitions,
# n_windows and levels are not defined in this snippet and must already
# exist in scope.
tags = {
    "h": h,
    "step_size": step_size,
    "partitions": partitions,
    "intervals_type": "ConformalIntervals",
    "intervals_h": h,
    "intervals_n_windows": n_windows,
    "intervals_method": "conformal_distribution",
    "levels": levels,
}
# Create the experiment on first use; on later runs creation fails because
# the name is already taken, and the existing experiment is reused.
try:
    mlflow.create_experiment(
        name=experiment_name,
        artifact_location=mlflow_path,
        tags=tags,
    )
except mlflow.exceptions.MlflowException:
    # Catch only MLflow's own error (raised when the name already exists)
    # instead of a bare except, so unrelated failures such as
    # KeyboardInterrupt or a NameError are not silently swallowed.
    print(f"Experiment {experiment_name} exists, pulling the metadata")
else:
    print(f"Set a new experiment {experiment_name}")
    print("Pulling the metadata")

# In both cases, fetch the experiment metadata; meta.experiment_id is
# needed when starting runs.
meta = mlflow.get_experiment_by_name(experiment_name)
Set a new experiment ml_forecast
Pulling the metadata
# A single timestamp shared by every run logged in this pass, so that runs
# from the same execution can be recognised by their name suffix.
run_time = datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S")

# Log one MLflow run per row of score_df (one row per scored
# unique_id / model / partition combination).
for index, row in score_df.iterrows():
    run_name = row["model_label"] + "-" + run_time
    with mlflow.start_run(
        experiment_id=meta.experiment_id,
        run_name=run_name,
        tags={
            "type": "backtesting",
            "partition": row["partition"],
            "unique_id": row["unique_id"],
            "model_label": row["model_label"],
            "model_name": row["model_name"],
            "run_name": run_name,
        },
    ) as run:
        # Start from the estimator's own parameters (ml_models is defined
        # outside this snippet and is indexed by model label), then add
        # the identifying fields and feature settings.
        model_params = ml_models[row["model_label"]].get_params()
        model_params["model_name"] = row["model_name"]
        model_params["model_label"] = row["model_label"]
        model_params["partition"] = row["partition"]
        model_params["lags"] = list(range(1, 24))
        model_params["date_features"] = ["month", "day", "dayofweek", "week", "hour"]
        mlflow.log_params(model_params)

        # Store the row's scores as run metrics.
        mlflow.log_metric("mape", row["mape"])
        mlflow.log_metric("rmse", row["rmse"])
        mlflow.log_metric("coverage", row["coverage"])
Designing Forecasting Pipelines for Production