donkeycar-rl-autoresearch/agent/experiments/eval_gentrack_on_minimonaco.py

196 lines
6.4 KiB
Python

"""
eval_gentrack_on_minimonaco.py
Evaluate generated-track specialist models on mini-monaco (zero-shot).
Key question: does a model trained on generated-track generalize to
mini-monaco, given that both tracks are visually very similar?
Models tested:
- exp13-gentrack-v4/best_model.zip (30k steps, clean gentrack specialist)
- wave5-gentrack-only/model.zip (90k steps, gentrack from scratch)
- wave4-trial-0009/model.zip (the one run that drove mini-monaco)
Track: donkey-minimonaco-track-v0 (never seen during any of these trainings)
Episodes: 7 per model
Max steps: 3000 per episode
"""
import sys, os, time
from datetime import datetime
import numpy as np
sys.path.insert(0, '/home/paulh/projects/donkeycar-rl-autoresearch/agent')
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
from donkeycar_sb3_runner import ThrottleClampWrapper
import gymnasium as gym
# --- Simulator endpoint (passed as the `conf` of gym.make) ------------------
HOST = 'localhost'
PORT = 9091

# --- Evaluation settings ----------------------------------------------------
TRACK_ID = 'donkey-minimonaco-track-v0'
EPISODES = 7
MAX_STEPS = 3000  # per-episode step cap; enough for 2+ laps
THROTTLE_MIN = 0.2
STUCK_STEPS = 60  # terminate if car hasn't moved in this many steps
STUCK_DIST = 0.3  # minimum displacement (metres) to not be considered stuck

# --- Models under test, as (label, checkpoint path) pairs -------------------
# Every checkpoint lives in a directory under BASE named after its label.
BASE = '/home/paulh/projects/donkeycar-rl-autoresearch/agent/models'
MODELS = [
    (label, f'{BASE}/{label}/{checkpoint}')
    for label, checkpoint in (
        ('exp13-gentrack-v4', 'best_model.zip'),
        ('wave5-gentrack-only', 'model.zip'),
        ('wave4-trial-0009', 'model.zip'),
    )
]
# Log to file + stdout.  The log file is created under BASE and carries the
# start timestamp in its name.
log_path = os.path.join(
    BASE,
    f'eval_gentrack_minimonaco_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
)
# Explicit UTF-8: the messages written by this script contain non-ASCII
# glyphs (box-drawing dashes, check/cross marks, em dash) that would raise
# UnicodeEncodeError if the locale's default encoding cannot represent them.
# buffering=1 selects line buffering, so each line is flushed as it is written.
_logfile = open(log_path, 'w', buffering=1, encoding='utf-8')
def log(msg):
    """Emit *msg*, prefixed with the current wall-clock time, to stdout and the log file.

    The same ``[HH:MM:SS] msg`` line is printed (flushed immediately) and
    appended to the module-level ``_logfile``.
    """
    stamp = datetime.now().strftime('%H:%M:%S')
    entry = '[{}] {}'.format(stamp, msg)
    print(entry, flush=True)
    _logfile.write(f'{entry}\n')
