"""Evaluate candidate PPO checkpoints on the DonkeyCar mountain track.

Each entry in ``CANDIDATES`` is loaded, driven for ``EPISODES`` deterministic
episodes of at most ``MAX_STEPS`` steps, and one JSON object per episode is
written to ``OUT`` (JSON Lines).
"""
import json
import os
import time
from datetime import datetime

import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage

from donkeycar_sb3_runner import ThrottleClampWrapper

HOST = '10.0.0.55'
PORT = 9091
TRACK_ID = 'donkey-mountain-track-v0'
MAX_STEPS = 2000
EPISODES = 9
OUT = 'outerloop-results/mountain_candidate_eval_2026-04-19.jsonl'

# (label, model checkpoint path, throttle floor applied during evaluation)
CANDIDATES = [
    ('exp14_base', 'models/exp14-mountain-v5/best_model.zip', 0.2),
    ('ft_006k', 'models/exp14-mountain-v5-finetune/checkpoint_0006000.zip', 0.4),
    ('ft_024k', 'models/exp14-mountain-v5-finetune/checkpoint_0024000.zip', 0.4),
    ('ft_030k', 'models/exp14-mountain-v5-finetune/checkpoint_0030000.zip', 0.4),
    ('ft_036k', 'models/exp14-mountain-v5-finetune/checkpoint_0036000.zip', 0.2),
    ('ft_042k', 'models/exp14-mountain-v5-finetune/checkpoint_0042000.zip', 0.2),
    ('ft_048k', 'models/exp14-mountain-v5-finetune/checkpoint_0048000.zip', 0.2),
]

# Errors that reading/converting a missing or malformed telemetry value can raise.
_TELEMETRY_ERRORS = (AttributeError, TypeError, ValueError, OverflowError)


def _telemetry(info, key, cast, missing, on_error):
    """Return ``cast(info[key])``, tolerating absent or malformed values.

    Args:
        info: Step info mapping (anything exposing ``.get``).
        key: Telemetry field name.
        cast: Conversion callable, e.g. ``float`` or ``int``.
        missing: Value substituted when the field is absent or falsy
            (``None``, ``0``, ``''``).
        on_error: Value returned when the lookup or conversion raises.
    """
    try:
        return cast(info.get(key, missing) or missing)
    except _TELEMETRY_ERRORS:
        return on_error


