donkeycar-rl-autoresearch/agent/evaluate_champion.py

170 lines
5.9 KiB
Python

"""
Champion Model Evaluator
========================
Loads the champion model and runs it live in the simulator for visual inspection.
Prints per-step diagnostics: position, speed, CTE, efficiency, reward.
Usage:
python3 evaluate_champion.py [--episodes N] [--steps N]
Watch the simulator window to see if the car is genuinely driving the track
or exploiting circular motion.
"""
import os
import sys
import time
import json
import numpy as np
from collections import deque
import gymnasium as gym
import gym_donkeycar
from stable_baselines3 import PPO
# Add agent dir to path for wrappers
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from reward_wrapper import SpeedRewardWrapper
from donkeycar_sb3_runner import ThrottleClampWrapper
# Filesystem layout of the exported champion: a directory holding the
# serialized SB3 model (model.zip) plus a JSON manifest describing the
# winning trial (trial id, params, mean_reward).
CHAMPION_DIR = os.path.join(os.path.dirname(__file__), 'models', 'champion')
MANIFEST_PATH = os.path.join(CHAMPION_DIR, 'manifest.json')
MODEL_PATH = os.path.join(CHAMPION_DIR, 'model.zip')
def load_manifest():
    """Load and parse the champion manifest JSON.

    Returns:
        dict: Contents of ``manifest.json`` — expected by callers to contain
        at least ``trial``, ``params`` and ``mean_reward`` keys.

    Raises:
        FileNotFoundError: If no champion has been exported yet.
        json.JSONDecodeError: If the manifest file is corrupt.
    """
    # Explicit encoding: JSON is UTF-8 by spec; without it, open() uses the
    # platform-default codec and can mis-decode on non-UTF-8 locales.
    with open(MANIFEST_PATH, encoding='utf-8') as f:
        return json.load(f)
def print_banner(manifest):
    """Print a framed header summarizing the champion model under evaluation.

    Shows the trial id, its mean reward, the hyper-parameters it was trained
    with, and the path of the model file being loaded.
    """
    bar = '=' * 65
    banner_lines = [
        bar,
        '🏆 DonkeyCar Champion Model Evaluation',
        bar,
        f" Trial: {manifest['trial']}",
        f" mean_reward: {manifest['mean_reward']:.4f}",
        f" Params: {manifest['params']}",
        f" Model: {MODEL_PATH}",
        bar,
    ]
    for banner_line in banner_lines:
        print(banner_line, flush=True)
    # Trailing blank line separates the banner from subsequent output.
    print(flush=True)
def compute_efficiency(pos_history):
    """Path efficiency = net_displacement / total_path_length over window."""
    points = np.asarray(list(pos_history), dtype=float)
    # Too few samples to measure a path — treat as perfectly efficient.
    if points.shape[0] < 3:
        return 1.0
    # Per-segment lengths, summed, give the total distance actually traveled.
    segment_lengths = np.linalg.norm(np.diff(points, axis=0), axis=1)
    path_length = float(segment_lengths.sum())
    if path_length <= 1e-6:
        # Effectively stationary; avoid dividing by ~zero.
        return 1.0
    straight_line = float(np.linalg.norm(points[-1] - points[0]))
    return straight_line / path_length