class MiniMonacoWrapper(gym.Wrapper):
    """Episode-termination fixes for mini-monaco evaluation.

    1. Suppress the ``starting_line`` termination until ``lap_count >= 1``
       (the car spawns just before the line; the first crossing is not a
       lap completion).
    2. Terminate with ``info['hit'] = 'stuck'`` and reward ``-1.0`` if the
       car has moved less than ``stuck_dist`` metres between the oldest and
       newest of the last ``stuck_steps`` recorded positions.

    Args:
        env: Environment to wrap.  Its ``step`` must return the 5-tuple
            ``(obs, reward, terminated, truncated, info)`` with ``info``
            being a dict; the keys read are ``lap_count``, ``hit`` and
            ``pos``.
        stuck_steps: Number of recorded positions in the stuck-detection
            window.  Defaults to ``STUCK_STEPS``.
        stuck_dist: Minimum displacement over the window for the car to
            count as moving.  Defaults to ``STUCK_DIST``.
    """

    def __init__(self, env, stuck_steps=STUCK_STEPS, stuck_dist=STUCK_DIST):
        # Local import: deque is not among the file's top-level imports.
        from collections import deque

        super().__init__(env)
        self._stuck_steps = stuck_steps
        self._stuck_dist = stuck_dist
        # Initialise per-episode state here so step() cannot hit a missing
        # attribute if it is called before the first reset().
        self._lap_count = 0
        # Bounded window: appending beyond maxlen drops the oldest position.
        self._pos_history = deque(maxlen=stuck_steps)

    def reset(self, **kwargs):
        """Clear lap and position tracking, then reset the wrapped env."""
        self._lap_count = 0
        self._pos_history.clear()
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and apply the two termination fixes.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` where
            ``terminated``/``reward``/``info['hit']`` may have been
            overridden as described in the class docstring.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)

        # Track the highest lap count reported so far in this episode.
        # `or 0` guards against the key being present with a None value.
        laps = int(info.get('lap_count', 0) or 0)
        self._lap_count = max(self._lap_count, laps)

        # Suppress initial starting_line crossing
        if (terminated and info.get('hit') == 'starting_line'
                and self._lap_count < 1):
            terminated = False
            reward = 0.0

        # Stuck detection
        pos = info.get('pos')
        if pos is not None:
            self._pos_history.append(np.array(list(pos)[:3]))
            # Only flag "stuck" on a still-running episode with a full
            # window; an episode the env already terminated keeps its own
            # reward and `hit` cause instead of being relabelled.
            if not terminated and len(self._pos_history) == self._stuck_steps:
                displacement = np.linalg.norm(
                    self._pos_history[-1] - self._pos_history[0])
                if displacement < self._stuck_dist:
                    terminated = True
                    reward = -1.0
                    info['hit'] = 'stuck'

        return obs, reward, terminated, truncated, info
def make_env():
    """Build the evaluation environment for TRACK_ID.

    The raw gym environment is created with HOST/PORT as its ``conf``, then
    wrapped first in ThrottleClampWrapper (with THROTTLE_MIN) and finally in
    MiniMonacoWrapper, which is the object returned.
    """
    sim_conf = {'host': HOST, 'port': PORT}
    return MiniMonacoWrapper(
        ThrottleClampWrapper(
            gym.make(TRACK_ID, conf=sim_conf),
            throttle_min=THROTTLE_MIN,
        )
    )
def _run_episode(model, env):
    """Roll out one deterministic episode of at most MAX_STEPS steps.

    Returns:
        ``(total_reward, steps, laps, hit, done)`` where ``laps`` and ``hit``
        come from the info dict of the last step taken and ``done`` tells
        whether the environment ended the episode (as opposed to the step
        cap being reached).
    """
    obs = env.reset()
    total_r, steps, laps, done = 0.0, 0, 0, False
    hit = '?'
    while not done and steps < MAX_STEPS:
        action, _ = model.predict(obs, deterministic=True)
        obs, r, d, info = env.step(action)
        total_r += float(r[0])
        steps += 1
        done = bool(d[0])
        # The vectorised env returns one info dict per sub-env; there is
        # only one sub-env here, so take the first.
        raw_info = info[0] if isinstance(info, (list, tuple)) else info
        laps = int(raw_info.get('lap_count', 0) or 0)
        hit = raw_info.get('hit', '?')
    return total_r, steps, laps, hit, done


def run_eval(model_label, model_path):
    """Evaluate one saved PPO model for EPISODES episodes and log the results.

    Args:
        model_label: Short name used in log lines and in the result dict.
        model_path: Path of the saved model archive to load.

    Returns:
        ``None`` if the model file is missing or fails to load; otherwise a
        dict with keys ``label``, ``lapped`` (episodes with at least one
        lap), ``total_laps``, ``mean_steps`` and ``mean_reward``.
    """
    log('')
    log(f'── {model_label} ──────────────────────────────────────')
    log(f' Model: {model_path}')
    if not os.path.exists(model_path):
        log(' ERROR: model file not found — skipping')
        return None

    env = VecTransposeImage(DummyVecEnv([make_env]))
    rewards, steps_list, laps_list = [], [], []
    try:
        try:
            model = PPO.load(model_path, env=env, device='cpu')
        except Exception as e:
            log(f' ERROR loading model: {e}')
            return None

        for ep in range(1, EPISODES + 1):
            total_r, steps, laps, hit, done = _run_episode(model, env)
            # Classify on `done`, not on the step count: an episode the env
            # ended exactly on the last allowed step is not a timeout.
            if not done:
                status = f'✅ timeout ({laps} laps)'
            elif hit == 'stuck':
                status = f'❌ STUCK @{steps} ({laps} laps)'
            else:
                status = f'❌ crash @{steps} hit={hit} ({laps} laps)'
            log(f' ep{ep}: {total_r:.1f}r / {steps}s {status}')
            rewards.append(total_r)
            steps_list.append(steps)
            laps_list.append(laps)
            time.sleep(0.3)

        mean_r = np.mean(rewards)
        mean_s = np.mean(steps_list)
        total_laps = sum(laps_list)
        lapped = sum(1 for n_laps in laps_list if n_laps >= 1)
        log(f' SUMMARY: {lapped}/{EPISODES} completed a lap | '
            f'total laps={total_laps} | mean {mean_s:.0f}s / {mean_r:.1f}r')
    finally:
        # Always release the environment, including when loading fails or a
        # rollout raises part-way through.
        env.close()

    # Pause after closing the env before the caller starts the next model.
    time.sleep(2)
    return {'label': model_label, 'lapped': lapped, 'total_laps': total_laps,
            'mean_steps': mean_s, 'mean_reward': mean_r}
def main():
    """Evaluate every entry of MODELS and log a ranking by total laps.

    Models for which run_eval returns nothing (missing file or load error)
    are left out of the final table.  The module-level log file is closed
    when this function exits, whether it finishes normally or an evaluation
    raises.
    """
    try:
        rule = '=' * 70
        log(rule)
        log('Eval: generated-track specialists on mini-monaco (zero-shot)')
        log(f'Track : {TRACK_ID}')
        log(f'Episodes: {EPISODES} x max {MAX_STEPS} steps')
        log(f'Host : {HOST}:{PORT}')
        log(f'Log : {log_path}')
        log(rule)

        results = []
        for label, path in MODELS:
            r = run_eval(label, path)
            if r:
                results.append(r)

        log('')
        log(rule)
        log('FINAL RESULTS')
        log(rule)
        # Most laps first; the sort is stable, so ties keep MODELS order.
        for r in sorted(results, key=lambda x: x['total_laps'], reverse=True):
            log(f" {r['label']:<25} lapped={r['lapped']}/{EPISODES} "
                f"total_laps={r['total_laps']} mean {r['mean_steps']:>5.0f}s / {r['mean_reward']:>6.1f}r")
        log(f'\nLog saved: {log_path}')
    finally:
        # Close the log file even if an evaluation raised.
        _logfile.close()


# Run the evaluation only when this file is executed as a script.
if __name__ == '__main__':
    main()