class V5RewardWrapper(gym.Wrapper):
    """Replace the wrapped env's reward with a centering-times-speed score.

    Per step the reward is
    ``(1 - min(|cte| / max_cte, 1)) * min(speed / 10, 1)``, read from the
    ``cte`` and ``speed`` fields of ``info``. When ``lap_count`` increases and
    the reported ``last_lap_time`` is below ``min_lap_time``, the reward
    becomes ``-10 * min_lap_time / max(lap_time, 0.1)`` and the episode is
    terminated (presumably to penalise shortcut laps; confirm with the
    training setup).

    Both 5-tuple and 4-tuple ``step`` results are accepted and returned in the
    same shape they arrived in.
    """

    def __init__(self, env, max_cte=8.0, min_lap_time=5.0):
        super().__init__(env)
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self._last_lc = 0  # highest lap count seen in the current episode

    def reset(self, **kwargs):
        """Reset lap tracking and delegate to the wrapped environment."""
        self._last_lc = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and return its result with the reward replaced."""
        result = self.env.step(action)
        five_tuple = len(result) == 5
        if five_tuple:
            obs, _sim, terminated, truncated, info = result
        else:
            obs, _sim, terminated, info = result
            truncated = False

        cte = _telemetry(info, 'cte', float, 0.0, 0.0)
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        speed = max(0.0, _telemetry(info, 'speed', float, 0.0, 0.0))
        speed_norm = min(speed / 10.0, 1.0)

        reward = cte_quality * speed_norm

        # An unreadable lap count is treated as "no new lap".
        current_lc = _telemetry(info, 'lap_count', int, 0, self._last_lc)
        force_terminate = False
        if current_lc > self._last_lc:
            self._last_lc = current_lc
            # Unknown lap times default to 999 s so they are never penalised.
            lap_time = _telemetry(info, 'last_lap_time', float, 999.0, 999.0)
            if lap_time < self.min_lap_time:
                reward = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
                force_terminate = True

        if five_tuple:
            return obs, reward, terminated or force_terminate, truncated, info
        return obs, reward, terminated or force_terminate, info


class ThrottleFloorWrapper(gym.Wrapper):
    """Raise element 1 of every action to at least ``floor`` before stepping."""

    def __init__(self, env, floor):
        super().__init__(env)
        self.floor = floor

    def step(self, action):
        """Apply the throttle floor to a copy of ``action`` and step the env."""
        act = np.array(action)
        try:
            act[1] = max(act[1], self.floor)
        except (IndexError, TypeError, ValueError):
            # Best effort: an action without a scalar at index 1 is forwarded
            # unchanged rather than aborting the run.
            pass
        return self.env.step(act)

    def reset(self, **kwargs):
        """Delegate to the wrapped environment."""
        return self.env.reset(**kwargs)


def make_env(base_throttle=0.2, throttle_floor=None):
    """Return a zero-argument factory that builds the wrapped track env.

    Args:
        base_throttle: Passed to ``ThrottleClampWrapper`` as ``throttle_min``.
        throttle_floor: When not ``None``, a ``ThrottleFloorWrapper`` with this
            floor is added on top of the clamp wrapper.

    Returns:
        A callable suitable for ``DummyVecEnv`` that connects to the simulator
        at ``HOST:PORT`` and wraps the env with ``V5RewardWrapper``.
    """
    def _init():
        raw = gym.make(TRACK_ID, conf={'host': HOST, 'port': PORT})
        env = ThrottleClampWrapper(raw, throttle_min=base_throttle)
        if throttle_floor is not None:
            env = ThrottleFloorWrapper(env, throttle_floor)
        return V5RewardWrapper(env)
    return _init


def _run_episode(env, model):
    """Drive one deterministic episode; return (steps, laps, lap_times, reward)."""
    obs = env.reset()
    steps = 0
    laps = 0
    lap_times = []
    total_reward = 0.0
    while steps < MAX_STEPS:
        action, _ = model.predict(obs, deterministic=True)
        obs, r, d, info = env.step(action)
        # The vectorised env holds a single sub-env, so index 0 is the car.
        inf = info[0] if isinstance(info, (list, tuple)) else info
        total_reward += float(r[0])
        steps += 1
        lc = _telemetry(inf, 'lap_count', int, 0, laps)
        if lc > laps:
            lap_times.append(_telemetry(inf, 'last_lap_time', float, 0, 0.0))
            laps = lc
        if bool(d[0]):
            break
    return steps, laps, lap_times, total_reward


def _evaluate_candidate(label, model_path, floor, rows):
    """Run ``EPISODES`` episodes for one checkpoint, appending a row per episode.

    Rows are appended to ``rows`` as they are produced so that episodes
    finished before a failure are not lost. The environment is always closed.
    """
    print(f'\n=== Evaluating {label} floor={floor} path={model_path}', flush=True)
    env = VecTransposeImage(DummyVecEnv([make_env(0.2, floor)]))
    try:
        model = PPO.load(model_path, device='cpu')
        model.set_env(env)
        for ep in range(EPISODES):
            steps, laps, lap_times, total_reward = _run_episode(env, model)
            rows.append({
                'label': label,
                'model_path': model_path,
                'throttle_floor': floor,
                'episode': ep + 1,
                'steps': steps,
                'laps': laps,
                'lap_times': lap_times,
                'reward': total_reward,
            })
            print(f" ep{ep+1}: steps={steps} laps={laps} lap_times={lap_times}", flush=True)
    finally:
        env.close()
    # Pause between candidates before opening the next simulator connection.
    time.sleep(2)


def _write_rows(path, rows):
    """Write ``rows`` to ``path`` as JSON Lines, replacing any existing file."""
    with open(path, 'w', encoding='utf-8') as f:
        f.writelines(json.dumps(row) + '\n' for row in rows)


def main():
    """Evaluate every candidate and save the per-episode results to ``OUT``."""
    out_dir = os.path.dirname(OUT)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    all_rows = []
    try:
        for label, model_path, floor in CANDIDATES:
            _evaluate_candidate(label, model_path, floor, all_rows)
    except BaseException:
        # Keep whatever was collected (simulator drop, Ctrl-C, ...) and then
        # let the original error propagate.
        if all_rows:
            _write_rows(OUT, all_rows)
            print('\nSaved partial results to', OUT, flush=True)
        raise

    _write_rows(OUT, all_rows)
    print('\nSaved to', OUT)


if __name__ == '__main__':
    main()