# Source: donkeycar-rl-autoresearch/agent/experiments/exp16_mountain_from_gentrac...
# (file-viewer header: 195 lines, 6.2 KiB, Python)

"""
Exp 16: Warm-start mountain_track from the generated_track champion.

Goal:
- Test reverse transfer cleanly using a single-track setup.
- Warm-start from the generated_track champion:
    agent/models/exp13-gentrack-v4/best_model.zip
- Train on mountain_track only, using the known-good Exp 14 v5 setup.

Caveat:
- Mountain may still be affected by Unity traction/material issues, so results
  should be interpreted with that in mind.
"""
import sys, os, time
sys.path.insert(0, '/home/paulh/projects/donkeycar-rl-autoresearch/agent')
from donkeycar_sb3_runner import ThrottleClampWrapper
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
import gymnasium as gym
import numpy as np
from datetime import datetime
HOST = '10.0.0.55'
PORT = 9091
TRACK_ID = 'donkey-mountain-track-v0'
TRACK_NAME = 'mountain_track'
THROTTLE_MIN = 0.2
LR = 0.0004
MAX_STEPS = 300000
EVAL_EVERY = 5000
LAP_STOP = 3
WARM_PATH = '/home/paulh/projects/donkeycar-rl-autoresearch/agent/models/exp13-gentrack-v4/best_model.zip'
SAVE_DIR = '/home/paulh/projects/donkeycar-rl-autoresearch/agent/models/exp16-mountain-from-gentrack'
os.makedirs(SAVE_DIR, exist_ok=True)
def log(msg):
    """Print ``msg`` to stdout prefixed with the current ``[HH:MM:SS]`` time.

    The output is flushed immediately so progress lines show up promptly
    when stdout is redirected.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f'[{stamp}] {msg}', flush=True)
class V5RewardWrapper(gym.Wrapper):
    """Replace the wrapped env's reward with the "v5" shaped reward.

    The reward returned by the wrapped env is discarded.  For a normal step
    the reward is ``cte_quality * speed_norm`` with

    * ``cte_quality = 1 - min(|cte| / max_cte, 1)``
    * ``speed_norm  = min(max(speed, 0) / max_speed, 1)``

    where ``cte`` and ``speed`` are read from the step's ``info`` dict.
    Two situations override that value:

    * The wrapped env reports the episode as finished (terminated *or*
      truncated): the reward is ``-1.0``.
    * ``info['lap_count']`` increased and ``info['last_lap_time']`` is below
      ``min_lap_time``: the reward is
      ``-10 * min_lap_time / max(last_lap_time, 0.1)`` and the episode is
      force-terminated (presumably to punish implausibly fast "laps").

    Missing, unparsable or non-finite (NaN/inf) telemetry falls back to a
    neutral default instead of leaking NaN into the reward.

    Both 5-tuple and 4-tuple ``step`` results from the wrapped env are
    accepted; the result is returned in the same shape it arrived in.

    Args:
        env: Environment to wrap.
        max_cte: Cross-track error at (and beyond) which ``cte_quality`` is 0.
        min_lap_time: Laps reported faster than this are penalised.
        max_speed: Speed at (and beyond) which ``speed_norm`` saturates at 1.

    Raises:
        ValueError: If ``max_cte`` or ``max_speed`` is not positive.
    """

    def __init__(self, env, max_cte=8.0, min_lap_time=5.0, max_speed=10.0):
        super().__init__(env)
        # Both values are used as divisors; reject them here rather than
        # failing with ZeroDivisionError on the first step.
        if max_cte <= 0:
            raise ValueError(f'max_cte must be > 0, got {max_cte!r}')
        if max_speed <= 0:
            raise ValueError(f'max_speed must be > 0, got {max_speed!r}')
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self.max_speed = max_speed
        self._last_lc = 0  # highest lap_count seen in the current episode

    def reset(self, **kwargs):
        """Reset the lap tracker and forward ``reset`` to the wrapped env."""
        self._last_lc = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped env and substitute the shaped reward."""
        result = self.env.step(action)
        five_tuple = len(result) == 5
        if five_tuple:
            obs, _env_reward, terminated, truncated, info = result
        else:
            obs, _env_reward, done, info = result
            terminated, truncated = done, False

        # Truncation is treated like termination for reward purposes.
        reward, force_term = self._compute(info, terminated or truncated)
        if force_term:
            terminated = True

        if five_tuple:
            return obs, reward, terminated, truncated, info
        return obs, reward, terminated or truncated, info

    @staticmethod
    def _read_float(info, key, default):
        """Return ``info[key]`` as a finite float, or ``default``.

        Falsy values (``None``, ``0``, ``''``), values ``float()`` rejects,
        and NaN/inf all yield ``default``.
        """
        try:
            value = float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default
        return value if np.isfinite(value) else default

    def _read_lap_count(self, info):
        """Return ``info['lap_count']`` as an int, or the last known count."""
        try:
            return int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(inf); ValueError covers int(nan).
            return self._last_lc

    def _compute(self, info, done):
        """Return ``(reward, force_terminate)`` for one step.

        Args:
            info: The step's info dict (``None`` is treated as empty).
            done: True if the wrapped env ended the episode on this step.
        """
        if done:
            return -1.0, False
        info = info or {}

        lap_count = self._read_lap_count(info)
        if lap_count > self._last_lc:
            self._last_lc = lap_count
            lap_time = self._read_float(info, 'last_lap_time', 999.0)
            if lap_time < self.min_lap_time:
                # Penalty grows as the reported lap time shrinks; the 0.1
                # floor bounds it at -100 * min_lap_time.
                penalty = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
                return penalty, True

        cte = self._read_float(info, 'cte', 0.0)
        speed = max(0.0, self._read_float(info, 'speed', 0.0))
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)
        speed_norm = min(speed / self.max_speed, 1.0)
        return cte_quality * speed_norm, False
