Skip to content

Workflows

The workflows package provides high-level classes that orchestrate multi-step processes like daily model calibration and historical backtesting.

BacktestWorkflow

BacktestWorkflow(ticker: str, model_config: dict)

Orchestrates a backtest for a single model over multiple historical snapshots.

This workflow iterates through available historical data, using each day's data to calibrate a model and the subsequent day's data to evaluate its out-of-sample performance.

Initializes the backtest workflow.

Parameters:

- ticker (str, required):
  The stock ticker to run the backtest for.
- model_config (dict, required):
  A dictionary "recipe" defining how to calibrate the model.
Source code in src/optpricing/workflows/backtest_workflow.py
def __init__(self, ticker: str, model_config: dict):
    """
    Set up a backtest run for a single ticker and model recipe.

    Parameters
    ----------
    ticker : str
        The stock ticker to run the backtest for.
    model_config : dict
        A dictionary "recipe" defining how to calibrate the model.
    """
    # Normalize the ticker once so snapshot lookups are case-insensitive.
    self.ticker = ticker.upper()
    self.model_config = model_config
    # Per-period result rows; filled by run(), written out by save_results().
    self.results = []

run

run()

Executes the full backtesting loop.

It fetches available dates, then for each calibration/evaluation pair, it runs a DailyWorkflow to calibrate the model and then evaluates the out-of-sample RMSE on the next day's data.

Source code in src/optpricing/workflows/backtest_workflow.py
def run(self):
    """
    Executes the full backtesting loop.

    It fetches available dates, then for each calibration/evaluation pair,
    it runs a `DailyWorkflow` to calibrate the model and then evaluates
    the out-of-sample RMSE on the next day's data.
    """
    dates = get_available_snapshot_dates(self.ticker)
    # A backtest needs at least one (calibrate, evaluate) day pair.
    if len(dates) < 2:
        logger.warning(
            "Backtest for %s requires at least 2 days of data. Skipping.",
            self.model_config["name"],
        )
        return

    # Walk consecutive date pairs: calibrate on day t, evaluate on day t+1.
    for calib_date, eval_date in zip(dates, dates[1:]):
        logger.info(
            "--- Processing Period: Calibrate on %s, Evaluate on %s ---",
            calib_date,
            eval_date,
        )

        calib_data = load_market_snapshot(self.ticker, calib_date)
        eval_data = load_market_snapshot(self.ticker, eval_date)
        # Either snapshot missing -> skip this period entirely.
        if calib_data is None or eval_data is None:
            continue

        # Calibrate the model on the first day's chain via a DailyWorkflow.
        calib_workflow = DailyWorkflow(
            market_data=calib_data, model_config=self.model_config
        )
        calib_workflow.run()

        if calib_workflow.results["Status"] != "Success":
            logger.warning(
                "Calibration failed. Skipping evaluation for this period."
            )
            continue

        # Rebuild the model from the calibrated parameter set.
        fitted_model = self.model_config["model_class"](
            params=calib_workflow.results["Calibrated Params"]
        )

        # Score out-of-sample on the following day's data.
        # NOTE(review): this reaches into DailyWorkflow's private helpers.
        eval_workflow = DailyWorkflow(
            market_data=eval_data,
            model_config=self.model_config,
        )
        eval_workflow._prepare_for_evaluation()
        oos_rmse = eval_workflow._evaluate_rmse(
            fitted_model, eval_workflow.stock, eval_workflow.rate
        )

        logger.info(
            "Out-of-Sample RMSE for %s on %s: %.4f",
            self.model_config["name"],
            eval_date,
            oos_rmse,
        )

        self.results.append(
            {
                "Eval Date": eval_date,
                "Model": self.model_config["name"],
                "Out-of-Sample RMSE": oos_rmse,
            }
        )

save_results

save_results()

Saves the collected backtest results to a CSV file in the artifacts directory.