def run_episode(model, env, episode_num, max_steps=500):
    """Run one episode with the champion policy, printing diagnostics.

    Args:
        model: Trained SB3 policy exposing ``predict(obs, deterministic=...)``.
        env: (Wrapped) gym environment; both the Gymnasium 5-tuple and the
            legacy Gym 4-tuple ``step`` return shapes are supported.
        episode_num: 1-based episode index, used only in log output.
        max_steps: Hard cap on environment steps for this episode.

    Returns:
        tuple[float, int]: (total accumulated reward, steps taken).
    """
    print(f'\n--- Episode {episode_num} ---', flush=True)
    obs, info = env.reset()
    pos_history = deque(maxlen=30)  # sliding window for the path-efficiency metric
    total_reward = 0.0
    step = 0
    done = False
    print(f'{"Step":>5} {"Speed":>6} {"CTE":>7} {"Eff%":>6} {"Rwd":>8} {"TotRwd":>10} {"Pos_x":>8} {"Pos_z":>8}', flush=True)
    print('-' * 65, flush=True)
    while step < max_steps:
        action, _ = model.predict(obs, deterministic=True)
        result = env.step(action)
        # Support both Gymnasium (obs, reward, terminated, truncated, info)
        # and legacy Gym (obs, reward, done, info) step APIs.
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
            done = terminated or truncated
        else:
            obs, reward, done, info = result
        # Extract diagnostics from info; `or 0.0` guards against None values.
        speed = float(info.get('speed', 0.0) or 0.0)
        cte = float(info.get('cte', 0.0) or 0.0)
        pos = info.get('pos', None)
        # BUGFIX: guard against an empty pos sequence — the original appended
        # it to pos_history and then raised IndexError on pos[0].
        if pos is not None and len(pos) > 0:
            pos_history.append(list(pos)[:3])
            px = pos[0]
            pz = pos[2] if len(pos) > 2 else 0.0
        else:
            px, pz = 0.0, 0.0
        efficiency = compute_efficiency(pos_history)
        total_reward += reward
        step += 1
        # Print every 10 steps or on done
        if step % 10 == 0 or done:
            print(f'{step:>5} {speed:>6.2f} {cte:>7.3f} {efficiency*100:>5.1f}% {reward:>8.3f} {total_reward:>10.2f} {px:>8.2f} {pz:>8.2f}', flush=True)
        if done:
            print(f'\n ✅ Episode {episode_num} done after {step} steps | total_reward={total_reward:.2f}', flush=True)
            break
    # BUGFIX: `and not done` keeps the two exit messages mutually exclusive —
    # the original printed BOTH when the episode terminated exactly on
    # step == max_steps.
    if step >= max_steps and not done:
        print(f'\n ⏱️ Episode {episode_num} reached max_steps={max_steps} | total_reward={total_reward:.2f}', flush=True)
    return total_reward, step
def main(episodes=3, max_steps=500):
    """Evaluate the champion model live in the DonkeyCar simulator.

    Loads the champion manifest and model, applies the same wrappers used
    during training, runs ``episodes`` evaluation episodes, and prints
    per-episode plus summary reward statistics.

    Args:
        episodes: Number of evaluation episodes to run.
        max_steps: Per-episode step cap forwarded to run_episode().

    Exits the process with status 1 if the simulator connection or the model
    load fails.
    """
    manifest = load_manifest()
    print_banner(manifest)
    # NOTE: the original bound manifest['params'] to an unused local here;
    # print_banner already displays the params, so it was dropped.
    print('[Eval] Connecting to simulator...', flush=True)
    try:
        env = gym.make('donkey-generated-roads-v0')
    except Exception as e:
        print(f'[Eval] FAILED to connect: {e}', flush=True)
        sys.exit(1)
    # Apply the same wrappers as training so the policy sees the identical
    # action/reward transformations it was optimized under.
    env = ThrottleClampWrapper(env, throttle_min=0.2)
    env = SpeedRewardWrapper(env, speed_scale=0.1)
    print('[Eval] Wrappers applied: ThrottleClamp(min=0.2), SpeedRewardWrapper(scale=0.1)', flush=True)
    print(f'[Eval] Loading champion model from {MODEL_PATH}...', flush=True)
    try:
        model = PPO.load(MODEL_PATH, env=env)
        print('[Eval] Model loaded successfully.', flush=True)
    except Exception as e:
        print(f'[Eval] FAILED to load model: {e}', flush=True)
        env.close()
        sys.exit(1)
    print(f'\n[Eval] Running {episodes} episodes (max {max_steps} steps each)...', flush=True)
    print('[Eval] Watch the simulator window — is the car driving the track or circling?', flush=True)
    all_rewards = []
    for ep in range(1, episodes + 1):
        total_reward, steps = run_episode(model, env, ep, max_steps=max_steps)
        all_rewards.append(total_reward)
        if ep < episodes:
            time.sleep(2)  # Brief pause between episodes
    print('\n' + '=' * 65, flush=True)
    print('📊 Evaluation Complete', flush=True)
    print(f' Episodes: {episodes}', flush=True)
    print(f' Rewards: {[f"{r:.1f}" for r in all_rewards]}', flush=True)
    print(f' Mean reward: {sum(all_rewards)/len(all_rewards):.2f}', flush=True)
    print(f' Std reward: {float(np.std(all_rewards)):.2f}', flush=True)
    print('=' * 65, flush=True)
    env.close()
    # Give the simulator a moment to tear the session down cleanly.
    time.sleep(2)
    print('[Eval] Done.', flush=True)
if __name__ == '__main__':
    # CLI entry point: episode count and per-episode step cap are tunable.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--episodes', type=int, default=3, help='Number of eval episodes')
    cli.add_argument('--steps', type=int, default=500, help='Max steps per episode')
    opts = cli.parse_args()
    main(episodes=opts.episodes, max_steps=opts.steps)