donkeycar-rl-autoresearch/agent/experiments/mountain_v5.py

80 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys, os, time
sys.path.insert(0, '/home/paulh/projects/donkeycar-rl-autoresearch/agent')
from multitrack_runner import log, _send_exit_scene, StuckTerminationWrapper
from donkeycar_sb3_runner import ThrottleClampWrapper
from reward_wrapper import SpeedRewardWrapper
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
from stable_baselines3.common.callbacks import BaseCallback
import gymnasium as gym
THROTTLE_MIN = 0.5
LR = 0.000725
TOTAL_STEPS = 90000
SAVE_PATH = '/home/paulh/projects/donkeycar-rl-autoresearch/agent/models/exp5-mountain-v5reward/model'
os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
def make_env(env_id):
    """Build the wrapped environment for the gym id ``env_id``.

    The raw environment from ``gym.make`` is wrapped, innermost first, by
    ThrottleClampWrapper (``throttle_min=THROTTLE_MIN``),
    StuckTerminationWrapper (``stuck_steps=80``, ``min_displacement=0.5``)
    and SpeedRewardWrapper (the "v5" reward).

    Returns:
        The outermost wrapper (a SpeedRewardWrapper instance).
    """
    clamped = ThrottleClampWrapper(gym.make(env_id), throttle_min=THROTTLE_MIN)
    stuck_guarded = StuckTerminationWrapper(
        clamped,
        stuck_steps=80,
        min_displacement=0.5,
    )
    # v5 reward
    return SpeedRewardWrapper(stuck_guarded)
def switch_to(current_id, next_id, name):
    """Leave the currently loaded scene and connect to ``next_id``.

    A throw-away environment for ``current_id`` is opened only so that
    ``_send_exit_scene`` can be sent through it; it is then closed and a
    fresh vectorised environment for ``next_id`` is created.

    Args:
        current_id: Gym id of the scene assumed to be loaded right now.
        next_id: Gym id of the scene to connect to.
        name: Human-readable track name used in log messages.

    Returns:
        ``VecTransposeImage(DummyVecEnv([...]))`` wrapping ``make_env(next_id)``.
    """
    log(f'{name}...')
    tmp = gym.make(current_id)
    try:
        # Pause before asking for the scene exit.
        # NOTE(review): presumably gives the connection time to settle -- confirm.
        time.sleep(2)
        _send_exit_scene(tmp, verbose=False)
    finally:
        # Always release the temporary connection, even if the exit request
        # raised, so a failed switch does not leak an open environment.
        tmp.close()
    # Pause again before opening the next environment.
    time.sleep(5)
    env = VecTransposeImage(DummyVecEnv([lambda: make_env(next_id)]))
    log(f' Connected to {name}')
    return env
class ProgressCB(BaseCallback):
    """Callback that logs training progress every 10,000 timesteps.

    Args:
        total: Total number of timesteps, used only for the log message.
    """

    def __init__(self, total):
        super().__init__(verbose=0)
        # Timestep count at which progress was last logged.
        self._last = 0
        self._total = total

    def _on_step(self):
        """Log progress when at least 10,000 steps have passed; always return True."""
        steps_since_log = self.num_timesteps - self._last
        if steps_since_log < 10000:
            return True
        log(f' step {self.num_timesteps:,}/{self._total:,}')
        self._last = self.num_timesteps
        return True
# ---- Training phase: PPO on mountain_track --------------------------------
log('='*60)
log('Exp 5: mountain_track, v5 reward (speed×CTE), throttle_min=0.5')
log('v5 reward gives direct gradient signal for hill: slow=low reward')
log('='*60)
# Switch sim to mountain_track
log('Switching to mountain_track...')
tmp = gym.make('donkey-mountain-track-v0')
try:
    time.sleep(2)
    _send_exit_scene(tmp, verbose=False)
finally:
    # Release the temporary connection even if the exit request failed.
    tmp.close()
time.sleep(5)
env = VecTransposeImage(DummyVecEnv([lambda: make_env('donkey-mountain-track-v0')]))
try:
    model = PPO('CnnPolicy', env, learning_rate=LR, verbose=1, device='cpu')
    model.learn(total_timesteps=TOTAL_STEPS, callback=ProgressCB(TOTAL_STEPS),
                reset_num_timesteps=True)
    model.save(SAVE_PATH)
    log('Saved.')
finally:
    # Close the training environment on both success and failure so an
    # aborted run does not leave the environment open.
    env.close()
# Pause before the evaluation phase opens new connections.
time.sleep(3)
def eval_track(current_id, track_id, name, n=3, max_steps=2000):
    """Evaluate the saved model on ``track_id`` for ``n`` deterministic episodes.

    Switches from ``current_id`` to ``track_id`` via ``switch_to``, loads the
    model from ``SAVE_PATH`` and logs total reward and step count per episode.

    Args:
        current_id: Gym id of the scene assumed to be loaded right now.
        track_id: Gym id of the track to evaluate on.
        name: Human-readable track name used in log messages.
        n: Number of evaluation episodes.
        max_steps: Step cap per episode; reaching it without the episode
            ending is reported as a full run.

    Returns:
        ``track_id``, so the caller can pass it as the next ``current_id``.
    """
    log(f'\n--- EVAL: {name} ---')
    ev = switch_to(current_id, track_id, name)
    try:
        m = PPO.load(SAVE_PATH, env=ev, device='cpu')
        for ep in range(1, n + 1):
            obs = ev.reset()
            total, steps, done = 0.0, 0, False
            while not done and steps < max_steps:
                action, _ = m.predict(obs, deterministic=True)
                result = ev.step(action)
                # Accept both the 5-tuple (terminated/truncated) and the
                # 4-tuple (done) step result; index 0 is the single env.
                if len(result) == 5:
                    obs, r, t, tr, _ = result
                    done = bool(t[0] or tr[0])
                else:
                    obs, r, d, _ = result
                    done = bool(d[0])
                total += float(r[0])
                steps += 1
            # Decide on ``done`` rather than the step count: an episode that
            # ends exactly on the last allowed step is still not a full run.
            status = '✅ FULL' if not done else f'❌ crash@{steps}'
            log(f' ep{ep}: {total:.1f} reward / {steps} steps — {status}')
            time.sleep(1)
    finally:
        # Close the evaluation environment even if loading or stepping failed.
        ev.close()
    time.sleep(3)
    return track_id
# ---- Evaluation phase ------------------------------------------------------
# Evaluate on the training track first, then on three other tracks. Each call
# returns the track it ended on, which becomes the next call's starting scene.
current = 'donkey-mountain-track-v0'
for _track_id, _label in (
    ('donkey-mountain-track-v0', 'mountain_track (training)'),
    ('donkey-generated-track-v0', 'generated_track (zero-shot)'),
    ('donkey-minimonaco-track-v0', 'mini_monaco (zero-shot)'),
    ('donkey-generated-roads-v0', 'generated_road (zero-shot)'),
):
    current = eval_track(current, _track_id, _label)
log('\n=== Exp 5 COMPLETE ===')