import os
import time
import gc
import traceback
import re
from datetime import datetime
from typing import List, Optional, Dict
from common.loggers.timing import get_logger as _get_timing_logger, log_time
import pandas as pd
from forecasting.simulation.config import BacktestConfig
from forecasting.data.storage import Storage
def run_single_day(
    path: str,
    config: BacktestConfig,
    output_dir: Optional[str],
    i: int = 0,
    total_days: int = 1,
    benchmark_file: Optional[str] = None,
) -> List:
    """
    Load, simulate, and save results for a single day/file.

    Designed to run as a standalone picklable unit inside a worker process or
    Modal container. Paths must already be remote paths (see Storage.get_remote_path).

    Args:
        path: Input file for one day; its basename is used for logging and
            to derive the output filename.
        config: Backtest configuration (start/end time, timezone, features,
            target horizon).
        output_dir: Destination directory (local or "r2://...") for the
            result parquet; saving is skipped when falsy.
        i: Zero-based index of this day, used only for progress messages.
        total_days: Total number of days, used only for progress messages.
        benchmark_file: Optional CSV path to append the metrics row to
            (mostly for sequential LOCAL runs).

    Returns:
        A metrics row (list) as produced by _finalize_metrics. Exceptions are
        captured into the row's status/error columns rather than re-raised, so
        one bad day never kills a batch. Timing events are recorded if timing
        is enabled.
    """
    # Deferred imports: keep module import light and the function picklable.
    from forecasting.data.loader import DataLoader
    from forecasting.simulation.simulate_day import SingleDaySimulation

    filename = os.path.basename(path)
    _tlogger = _get_timing_logger()
    tlog = _tlogger.child(filename=filename) if _tlogger else None
    metrics = {
        "timestamp": datetime.now().isoformat(),
        "filename": filename,
        "instrument_id": -1,
        "n_events": 0,
        "load_time": 0.0,
        "engine_time": 0.0,
        "save_time": 0.0,
        "total_time": 0.0,
        "status": "SUCCESS",
        "error": "",
    }
    # Context container so cleanup can drop all large refs in one place.
    ctx = {"loader": None, "data": None, "sim": None}
    try:
        # 1. Load Data
        # perf_counter() is monotonic; time.time() can jump with clock
        # adjustments and corrupt the elapsed-time metrics.
        t0 = time.perf_counter()
        ctx["loader"] = DataLoader(
            path,
            start_time=config.start_time,
            end_time=config.end_time,
            timezone=config.timezone,
        )
        ctx["data"] = ctx["loader"].load()
        metrics["load_time"] = time.perf_counter() - t0
        metrics["instrument_id"] = ctx["data"].instrument_id
        metrics["n_events"] = ctx["data"].n_events
        log_time(
            "worker_load",
            metrics["load_time"],
            logger=tlog,
            n_events=metrics["n_events"],
        )
        print(
            f"[{i + 1}/{total_days}] Loaded (unknown): {ctx['data'].n_events} events."
        )
        # 2. Run Engine
        t0 = time.perf_counter()
        ctx["sim"] = SingleDaySimulation(ctx["data"])
        results = ctx["sim"].run(config.features, config.target_s)
        metrics["engine_time"] = time.perf_counter() - t0
        log_time("worker_engine", metrics["engine_time"], logger=tlog)
        # 3. Save
        if output_dir:
            t0 = time.perf_counter()
            out_path = _get_output_path(output_dir, filename)
            Storage.save_parquet(pd.DataFrame(results), out_path)
            metrics["save_time"] = time.perf_counter() - t0
            log_time("worker_save", metrics["save_time"], logger=tlog)
            print(f"[{i + 1}/{total_days}] Saved to {out_path}")
    except Exception as e:
        # Deliberately broad: failures are reported via the metrics row so
        # the caller can aggregate per-day outcomes. Traceback printing is
        # kept off to avoid flooding worker logs.
        metrics["status"] = "FAILED"
        metrics["error"] = f"{type(e).__name__}: {str(e)}"
        print(f"FAILED (unknown): {metrics['error']}")
    finally:
        # Explicit cleanup: large per-day objects are released before the
        # next task to keep worker memory flat.
        ctx.clear()
        results = None  # Clear results ref
        gc.collect()
        metrics["total_time"] = (
            metrics["load_time"] + metrics["engine_time"] + metrics["save_time"]
        )
        log_time(
            "worker_task",
            metrics["total_time"],
            logger=tlog,
            n_events=metrics["n_events"],
            load_s=round(metrics["load_time"], 4),
            engine_s=round(metrics["engine_time"], 4),
            save_s=round(metrics["save_time"], 4),
            status=metrics["status"],
        )
    row = _finalize_metrics(metrics)
    # Write to benchmark file (mostly for Sequential LOCAL runs)
    if benchmark_file:
        _append_benchmark(benchmark_file, row)
    return row
def _get_output_path(base_dir: str, filename: str) -> str:
"""Helper to determine output path from input filename."""
# Logic: 20240101.parquet
date_match = re.search(r"(\d{8})", filename)
out_base = date_match.group(1) if date_match else filename
out_name = f"{out_base}.parquet"
if base_dir.startswith("r2://"):
return f"{base_dir.rstrip('/')}/{out_name}"
return os.path.join(base_dir, out_name)
def _finalize_metrics(m: Dict) -> List:
"""Converts metrics dict to standardized list row."""
# Ensure order matches schema
return [
m["timestamp"],
m["filename"],
m["instrument_id"],
m["n_events"],
f"{m['load_time']:.4f}",
f"{m['engine_time']:.4f}",
f"{m['save_time']:.4f}",
f"{m['total_time']:.4f}",
m["status"],
m["error"],
]
def _append_benchmark(file_path: str, row: List):
try:
import csv
with open(file_path, "a", newline="") as f:
csv.writer(f).writerow(row)
except Exception as e:
print(f"Warning: Failed to append benchmark to {file_path}: {e}")
pass