r"""run_eval.py — Standard evaluation runner for any saved model.

Usage:
    python3 run_eval.py --model models/exp9-mountain-v5-throttle02/best_model.zip \
                        --sets 3 --steps 2000

Saves results to: agent/test-results/YYYY-MM-DD_HH-MM_<model-dir>.log
(a ``test-results`` directory next to this script; ``<model-dir>`` is the name
of the directory that contains the model file).
Also prints to terminal in real time.
"""
import sys
import os
import time
import argparse

# Make the sibling project modules importable regardless of the working dir.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from multitrack_runner import log as _log, _send_exit_scene, StuckTerminationWrapper
from donkeycar_sb3_runner import ThrottleClampWrapper
from reward_wrapper import SpeedRewardWrapper
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
import gymnasium as gym
import numpy as np
from datetime import datetime

# (gym environment id, short name used in logs and in the results table)
TRACKS = [
    ('donkey-mountain-track-v0', 'mountain_track'),
    ('donkey-generated-track-v0', 'generated_track'),
    ('donkey-generated-roads-v0', 'generated_road'),
    ('donkey-minimonaco-track-v0', 'mini_monaco'),
]

parser = argparse.ArgumentParser()
parser.add_argument('--model', required=True)
parser.add_argument('--sets', type=int, default=3)
parser.add_argument('--steps', type=int, default=2000)
parser.add_argument('--throttle', type=float, default=None,
                    help='Override throttle_min (default: 0.2)')
args = parser.parse_args()

# With zero sets no episode is recorded and the summary below would fail on
# min() of an empty list, so reject the value before touching the simulator.
if args.sets < 1:
    parser.error('--sets must be at least 1')

# Log file
ts = datetime.now().strftime('%Y-%m-%d_%H-%M')
# abspath() so that a bare "--model best_model.zip" still yields a directory
# name instead of an empty string.
name = os.path.basename(os.path.dirname(os.path.abspath(args.model)))
log_path = os.path.join(os.path.dirname(__file__), 'test-results',
                        f'{ts}_{name}.log')
os.makedirs(os.path.dirname(log_path), exist_ok=True)
# Line-buffered so the file can be followed while the run is in progress;
# explicit UTF-8 because the status icons written below are non-ASCII.
_lf = open(log_path, 'w', buffering=1, encoding='utf-8')


def log(msg):
    """Print *msg* with an HH:MM:SS prefix and append the same line to the log file."""
    ts2 = datetime.now().strftime('%H:%M:%S')
    line = f'[{ts2}] {msg}'
    print(line, flush=True)
    _lf.write(line + '\n')


def make_env(track_id, throttle_min):
    """Create the gym environment *track_id* wrapped for evaluation.

    Wrapper order (innermost first): ThrottleClampWrapper with the given
    *throttle_min*, StuckTerminationWrapper (stuck_steps=80,
    min_displacement=0.5), SpeedRewardWrapper.
    """
    raw = gym.make(track_id)
    env = ThrottleClampWrapper(raw, throttle_min=throttle_min)
    env = StuckTerminationWrapper(env, stuck_steps=80, min_displacement=0.5)
    env = SpeedRewardWrapper(env)
    return env


def _exit_scene(scene_id):
    """Open *scene_id* briefly and send the exit-scene command to it.

    Presumably this returns the simulator to a state where a different track
    can be loaded (behaviour of ``_send_exit_scene`` is defined in
    multitrack_runner — confirm there). The sleeps give the simulator time
    to settle before and after the command.
    """
    tmp = gym.make(scene_id)
    try:
        time.sleep(2)
        _send_exit_scene(tmp, verbose=False)
    finally:
        # Close the temporary connection even if the exit command fails.
        tmp.close()
    time.sleep(5)


def _run_episode(model_path, track_id, throttle_min, max_steps):
    """Run one deterministic episode on *track_id*.

    Loads the PPO model from *model_path* against a freshly built vectorised
    environment and steps it until the episode ends or *max_steps* is reached.

    Returns:
        (steps, total_reward) for the episode.
    """
    env = VecTransposeImage(DummyVecEnv(
        [lambda t=track_id, tm=throttle_min: make_env(t, tm)]))
    try:
        model = PPO.load(model_path, env=env, device='cpu')
        obs = env.reset()
        total, steps, done = 0.0, 0, False
        while not done and steps < max_steps:
            action, _ = model.predict(obs, deterministic=True)
            result = env.step(action)
            # Accept both the 5-tuple (terminated/truncated) and the 4-tuple
            # (done) step signatures.
            if len(result) == 5:
                obs, rewards, terminated, truncated, _info = result
                done = bool(terminated[0] or truncated[0])
            else:
                obs, rewards, dones, _info = result
                done = bool(dones[0])
            total += float(rewards[0])
            steps += 1
        return steps, total
    finally:
        # Always release the simulator connection, even on an error mid-episode.
        env.close()


try:
    log(f'Model: {args.model}')
    log(f'Sets: {args.sets}')
    log(f'Max steps:{args.steps}')
    log(f'Log file: {log_path}')

    all_results = {name: [] for _, name in TRACKS}
    # Scene that is exited before the first track is loaded; afterwards this
    # tracks the most recently evaluated track.
    # NOTE(review): assumes the simulator starts on generated-roads — confirm.
    current = 'donkey-generated-roads-v0'

    # Use the --throttle override when given, otherwise fall back to 0.2.
    # NOTE: throttle_min is not detected from the model's action space.
    _throttle_min = args.throttle if args.throttle is not None else 0.2

    for set_num in range(1, args.sets + 1):
        log(f'\n{"="*50}')
        log(f'SET {set_num} of {args.sets}')
        log(f'{"="*50}')

        for track_id, track_name in TRACKS:
            _exit_scene(current)

            steps, total = _run_episode(args.model, track_id,
                                        _throttle_min, args.steps)

            status = '✅ FULL' if steps >= args.steps else f'❌ crash@{steps}'
            log(f' Set{set_num} {track_name:20s}: {steps:4d} steps {total:7.1f} reward {status}')
            all_results[track_name].append(steps)
            time.sleep(2)
            current = track_id

    log(f'\n{"="*50}')
    log(f'SUMMARY ({args.sets} sets, max {args.steps} steps per run)')
    log(f'{"="*50}')
    for _, track_name in TRACKS:
        r = all_results[track_name]
        # Pass: every run reached the step limit. Warning: mean of at least
        # 500 steps. Fail: anything below that.
        icon = '✅' if min(r) >= args.steps else ('⚠️' if np.mean(r) >= 500 else '❌')
        log(f' {icon} {track_name:20s}: {"/".join(str(x) for x in r)} mean={np.mean(r):.0f}')

    log(f'\nFull log saved to: {log_path}')
finally:
    # Close the log file on every exit path, including errors and Ctrl-C.
    _lf.close()