"""
|
|
Champion Model Evaluator
|
|
========================
|
|
Loads the champion model and runs it live in the simulator for visual inspection.
|
|
Prints per-step diagnostics: position, speed, CTE, efficiency, reward.
|
|
|
|
Usage:
|
|
python3 evaluate_champion.py [--episodes N] [--steps N]
|
|
|
|
Watch the simulator window to see if the car is genuinely driving the track
|
|
or exploiting circular motion.
|
|
"""
|
|
|
|
import json
import os
import sys
import time
from collections import deque

import numpy as np

import gymnasium as gym
import gym_donkeycar  # noqa: F401 — import registers the donkey-* envs with gymnasium
from stable_baselines3 import PPO

# Make the agent directory importable so the training-time wrappers resolve.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from reward_wrapper import SpeedRewardWrapper
from donkeycar_sb3_runner import ThrottleClampWrapper

# Filesystem layout of the promoted ("champion") model artifacts.
CHAMPION_DIR = os.path.join(os.path.dirname(__file__), 'models', 'champion')
MANIFEST_PATH = os.path.join(CHAMPION_DIR, 'manifest.json')
MODEL_PATH = os.path.join(CHAMPION_DIR, 'model.zip')
def load_manifest():
    """Read and parse the champion manifest JSON (trial id, params, score)."""
    with open(MANIFEST_PATH) as fh:
        return json.load(fh)
def print_banner(manifest):
    """Print a framed summary of the champion model about to be evaluated.

    Expects ``manifest`` to carry 'trial', 'mean_reward', and 'params' keys.
    """
    rule = '=' * 65
    for line in (
        rule,
        '🏆 DonkeyCar Champion Model Evaluation',
        rule,
        f" Trial: {manifest['trial']}",
        f" mean_reward: {manifest['mean_reward']:.4f}",
        f" Params: {manifest['params']}",
        f" Model: {MODEL_PATH}",
        rule,
    ):
        print(line, flush=True)
    print(flush=True)
def compute_efficiency(pos_history):
    """Path efficiency = net_displacement / total_path_length over window.

    1.0 means a perfectly straight path; values near 0 indicate the car is
    looping back on itself. Fewer than 3 samples yields 1.0 by convention.
    """
    if len(pos_history) < 3:
        return 1.0
    pts = [np.asarray(p, dtype=float) for p in pos_history]
    net = np.linalg.norm(pts[-1] - pts[0])
    total = 0.0
    for prev, curr in zip(pts, pts[1:]):
        total += np.linalg.norm(curr - prev)
    # Guard against a (near-)stationary window to avoid division by ~zero.
    if total <= 1e-6:
        return 1.0
    return float(net / total)
def run_episode(model, env, episode_num, max_steps=500):
    """Run one episode with the champion policy, printing diagnostics.

    Parameters
    ----------
    model : policy exposing ``predict(obs, deterministic=...)`` (SB3 PPO here).
    env : (wrapped) environment; both the gymnasium 5-tuple and legacy gym
        4-tuple ``step`` return shapes are handled.
    episode_num : 1-based episode index, used only in log output.
    max_steps : hard cap on environment steps before the episode is cut off.

    Returns
    -------
    tuple[float, int]
        (total_reward, steps_taken) for the episode.
    """
    print(f'\n--- Episode {episode_num} ---', flush=True)
    obs, info = env.reset()
    pos_history = deque(maxlen=30)  # sliding window for the path-efficiency metric
    total_reward = 0.0
    step = 0
    done = False  # BUGFIX: was undefined after the loop when max_steps <= 0

    print(f'{"Step":>5} {"Speed":>6} {"CTE":>7} {"Eff%":>6} {"Rwd":>8} {"TotRwd":>10} {"Pos_x":>8} {"Pos_z":>8}', flush=True)
    print('-' * 65, flush=True)

    while step < max_steps:
        action, _ = model.predict(obs, deterministic=True)
        result = env.step(action)
        # Support both gymnasium (5-tuple) and legacy gym (4-tuple) step APIs.
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
            done = terminated or truncated
        else:
            obs, reward, done, info = result

        # Extract diagnostics from info; values may be absent or None.
        speed = float(info.get('speed', 0.0) or 0.0)
        cte = float(info.get('cte', 0.0) or 0.0)
        pos = info.get('pos', None)
        if pos is not None:
            pos_history.append(list(pos)[:3])
            # BUGFIX: the old `px, pz = pos[0], pos[2] if len(pos) > 2 else 0.0`
            # only guarded pos[2]; pos[0] raised IndexError on an empty position.
            px = float(pos[0]) if len(pos) > 0 else 0.0
            pz = float(pos[2]) if len(pos) > 2 else 0.0
        else:
            px, pz = 0.0, 0.0

        efficiency = compute_efficiency(pos_history)
        total_reward += reward
        step += 1

        # Print every 10 steps or on done
        if step % 10 == 0 or done:
            print(f'{step:>5} {speed:>6.2f} {cte:>7.3f} {efficiency*100:>5.1f}% {reward:>8.3f} {total_reward:>10.2f} {px:>8.2f} {pz:>8.2f}', flush=True)

        if done:
            print(f'\n ✅ Episode {episode_num} done after {step} steps | total_reward={total_reward:.2f}', flush=True)
            break

    # BUGFIX: only report the timeout when the episode did NOT terminate on its
    # own — previously both messages printed if done fired exactly at max_steps.
    if step >= max_steps and not done:
        print(f'\n ⏱️ Episode {episode_num} reached max_steps={max_steps} | total_reward={total_reward:.2f}', flush=True)

    return total_reward, step
