forecasting.simulation¶
forecasting.simulation.backtest_harness¶
- class forecasting.simulation.backtest_harness.BacktestHarness(config: BacktestConfig, storage_options: Dict | None = None)[source]¶
Bases: BaseHarness
Orchestrates the execution of C++ backtest simulations using a clean, configuration-driven approach. Core logic is delegated to standalone worker functions.
- classmethod from_yaml(path: str, **overrides) BacktestHarness[source]¶
- get_market_subsampled_df(n: int = 100000, strategy: str = 'random_files', seed: int | None = None, n_days: int | None = None, instrument_id: int | None = None) polars.DataFrame[source]¶
Returns a DataFrame of raw market data (Databento MBO events) for exploration.
This is useful for analyzing raw order book events before featurization. Uses the DataLoader to read .dbn.zst files.
- Parameters:
n – Number of events to return (approximate for some strategies).
strategy –
“head”: First n events from the first file.
“random_files”: Picks random files and reads n events total.
“random_days”: Picks n_days random files and reads all their events.
seed – Random seed for file selection.
n_days – Number of files/days for “random_days” strategy.
instrument_id – Specific instrument to filter for (auto-detects if None).
- Returns:
A DataFrame with columns action, side, price, size, order_id, ts_recv, flags.
- Return type:
pl.DataFrame
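The file-selection step behind the “random_files” strategy can be sketched in plain Python: a seeded RNG picks files so the same seed reproduces the same sample. The helper name and file list below are hypothetical, not part of the library.

```python
import random

def pick_random_files(files, n_files, seed=None):
    """Hypothetical sketch of the 'random_files' selection step:
    a seeded RNG chooses files, so a fixed seed reproduces the sample."""
    rng = random.Random(seed)
    k = min(n_files, len(files))
    return rng.sample(files, k)

files = [f"mbo_2024-01-{d:02d}.dbn.zst" for d in range(1, 11)]
picked = pick_random_files(files, n_files=3, seed=42)
assert len(picked) == 3
assert pick_random_files(files, 3, seed=42) == picked  # same seed, same files
```

The actual harness then reads events from the chosen files until roughly n events are collected, which is why n is approximate for this strategy.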
- run_distributed(executor, task_fn: Callable = None, benchmark_file: str | None = None, concurrency_limit: int = 100, secrets: list | None = None) polars.DataFrame[source]¶
Run the backtest across all configured data files using the given executor.
- Parameters:
concurrency_limit – Max concurrent workers (Modal containers).
secrets – List of Modal secrets (e.g. for AWS). If None, uses executor default.
- run_sequential(output_dir: str | None = None, benchmark_file: str | None = None) None[source]¶
Runs the backtest sequentially on the local machine.
- to_analysis(output_base: str | None = None) AnalysisHarness[source]¶
Creates an AnalysisHarness pointing to the output of this backtest.
forecasting.simulation.analysis_harness¶
- class forecasting.simulation.analysis_harness.AnalysisHarness(id: str, base_source: str | List[str], storage_options: Dict | None = None)[source]¶
Bases: BaseHarness
Orchestrates the post-hoc analysis of backtest results using a declarative pipeline pattern.
The AnalysisHarness treats research as a series of “Experiments.” Instead of generating new data files for every hypothesis, you define a ‘transform_fn’ (logic) that is registered in a compute graph.
This allows for:
Rapid Prototyping: Apply logic to a small local sample (subsampling) to verify math.
Scalable Execution: Execute that same logic across multi-terabyte datasets via distributed executors without code changes.
Lineage Tracking: Maintains a clear map of transformations applied to ‘baseline’ results.
- pipelines¶
A registry of named experiments represented as unexecuted Polars compute graphs.
- Type:
Dict[str, pl.LazyFrame]
- transforms¶
A registry of the actual functions used to generate the pipelines (used for re-applying logic during subsampling).
- Type:
Dict[str, Callable]
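The split between the two registries can be illustrated with a stdlib stand-in, where plain functions play the role of transform_fn and deferred function composition plays the role of the Polars compute graph. All names here are hypothetical; the real class stores pl.LazyFrame plans, not closures.

```python
from typing import Callable, Dict

class TinyRegistry:
    """Minimal stand-in for the pipelines/transforms pattern:
    `transforms` keeps the raw logic (so it can be re-applied to a
    subsample), while `pipelines` keeps the composed, still
    unexecuted chain."""
    def __init__(self, baseline_fn: Callable):
        self.pipelines: Dict[str, Callable] = {"baseline": baseline_fn}
        self.transforms: Dict[str, Callable] = {}

    def add(self, name: str, transform_fn: Callable, base: str = "baseline"):
        parent = self.pipelines[base]
        # Compose lazily: nothing executes until the pipeline is called.
        self.pipelines[name] = lambda: transform_fn(parent())
        self.transforms[name] = transform_fn

reg = TinyRegistry(lambda: list(range(10)))
reg.add("evens", lambda xs: [x for x in xs if x % 2 == 0])
reg.add("doubled", lambda xs: [2 * x for x in xs], base="evens")
assert reg.pipelines["doubled"]() == [0, 4, 8, 12, 16]
```

Keeping the raw transforms alongside the composed pipelines is what lets the same logic be replayed against a small local sample during subsampling.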
- add_experiment(name: str, transform_fn: Callable[[polars.LazyFrame], polars.LazyFrame], base: str = 'baseline')[source]¶
Registers a new hypothesis or data view, optionally chaining from another experiment.
This builds a ‘Virtual Dataset’. No computation happens until metrics are requested or the experiment is ‘sinked’ to disk.
- Parameters:
name – Unique name for this experiment (e.g., ‘filtered_high_vol’).
transform_fn – A function taking a LazyFrame and returning a modified LazyFrame.
base – Name of the experiment to build on (default: “baseline”).
Example
harness.add_experiment("filtered", lambda lf: lf.filter(pl.col("volume") > 1000))
harness.add_experiment("aggregated", lambda lf: lf.group_by("symbol").agg(…), base="filtered")
- property columns: List[str]¶
Returns all columns available in the baseline dataset.
- compute_global_metric(executor, map_fn: Callable, stats_class, output_base: str, pattern: str = '*.parquet', start_time: str | None = None, end_time: str | None = None, concurrency_limit: int = 100, secrets: list | None = None, return_per_day: bool = False) polars.DataFrame | Dict[str, polars.DataFrame][source]¶
Orchestrates a robust Map-Reduce metric calculation.
Standard ‘averaging’ of daily metrics is often mathematically wrong. This method uses ‘Sufficient Statistics’ (sums, counts, squared sums) to compute a globally accurate metric across the entire dataset.
- Parameters:
executor – BaseExecutor.
map_fn – The mapping function (e.g. map_r2_chunk).
stats_class – The SufficientStats class (e.g. R2SufficientStats).
output_base – Base path where backtest results are stored.
pattern – Glob pattern for result files.
start_time – Optional filter (e.g. “09:30:00”). If None, tries metadata.
end_time – Optional filter (e.g. “16:00:00”). If None, tries metadata.
concurrency_limit – Max concurrent worker containers (Modal).
return_per_day – If True, also return per-day metrics alongside aggregated.
- Returns:
If return_per_day=False: pl.DataFrame with aggregated metrics. If return_per_day=True: Dict with keys “aggregated” and “per_day”.
- Return type:
pl.DataFrame | Dict[str, pl.DataFrame]
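Why averaging daily metrics is wrong, and why sufficient statistics fix it, can be shown with the global mean as the metric. When days have unequal sizes, the average of daily means differs from the true global mean; carrying sums and counts (an R² stats class would additionally carry squared sums and cross products) lets the reduce step recover the exact value. The helper names below are a hypothetical sketch, not the library's map_fn/stats_class API.

```python
def day_stats(values):
    """Map step: per-day sufficient statistics (sum and count)."""
    return {"sum": sum(values), "count": len(values)}

def reduce_global_mean(stats_list):
    """Reduce step: combine sufficient statistics, then finalize."""
    total = sum(s["sum"] for s in stats_list)
    count = sum(s["count"] for s in stats_list)
    return total / count

day1 = [1.0] * 99 + [101.0]   # 100 observations, daily mean 2.0
day2 = [10.0]                 # 1 observation, daily mean 10.0
naive = (2.0 + 10.0) / 2      # 6.0 -- averaging the daily means
exact = reduce_global_mean([day_stats(day1), day_stats(day2)])
assert abs(exact - 210.0 / 101) < 1e-12   # true global mean ~2.079
assert abs(naive - exact) > 3.9           # the naive average is far off
```

The same map/reduce shape applies unchanged whether the stats come from one local file or thousands of distributed workers, since sufficient statistics combine by simple addition.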
- get_experiment(name: str) polars.LazyFrame[source]¶
Returns the compute graph (LazyFrame) for a specific experiment.
The plan is built via the executor and serialized back using Polars native LazyFrame.serialize(). Call .collect() to execute the plan.
- Parameters:
name – Experiment name.
- Returns:
LazyFrame with the experiment’s compute graph.
- get_metrics(metrics: Dict[str, Callable[[polars.LazyFrame], polars.DataFrame]], experiment_names: List[str] | None = None, concurrency: int = 100) polars.DataFrame[source]¶
Triggers the execution of the compute graphs across selected experiments.
- Parameters:
metrics – Dict of metric functions to apply to each experiment pipeline.
experiment_names – Names of registered experiments to evaluate. Defaults to all.
concurrency – Max workers if distributed.
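The shape of the metrics argument can be sketched with a stdlib stand-in: each metric function is applied to each selected experiment's data, and the results are collected into rows. In the real method the inputs are LazyFrame pipelines and the output is a pl.DataFrame; the function and variable names below are hypothetical.

```python
def get_metrics_sketch(experiments, metrics, names=None):
    """Hypothetical stand-in: apply every metric fn to every selected
    experiment and gather the results as one row per experiment."""
    names = names if names is not None else list(experiments)
    rows = []
    for exp_name in names:
        data = experiments[exp_name]
        row = {"experiment": exp_name}
        for metric_name, fn in metrics.items():
            row[metric_name] = fn(data)
        rows.append(row)
    return rows

experiments = {"baseline": [1, 2, 3, 4], "filtered": [2, 4]}
metrics = {"n_rows": len, "total": sum}
rows = get_metrics_sketch(experiments, metrics)
assert rows[0] == {"experiment": "baseline", "n_rows": 4, "total": 10}
assert rows[1] == {"experiment": "filtered", "n_rows": 2, "total": 6}
```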
- get_subsampled_df(experiment_name: str = 'baseline', n: int = 100000, strategy: str = 'random_days', seed: int | None = None, step: int | None = None, n_days: int = 5) polars.DataFrame[source]¶
Injects a small data sample into an experiment’s logic to return an eager DataFrame.
This is the primary tool for interactive research. It guarantees that the logic you see in your notebook is identical to the logic run on the cluster.
- Parameters:
experiment_name – Which experiment logic to apply to the sample.
n – Row limit for the final sample.
strategy –
‘random_days’: Picks whole days (preserving HFT microstructure). Preferred.
‘head’: Fast, but biased toward early dates.
‘gather_every’: Systematically samples every Nth row.
n_days – How many full days to include in the ‘random_days’ sample.
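The ‘gather_every’ strategy amounts to systematic sampling: keep every step-th row so the sample spans the whole dataset, unlike ‘head’, which only sees the earliest rows. A hypothetical stdlib sketch of that step, with the step size derived from the row limit n:

```python
def gather_every_sketch(rows, n):
    """Hypothetical sketch of systematic sampling: choose a stride so
    roughly n evenly spaced rows survive, then cap at n."""
    step = max(1, len(rows) // n)
    return rows[::step][:n]

rows = list(range(1000))
sample = gather_every_sketch(rows, 10)
assert len(sample) == 10
assert sample == [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
```

Note this preserves coverage across dates but still breaks intraday event sequences, which is why ‘random_days’ is preferred when microstructure matters.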
- sink_experiment(name: str, output_path: str)[source]¶
Materializes the experiment compute graph into a Parquet file.
Collects data via executor, then writes to output_path using Storage (supports local, S3, or r2://dev/ paths).
Note: This loads all data into memory. For very large datasets, consider using streaming approaches or chunked writes.
forecasting.simulation.harness_base¶
- class forecasting.simulation.harness_base.BaseHarness(base_source: str | List[str], storage_options: Dict | None = None, id: str = 'harness')[source]¶
Bases: object
Abstract base class for all Harnesses.
Provides common functionality for:
- S3/Local/Modal volume file path resolution.
- Cloud storage configuration.
forecasting.simulation.worker¶
- forecasting.simulation.worker.run_single_day(path: str, config: BacktestConfig, output_dir: str | None, i: int = 0, total_days: int = 1, benchmark_file: str | None = None) List[source]¶
Load, simulate, and save results for a single day/file.
Designed to run as a standalone picklable unit inside a worker process or Modal container. Paths must already be remote paths (see Storage.get_remote_path). Returns a metrics row; records timing events if timing is enabled.