# Source code for forecasting.simulation.worker

import os
import time
import gc
import traceback
import re
from datetime import datetime
from typing import List, Optional, Dict

from common.loggers.timing import get_logger as _get_timing_logger, log_time

import pandas as pd
from forecasting.simulation.config import BacktestConfig
from forecasting.data.storage import Storage


def run_single_day(
    path: str,
    config: BacktestConfig,
    output_dir: Optional[str],
    i: int = 0,
    total_days: int = 1,
    benchmark_file: Optional[str] = None,
) -> List:
    """
    Load, simulate, and save results for a single day/file.

    Designed to run as a standalone picklable unit inside a worker
    process or Modal container. Paths must already be remote paths
    (see Storage.get_remote_path).

    Parameters
    ----------
    path : str
        Remote path of the day's input file.
    config : BacktestConfig
        Simulation configuration (time window, timezone, features, target).
    output_dir : Optional[str]
        Destination directory (local or ``r2://``) for result parquet;
        ``None`` skips the save step.
    i : int
        Zero-based index of this task, used only for progress printing.
    total_days : int
        Total task count, used only for progress printing.
    benchmark_file : Optional[str]
        Optional CSV path to append the metrics row to (mostly for
        sequential LOCAL runs).

    Returns
    -------
    List
        A metrics row (see ``_finalize_metrics`` for column order);
        timing events are also recorded if timing is enabled.
    """
    # Deferred imports keep the module cheap to pickle/import in workers.
    from forecasting.data.loader import DataLoader
    from forecasting.simulation.simulate_day import SingleDaySimulation

    filename = os.path.basename(path)
    _tlogger = _get_timing_logger()
    # Timing logger may be disabled; ``tlog`` stays None in that case.
    tlog = _tlogger.child(filename=filename) if _tlogger else None

    metrics = {
        "timestamp": datetime.now().isoformat(),
        "filename": filename,
        "instrument_id": -1,
        "n_events": 0,
        "load_time": 0.0,
        "engine_time": 0.0,
        "save_time": 0.0,
        "total_time": 0.0,
        "status": "SUCCESS",
        "error": "",
    }

    # Context container for cleanup: holds the large objects so they can
    # all be dropped at once in ``finally``.
    ctx = {"loader": None, "data": None, "sim": None}

    try:
        # 1. Load Data
        t0 = time.time()
        ctx["loader"] = DataLoader(
            path,
            start_time=config.start_time,
            end_time=config.end_time,
            timezone=config.timezone,
        )
        ctx["data"] = ctx["loader"].load()
        metrics["load_time"] = time.time() - t0
        metrics["instrument_id"] = ctx["data"].instrument_id
        metrics["n_events"] = ctx["data"].n_events
        log_time(
            "worker_load",
            metrics["load_time"],
            logger=tlog,
            n_events=metrics["n_events"],
        )
        print(
            f"[{i + 1}/{total_days}] Loaded (unknown): {ctx['data'].n_events} events."
        )

        # 2. Run Engine
        t0 = time.time()
        ctx["sim"] = SingleDaySimulation(ctx["data"])
        results = ctx["sim"].run(config.features, config.target_s)
        metrics["engine_time"] = time.time() - t0
        log_time("worker_engine", metrics["engine_time"], logger=tlog)

        # 3. Save
        if output_dir:
            t0 = time.time()
            out_path = _get_output_path(output_dir, filename)
            Storage.save_parquet(pd.DataFrame(results), out_path)
            metrics["save_time"] = time.time() - t0
            log_time("worker_save", metrics["save_time"], logger=tlog)
            print(f"[{i + 1}/{total_days}] Saved to {out_path}")
    except Exception as e:
        # Best-effort: a failed day is reported in the metrics row rather
        # than crashing the whole worker/batch.
        metrics["status"] = "FAILED"
        metrics["error"] = f"{type(e).__name__}: {str(e)}"
        print(f"FAILED (unknown): {metrics['error']}")
        # traceback.print_exc()  # Optional: keep verbose off?
    finally:
        # Explicit Cleanup: release loader/data/sim and results before the
        # next task to keep worker memory flat.
        ctx.clear()
        results = None  # Clear results ref
        gc.collect()

    metrics["total_time"] = (
        metrics["load_time"] + metrics["engine_time"] + metrics["save_time"]
    )
    log_time(
        "worker_task",
        metrics["total_time"],
        logger=tlog,
        n_events=metrics["n_events"],
        load_s=round(metrics["load_time"], 4),
        engine_s=round(metrics["engine_time"], 4),
        save_s=round(metrics["save_time"], 4),
        status=metrics["status"],
    )
    row = _finalize_metrics(metrics)

    # Write to benchmark file (mostly for Sequential LOCAL runs)
    if benchmark_file:
        _append_benchmark(benchmark_file, row)
    return row
def _get_output_path(base_dir: str, filename: str) -> str:
    """Helper to determine output path from input filename.

    Extracts the first 8-digit run of the input filename as the date stem
    (e.g. ``20240101`` -> ``20240101.parquet``); if no 8-digit run exists,
    the whole input filename becomes the stem. ``r2://`` destinations are
    joined with ``/`` instead of ``os.path.join``.
    """
    # Logic: 20240101.parquet
    date_match = re.search(r"(\d{8})", filename)
    out_base = date_match.group(1) if date_match else filename
    out_name = f"{out_base}.parquet"
    if base_dir.startswith("r2://"):
        return f"{base_dir.rstrip('/')}/{out_name}"
    return os.path.join(base_dir, out_name)


def _finalize_metrics(m: Dict) -> List:
    """Converts metrics dict to standardized list row.

    Timings are rendered as fixed 4-decimal strings so the row is stable
    for CSV output. Order must match the benchmark schema.
    """
    # Ensure order matches schema
    return [
        m["timestamp"],
        m["filename"],
        m["instrument_id"],
        m["n_events"],
        f"{m['load_time']:.4f}",
        f"{m['engine_time']:.4f}",
        f"{m['save_time']:.4f}",
        f"{m['total_time']:.4f}",
        m["status"],
        m["error"],
    ]


def _append_benchmark(file_path: str, row: List) -> None:
    """Append one metrics row to a benchmark CSV file.

    Best-effort: any failure is reported as a warning and swallowed so a
    benchmark-write problem never fails the worker task.
    """
    try:
        import csv

        with open(file_path, "a", newline="") as f:
            csv.writer(f).writerow(row)
    except Exception as e:
        print(f"Warning: Failed to append benchmark to {file_path}: {e}")