def main(episodes=3, max_steps=500):
    """Load the champion model, connect to the simulator, and evaluate it live.

    Parameters
    ----------
    episodes : number of evaluation episodes to run.
    max_steps : per-episode step cap forwarded to ``run_episode``.

    Exits the process (status 1) if the simulator connection or the model
    load fails. All progress is reported on stdout.
    """
    manifest = load_manifest()
    print_banner(manifest)

    print('[Eval] Connecting to simulator...', flush=True)
    try:
        env = gym.make('donkey-generated-roads-v0')
    except Exception as e:
        print(f'[Eval] FAILED to connect: {e}', flush=True)
        sys.exit(1)

    # Apply the same wrappers as training so observations/rewards match the
    # distribution the policy was optimized against.
    env = ThrottleClampWrapper(env, throttle_min=0.2)
    env = SpeedRewardWrapper(env, speed_scale=0.1)
    print('[Eval] Wrappers applied: ThrottleClamp(min=0.2), SpeedRewardWrapper(scale=0.1)', flush=True)

    print(f'[Eval] Loading champion model from {MODEL_PATH}...', flush=True)
    try:
        model = PPO.load(MODEL_PATH, env=env)
        print('[Eval] Model loaded successfully.', flush=True)
    except Exception as e:
        print(f'[Eval] FAILED to load model: {e}', flush=True)
        env.close()
        sys.exit(1)

    print(f'\n[Eval] Running {episodes} episodes (max {max_steps} steps each)...', flush=True)
    print('[Eval] Watch the simulator window — is the car driving the track or circling?', flush=True)

    all_rewards = []
    for ep in range(1, episodes + 1):
        total_reward, _steps = run_episode(model, env, ep, max_steps=max_steps)
        all_rewards.append(total_reward)
        if ep < episodes:
            time.sleep(2)  # Brief pause between episodes

    print('\n' + '=' * 65, flush=True)
    print('📊 Evaluation Complete', flush=True)
    print(f' Episodes: {episodes}', flush=True)
    print(f' Rewards: {[f"{r:.1f}" for r in all_rewards]}', flush=True)
    # BUGFIX: guard the summary stats — with --episodes 0 the old code divided
    # by zero (mean) and took np.std of an empty list.
    if all_rewards:
        print(f' Mean reward: {sum(all_rewards)/len(all_rewards):.2f}', flush=True)
        print(f' Std reward: {float(np.std(all_rewards)):.2f}', flush=True)
    print('=' * 65, flush=True)

    env.close()
    time.sleep(2)
    print('[Eval] Done.', flush=True)
if __name__ == '__main__':
    import argparse

    # Command-line entry point: forward episode count and step cap to main().
    cli = argparse.ArgumentParser()
    cli.add_argument('--episodes', type=int, default=3, help='Number of eval episodes')
    cli.add_argument('--steps', type=int, default=500, help='Max steps per episode')
    ns = cli.parse_args()
    main(episodes=ns.episodes, max_steps=ns.steps)