Source code in src/optpricing/workflows/backtest_workflow.py
def save_results(self):
    """
    Saves the collected backtest results to a CSV file in the artifacts directory.
    """
    # Nothing accumulated (e.g. every period was skipped) -> no file written.
    if not self.results:
        logger.info("No backtest results to save.")
        return

    frame = pd.DataFrame(self.results)
    # File name is stamped with today's date, one log per ticker per day.
    stamp = pd.Timestamp.now().strftime("%Y-%m-%d")
    out_path = BACKTEST_LOGS_DIR / f"{self.ticker}_backtest_{stamp}.csv"
    frame.to_csv(out_path, index=False)
    logger.info("Detailed backtest log saved to: %s", out_path)

DailyWorkflow

DailyWorkflow(
    market_data: DataFrame, model_config: dict[str, Any]
)

Orchestrates the calibration of a single model for a single snapshot of market data.

This class encapsulates the entire process for a given day:

1. Fits market-implied risk-free rate (r) and dividend yield (q).
2. Prepares initial parameter guesses, optionally using historical data.
3. Calibrates the model to front-month options.
4. Evaluates the calibrated model's performance (RMSE) on the full option chain.

Initializes the daily workflow.

Parameters:

- market_data (DataFrame, required):
  A DataFrame containing the option chain for a single snapshot date.
- model_config (dict[str, Any], required):
  A dictionary defining how to calibrate the model.
Source code in src/optpricing/workflows/daily_workflow.py
def __init__(
    self,
    market_data: pd.DataFrame,
    model_config: dict[str, Any],
):
    """
    Initializes the daily workflow.

    Parameters
    ----------
    market_data : pd.DataFrame
        A DataFrame containing the option chain for a single snapshot date.
    model_config : dict[str, Any]
        A dictionary defining how to calibrate the model.
    """
    self.market_data = market_data
    self.model_config = model_config
    # The results dict always records the model name; the ticker is
    # carried through only when the config supplies one.
    self.results: dict[str, Any] = {"Model": model_config["name"]}
    if "ticker" in model_config:
        self.results["Ticker"] = model_config["ticker"]
    # Populated later by run() / _prepare_for_evaluation().
    self.stock: Stock | None = None
    self.rate: Rate | None = None

run

run()

Executes the full calibration and evaluation workflow.

This method performs all steps in sequence and populates the self.results dictionary with the outcome, including status, calibrated parameters, and final RMSE. It includes error handling to ensure the workflow doesn't crash on failure.