def make_env():
    """Return a zero-argument factory that builds the wrapped training env.

    The factory creates the ``TRACK_ID`` environment against ``HOST:PORT``,
    wraps it in ``ThrottleClampWrapper`` (with ``THROTTLE_MIN``) and then in
    ``V5RewardWrapper``.  Nothing is created until the factory is called.
    """
    def _build():
        sim_env = gym.make(TRACK_ID, conf={'host': HOST, 'port': PORT})
        clamped = ThrottleClampWrapper(sim_env, throttle_min=THROTTLE_MIN)
        return V5RewardWrapper(clamped)

    return _build
log('='*60)
log(f'Exp 16: {TRACK_NAME} warm-start from generated champion')
log(f' Host: {HOST}:{PORT}')
log(f' Warm start: {WARM_PATH}')
log(f' throttle_min={THROTTLE_MIN}, lr={LR}')
log(f' Reward: v5 (Exp 14 known-good mountain setup)')
log(f' Stop: eval every {EVAL_EVERY:,} steps, stop at {LAP_STOP} laps')
log('='*60)

_EVAL_MAX_STEPS = 2000  # hard cap on the length of one evaluation episode


def _switch_sim_to_menu():
    """Connect a throwaway env and ask the simulator to leave its scene.

    A failing ``exit_scene`` is logged as a warning rather than raised; the
    temporary env is closed either way.  The sleeps give the simulator time
    to settle between the connect, the scene exit and the reconnect that
    follows.
    """
    log('Switching sim to mountain_track...')
    tmp_env = gym.make('donkey-generated-track-v0', conf={'host': HOST, 'port': PORT})
    try:
        time.sleep(2)
        try:
            tmp_env.unwrapped.viewer.exit_scene()
            time.sleep(0.5)
        except Exception as e:
            log(f' exit_scene warning: {e}')
    finally:
        tmp_env.close()
    time.sleep(6)
    log('Sim should now be at main menu. Connecting to mountain_track...')


def _evaluate(model, env, max_steps=_EVAL_MAX_STEPS):
    """Run one deterministic episode on the vectorised ``env``.

    Returns:
        ``(total_reward, steps, laps)`` where ``laps`` is the highest
        ``lap_count`` reported in the step infos of env index 0.
    """
    obs = env.reset()
    total_reward = 0.0
    steps = 0
    laps = 0
    for _ in range(max_steps):
        action, _state = model.predict(obs, deterministic=True)
        obs, rewards, dones, infos = env.step(action)
        total_reward += float(rewards[0])
        steps += 1
        try:
            step_info = infos[0] if isinstance(infos, (list, tuple)) else infos
            laps = max(laps, int(step_info.get('lap_count', 0) or 0))
        except (AttributeError, IndexError, TypeError, ValueError, OverflowError):
            pass  # lap telemetry is best-effort; keep the last known count
        if bool(dones[0]):
            break
    return total_reward, steps, laps


# Fail fast: check the warm-start file before touching the simulator.
if not os.path.exists(WARM_PATH):
    raise FileNotFoundError(WARM_PATH)

# scene switch first
_switch_sim_to_menu()

env = VecTransposeImage(DummyVecEnv([make_env()]))
best_reward = float('-inf')
best_laps = 0
steps_done = 0
try:
    # Override the learning rate at load time.  SB3 builds its LR schedule
    # from `learning_rate` while loading and re-applies that schedule to the
    # optimizer on every update, so assigning `model.learning_rate` or the
    # optimizer's param_groups afterwards does not stick.
    model = PPO.load(
        WARM_PATH,
        env=env,
        device='cpu',
        custom_objects={'learning_rate': LR},
    )
    log('Loaded warm-start model and attached mountain env')

    while steps_done < MAX_STEPS:
        seg = min(EVAL_EVERY, MAX_STEPS - steps_done)
        model.learn(total_timesteps=seg, reset_num_timesteps=False)
        # NOTE(review): steps_done counts *requested* steps; PPO presumably
        # rounds each segment up to whole rollouts, so the real count may be
        # higher - compare with model.num_timesteps if exact budgets matter.
        steps_done += seg
        model.save(os.path.join(SAVE_DIR, f'checkpoint_{steps_done:07d}'))
        model.save(os.path.join(SAVE_DIR, 'model'))
        try:
            ep_r, ep_s, laps = _evaluate(model, env)
            status = '' if ep_s >= _EVAL_MAX_STEPS else f'❌@{ep_s}'
            log(f'[{steps_done:,}] reward={ep_r:.1f} steps={ep_s} laps={laps} {status}')
            if ep_r > best_reward:
                best_reward = ep_r
                model.save(os.path.join(SAVE_DIR, 'best_model'))
                log(f' ⭐ NEW BEST: {best_reward:.1f}')
            if laps > best_laps:
                best_laps = laps
                log(f' 🏆 BEST LAPS: {best_laps}')
            if laps >= LAP_STOP:
                log(f' 🎯 {laps} laps at {steps_done:,} steps — STOPPING')
                break
        except Exception as e:
            # An evaluation failure must not abort the training run.
            log(f' Eval error: {e}')
            import traceback
            traceback.print_exc()
        # Evaluation stepped the training env behind the model's back.
        # Re-attaching it (force_reset defaults to True) makes the next
        # learn() start from a fresh reset, not a stale cached observation.
        model.set_env(env)
finally:
    # Release the simulator connection even when training raises.
    env.close()
    time.sleep(3)
log(f'\nDone. best_laps={best_laps} best_reward={best_reward:.1f}')
log(f'Best model: {SAVE_DIR}/best_model.zip')
log('=== Exp 16 COMPLETE ===')