Source code in src/optpricing/workflows/daily_workflow.py
def run(self):
    """
    Executes the full calibration and evaluation workflow.

    This method performs all steps in sequence and populates the `self.results`
    dictionary with the outcome, including status, calibrated parameters,
    and final RMSE. It includes error handling to ensure the workflow
    doesn't crash on failure.
    """
    model_name = self.model_config["name"]
    # Ticker is only present when the config supplied one (see __init__).
    ticker = self.results.get("Ticker", "N/A")
    logger.info("=" * 60)
    logger.info(
        "### Starting Workflow for Model: %s on Ticker: %s", model_name, ticker
    )
    logger.info("=" * 60)

    try:
        # Spot is taken from the first row; assumes it is constant across
        # the snapshot — TODO confirm upstream guarantees this.
        spot = self.market_data["spot_price"].iloc[0]

        logger.info("[Step 1] Getting live dividend and fitting implied rate...")
        q = get_live_dividend_yield(ticker)
        calls = self.market_data[self.market_data["optionType"] == "call"]
        puts = self.market_data[self.market_data["optionType"] == "put"]
        # The implied rate r is fit with the dividend yield held fixed at q.
        r, _ = fit_rate_and_dividend(calls, puts, spot, q_fixed=q)

        self.results.update({"Implied Rate": r, "Known Dividend": q})
        self.stock = Stock(spot=spot, dividend=q)
        self.rate = Rate(rate=r)

        logger.info("  -> Known q: %.4f, Implied r: %.4f", q, r)

        logger.info("[Step 2] Filtering market data to liquid options...")
        original_count = len(self.market_data)
        # Calibrate only on strikes within 85%-115% of spot.
        min_moneyness, max_moneyness = 0.85, 1.15

        calibration_data = (
            self.market_data[
                (self.market_data["strike"] / spot >= min_moneyness)
                & (self.market_data["strike"] / spot <= max_moneyness)
            ]
            .copy()
            .reset_index(drop=True)
        )
        _data_msg = f"{len(calibration_data)} of {original_count} options"
        logger.info(f"  -> Using {_data_msg} for calibration.")

        logger.info("[Step 3] Preparing dynamic initial guesses...")
        model_class = self.model_config["model_class"]
        # NOTE(review): this check only logs; the next line still reads
        # default_params and will raise AttributeError (caught by the outer
        # except) when the attribute is genuinely missing.
        if not hasattr(model_class, "default_params"):
            logger.error(f"Model {model_class.name} is missing default_params..")
        model_instance = model_class(params=model_class.default_params)

        # Prefer the model's own parameter definitions for bounds and guesses.
        if hasattr(model_instance, "param_defs"):
            bounds = {
                k: (p["min"], p["max"])
                for k, p in model_instance.param_defs.items()
            }
            initial_guess = {
                k: p["default"] for k, p in model_instance.param_defs.items()
            }
        else:
            # Fallback if param_defs is not defined
            bounds = self.model_config.get("bounds", {})
            initial_guess = model_instance.params.copy()
        # For any model with 'sigma', use average IV as a smart guess
        if (
            "sigma" in initial_guess
            and "impliedVolatility" in calibration_data.columns
        ):
            avg_iv = calibration_data["impliedVolatility"].mean()
            # Guard against NaN or degenerate (near-zero) mean IV.
            if pd.notna(avg_iv) and avg_iv > 0.01:
                initial_guess["sigma"] = avg_iv
                logger.info(f"  -> Dynamic initial guess for sigma: {avg_iv:.4f}")

        frozen_params_dict = {}
        # Merton-specific strategy: estimate jump parameters from historical
        # returns and hold them fixed during calibration.
        if model_name == "Merton" and self.model_config.get(
            "use_historical_strategy"
        ):
            logger.info(
                "  -> Activating Merton strat.: freezing historical jump params..."
            )
            hist_returns = load_historical_returns(ticker)
            jump_params = fit_jump_params_from_history(hist_returns)
            frozen_params_dict.update(jump_params)
            initial_guess.update(jump_params)
            logger.info(f"  -> Historical estimates frozen: {jump_params}")

        # Handle any other frozen parameters defined in the config
        frozen_from_config = self.model_config.get("frozen", {})
        if frozen_from_config:
            frozen_params_dict.update(frozen_from_config)

        # Calibrate the model
        logger.info("[Step 4] Calibrating %s...", model_name)
        calibrator = Calibrator(
            model_instance, calibration_data, self.stock, self.rate
        )
        calibrated_params = calibrator.fit(
            initial_guess=initial_guess,
            bounds=bounds,
            frozen_params=frozen_params_dict,
        )
        self.results["Calibrated Params"] = calibrated_params

        # Evaluate the calibrated model on the full chain
        logger.info(
            "[Step 5] Evaluating calibrated %s on the full chain...", model_name
        )
        final_model = model_instance.with_params(**calibrated_params)
        rmse = self._evaluate_rmse(final_model, self.stock, self.rate)
        self.results["RMSE"] = rmse
        self.results["Status"] = "Success"
        logger.info("  -> Evaluation Complete. Final RMSE: %.4f", rmse)

    except Exception as e:
        # Record the failure in results instead of propagating, so batch
        # callers (e.g. BacktestWorkflow) can continue with other periods.
        logger.error(
            "!!!!!! WORKFLOW FAILED for %s !!!!!!", model_name, exc_info=True
        )
        self.results.update({"RMSE": np.nan, "Status": "Failed", "Error": str(e)})