donkeycar-rl-autoresearch/agent/autoresearch_controller.py

"""
=============================================================
DonkeyCar RL Autoresearch Controller — Phase 1 (Real Training)
=============================================================
Uses Gaussian Process + UCB Bayesian optimization to propose
hyperparameters for REAL PPO/DQN training runs (not random policy).
Each trial:
1. GP+UCB proposes next hyperparameters
2. Launches donkeycar_sb3_runner.py with REAL training
3. Runner saves a trained model to disk
4. Controller records result, updates GP, tracks champion
5. Repeat
Results go to: outerloop-results/autoresearch_results_phase1.jsonl
Champion: models/champion/model.zip + manifest.json
Usage:
python3 autoresearch_controller.py --trials 50 --explore 2.0 --push-every 10
Stop at any time with Ctrl+C. Restart and it picks up from existing results.
=============================================================
"""
import os
import sys
import json
import time
import subprocess
import re
import shutil
import numpy as np
from datetime import datetime
# ---- Project Paths ----
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
RUNNER_SCRIPT = os.path.join(PROJECT_DIR, 'donkeycar_sb3_runner.py')
RESULTS_DIR = os.path.join(PROJECT_DIR, 'outerloop-results')
MODELS_DIR = os.path.join(PROJECT_DIR, 'models')
CHAMPION_DIR = os.path.join(MODELS_DIR, 'champion')
# Phase 2 writes to its own results file — corner learning with longer timesteps
PHASE2_RESULTS = os.path.join(RESULTS_DIR, 'autoresearch_results_phase2.jsonl')
PHASE2_LOG = os.path.join(RESULTS_DIR, 'autoresearch_phase2_log.txt')
# Legacy base data (discretization insights, valid for n_steer/n_throttle)
BASE_DATA_FILE = os.path.join(RESULTS_DIR, 'clean_sweep_results.jsonl')
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(CHAMPION_DIR, exist_ok=True)
# ---- Parameter Space ----
# These are the parameters GP+UCB will optimize
# PHASE 2: Corner Learning
# Phase 1 confirmed genuine driving (599 steps, mean_reward=1022, efficiency ~99%).
# Failure point: S-curve at step ~560 — too fast, doesn't learn left-turn recovery.
# Fix: Much longer training so model experiences the S-curve many times.
# Search space tightened around Phase 1 winning region: lr=0.00005-0.002, n_throttle=2-5
PARAM_SPACE = {
    'n_steer': {'type': 'int', 'min': 3, 'max': 9},
    'n_throttle': {'type': 'int', 'min': 2, 'max': 5},
    'learning_rate': {'type': 'float', 'min': 0.00005, 'max': 0.002},
    'timesteps': {'type': 'int', 'min': 10000, 'max': 50000},
}
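# For illustration, one point sampled from this space might decode to a trial
# config like the following (values made up, not from an actual run):
#   {'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0004, 'timesteps': 30000}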
PARAM_KEYS = list(PARAM_SPACE.keys())
# Fixed params
FIXED_PARAMS = {
    'agent': 'ppo',
    'eval_episodes': 5,  # More eval episodes — corner performance is stochastic
    'reward_shaping': True,
}
N_CANDIDATES = 500
UCB_KAPPA = 2.0
MIN_TRIALS_BEFORE_GP = 3
JOB_TIMEOUT = 3600 # 60 min per trial — 50k steps on CPU needs time
# ---- Logging ----
def log(msg):
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    with open(PHASE2_LOG, 'a') as f:
        f.write(line + '\n')
# ---- Reward Sanity / Hacking Detection ----
# SpeedRewardWrapper v2 theoretical max:
# max_original_reward ≈ 1.0, max_speed ≈ 10.0, speed_scale=0.1
# max_per_step = 1.0 × (1 + 0.1 × 10) = 2.0
# Flag anything above 3.0 reward/step as suspected hacking.
REWARD_PER_STEP_HACK_THRESHOLD = 3.0
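# Worked example of the rule below (illustrative numbers only): a run reporting
# mean_reward=1022 over timesteps=10000 gives 1022 / 10000 ≈ 0.102 reward/step,
# well under the 3.0 threshold, so it is NOT flagged. A run reporting
# mean_reward=45000 over the same 10000 steps gives 4.5/step and WOULD be flagged.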
def check_for_reward_hacking(mean_reward, params):
    """Detect reward hacking from physically impossible reward-per-step values."""
    if mean_reward is None:
        return False
    timesteps = params.get('timesteps', 3000)
    reward_per_step = mean_reward / max(timesteps, 1)
    if reward_per_step > REWARD_PER_STEP_HACK_THRESHOLD:
        log(f'[AutoResearch] ⚠️ REWARD HACKING SUSPECTED: '
            f'mean_reward={mean_reward:.1f} over {timesteps} steps '
            f'= {reward_per_step:.3f}/step > threshold {REWARD_PER_STEP_HACK_THRESHOLD}. '
            f'Result EXCLUDED from GP fitting. See docs/RESEARCH_LOG.md.')
        return True
    return False
# ---- Parameter Encoding ----
def encode_params(params):
    """Map a params dict to a vector of values normalized to [0, 1]."""
    vec = []
    for k in PARAM_KEYS:
        if k not in params:
            continue
        spec = PARAM_SPACE[k]
        v = params[k]
        norm = (v - spec['min']) / (spec['max'] - spec['min'])
        vec.append(np.clip(norm, 0.0, 1.0))
    return np.array(vec)
def decode_params(vec):
    """Map a normalized [0, 1] vector back to a params dict, clamped to the space."""
    params = {}
    for i, k in enumerate(PARAM_KEYS):
        spec = PARAM_SPACE[k]
        v = float(vec[i]) * (spec['max'] - spec['min']) + spec['min']
        if spec['type'] == 'int':
            v = int(round(v))
        else:
            v = float(v)
        params[k] = max(spec['min'], min(spec['max'], v))
    return params
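# A minimal round-trip sketch (illustrative values only):
#   p = {'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0003, 'timesteps': 20000}
#   vec = encode_params(p)       # -> 4 values in [0, 1], one per PARAM_KEYS entry
#   decode_params(vec) == p      # True up to int rounding / float precision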
def random_candidate():
    return np.random.uniform(0, 1, len(PARAM_KEYS))
# ---- Gaussian Process Surrogate Model ----
class TinyGP:
    """Minimal RBF-kernel Gaussian Process for surrogate modelling."""
    def __init__(self, length_scale=0.3, noise=1e-3):
        self.ls = length_scale
        self.noise = noise
        self.X = None
        self.alpha = None
        self.K_inv = None

    def _rbf(self, X1, X2):
        # Squared-exponential kernel with unit signal variance.
        diff = X1[:, np.newaxis, :] - X2[np.newaxis, :, :]
        sq = np.sum(diff ** 2, axis=-1)
        return np.exp(-sq / (2 * self.ls ** 2))

    def fit(self, X, y):
        self.X = np.array(X)
        n = len(y)
        K = self._rbf(self.X, self.X) + self.noise * np.eye(n)
        try:
            self.K_inv = np.linalg.inv(K)
        except np.linalg.LinAlgError:
            self.K_inv = np.linalg.pinv(K)
        self.alpha = self.K_inv @ np.array(y)

    def predict(self, X_new):
        X_new = np.atleast_2d(X_new)
        K_s = self._rbf(X_new, self.X)
        mu = K_s @ self.alpha
        # Prior variance is 1.0 + noise (unit-variance kernel); clamp to avoid
        # tiny negative values from numerical error.
        var = np.maximum(
            1.0 + self.noise - np.sum((K_s @ self.K_inv) * K_s, axis=1),
            1e-9
        )
        return mu, np.sqrt(var)
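# A minimal usage sketch (made-up values; inputs are points already normalized
# to [0, 1] by encode_params, targets are standardized rewards):
#
#   gp = TinyGP(length_scale=0.3, noise=1e-3)
#   gp.fit(np.array([[0.2, 0.5, 0.1, 0.9],
#                    [0.8, 0.3, 0.7, 0.4]]),
#          np.array([0.5, -0.5]))
#   mu, sigma = gp.predict(np.array([[0.5, 0.5, 0.5, 0.5]]))
#   ucb = mu + UCB_KAPPA * sigma  # acquisition score used in propose_next_params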
# ---- Champion Tracker ----
class ChampionTracker:
    def __init__(self, champion_dir):
        self.champion_dir = champion_dir
        self.manifest_path = os.path.join(champion_dir, 'manifest.json')
        os.makedirs(champion_dir, exist_ok=True)
        self._best = self._load()

    def _load(self):
        if os.path.exists(self.manifest_path):
            try:
                with open(self.manifest_path) as f:
                    return json.load(f)
            except Exception:
                pass
        return {'mean_reward': float('-inf'), 'trial': None}

    @property
    def best_reward(self):
        return self._best.get('mean_reward', float('-inf'))

    def update_if_better(self, mean_reward, params, model_zip_path, trial):
        if mean_reward is None or mean_reward <= self.best_reward:
            return False
        dest = os.path.join(self.champion_dir, 'model.zip')
        if model_zip_path and os.path.exists(model_zip_path):
            try:
                shutil.copy2(model_zip_path, dest)
            except Exception as e:
                log(f'[Champion] WARNING: Could not copy model: {e}')
                dest = model_zip_path
        manifest = {
            'trial': trial,
            'timestamp': datetime.now().isoformat(),
            'params': params,
            'mean_reward': mean_reward,
            'model_path': dest,
        }
        with open(self.manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        self._best = manifest
        log(f'[Champion] 🏆 NEW BEST! Trial {trial}: mean_reward={mean_reward:.4f} params={params}')
        return True

    def summary(self):
        if self._best['trial'] is None:
            return 'No champion yet.'
        return (f"Champion: trial={self._best['trial']} "
                f"mean_reward={self._best['mean_reward']:.4f} "
                f"params={self._best['params']}")
# ---- Load Results ----
def load_phase2_results():
    """Load Phase 2 results for GP fitting (corner learning runs)."""
    results = []
    if not os.path.exists(PHASE2_RESULTS):
        return results
    with open(PHASE2_RESULTS) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
                mr = rec.get('mean_reward')
                if mr is not None:
                    results.append({'params': rec['params'], 'mean_reward': float(mr)})
            except Exception:
                pass
    return results
# ---- GP+UCB Proposal ----
def propose_next_params(results, trial_num, kappa=UCB_KAPPA):
    if len(results) < MIN_TRIALS_BEFORE_GP:
        log(f'[AutoResearch] Only {len(results)} results — using random proposal.')
        return decode_params(random_candidate())
    X = np.array([encode_params(r['params']) for r in results])
    y = np.array([r['mean_reward'] for r in results])
    # Standardize rewards before fitting (guard against zero variance).
    y_std = y.std() if y.std() > 0 else 1.0
    y_norm = (y - y.mean()) / y_std
    gp = TinyGP(length_scale=0.3, noise=1e-3)
    gp.fit(X, y_norm)
    candidates = np.random.uniform(0, 1, (N_CANDIDATES, len(PARAM_KEYS)))
    mu, sigma = gp.predict(candidates)
    ucb = mu + kappa * sigma
    top5 = np.argsort(ucb)[-5:][::-1]
    log('[AutoResearch] GP UCB top-5 candidates:')
    for idx in top5:
        p = decode_params(candidates[idx])
        log(f'  UCB={ucb[idx]:.4f} mu={mu[idx]:.4f} sigma={sigma[idx]:.4f} params={p}')
    return decode_params(candidates[np.argmax(ucb)])
# ---- Job Launcher ----
def kill_stale():
    subprocess.run(['pkill', '-9', '-f', 'donkeycar_sb3_runner.py'], check=False)
    time.sleep(2)
def launch_job(params, trial_num):
    save_dir = os.path.join(MODELS_DIR, f'trial-{trial_num:04d}')
    os.makedirs(save_dir, exist_ok=True)
    cmd = [
        'python3', RUNNER_SCRIPT,
        '--agent', params.get('agent', FIXED_PARAMS['agent']),
        '--env', 'donkey-generated-roads-v0',
        '--timesteps', str(int(params.get('timesteps', 10000))),
        '--eval-episodes', str(FIXED_PARAMS['eval_episodes']),
        '--learning-rate', str(params.get('learning_rate', 0.0003)),
        '--n-steer', str(int(params.get('n_steer', 7))),
        '--n-throttle', str(int(params.get('n_throttle', 3))),
        '--save-dir', save_dir,
    ]
    if FIXED_PARAMS.get('reward_shaping'):
        cmd.append('--reward-shaping')
    log(f'[AutoResearch] Launching trial {trial_num}: {params}')
    start = time.time()
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=JOB_TIMEOUT)
        elapsed = time.time() - start
        output = proc.stdout + '\n' + proc.stderr
        status = 'ok' if proc.returncode == 0 else 'error'
        log(f'[AutoResearch] Trial {trial_num} finished in {elapsed:.1f}s, returncode={proc.returncode}')
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start
        output = f'[TIMEOUT after {elapsed:.1f}s]'
        status = 'timeout'
        log(f'[AutoResearch] Trial {trial_num} TIMED OUT after {elapsed:.1f}s')
    # Print last 2000 chars of output
    print('--- Runner Output (tail) ---', flush=True)
    print(output[-2000:], flush=True)
    print('--- End Runner Output ---', flush=True)
    # Parse results (mean reward may be negative, so allow a leading minus sign)
    mean_reward = None
    std_reward = None
    m = re.search(r'\[SB3 Runner\]\[TEST\] mean_reward=(-?[\d.]+)', output)
    if m:
        mean_reward = float(m.group(1))
    m = re.search(r'\[SB3 Runner\]\[TEST\] std_reward=([\d.]+)', output)
    if m:
        std_reward = float(m.group(1))
    log(f'[AutoResearch] Trial {trial_num}: mean_reward={mean_reward} std_reward={std_reward}')
    model_zip = os.path.join(save_dir, 'model.zip')
    if not os.path.exists(model_zip):
        model_zip = None
    return mean_reward, std_reward, model_zip, output, status, elapsed, save_dir
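# The regexes above expect the runner to print lines of this form (format
# inferred from the patterns themselves; numbers illustrative):
#   [SB3 Runner][TEST] mean_reward=1022.4
#   [SB3 Runner][TEST] std_reward=87.3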
# ---- Result Saving ----
def save_result(trial, params, mean_reward, std_reward, model_path, champion, status, elapsed, hacked=False):
    rec = {
        'trial': trial,
        'timestamp': datetime.now().isoformat(),
        'params': params,
        'mean_reward': mean_reward,
        'std_reward': std_reward,
        'model_path': model_path,
        'champion': champion,
        'run_status': status,
        'elapsed_sec': elapsed,
        'reward_hacking_suspected': hacked,
    }
    with open(PHASE2_RESULTS, 'a') as f:
        f.write(json.dumps(rec) + '\n')
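# Each appended line is one JSON record, e.g. (illustrative values):
#   {"trial": 3, "timestamp": "...", "params": {...}, "mean_reward": 850.2,
#    "std_reward": 40.1, "model_path": "models/trial-0003/model.zip",
#    "champion": false, "run_status": "ok", "elapsed_sec": 1800.5,
#    "reward_hacking_suspected": false}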
# ---- Git Push ----
def git_push(project_root, trial_num):
    try:
        repo_root = os.path.dirname(project_root)
        subprocess.run(['git', '-C', repo_root, 'add', '-A'], check=True, capture_output=True)
        subprocess.run([
            'git', '-C', repo_root, 'commit', '-m',
            f'autoresearch: phase2 trial {trial_num} results\n\nAgent: pi\nTests: N/A\nTests-Added: 0\nTypeScript: N/A'
        ], check=True, capture_output=True)
        subprocess.run(['git', '-C', repo_root, 'push'], check=True, capture_output=True)
        log(f'[AutoResearch] Git push complete after trial {trial_num}')
    except subprocess.CalledProcessError as e:
        log(f'[AutoResearch] Git push failed: {e}')
# ---- Summary ----
def print_summary(results, champion, trial):
    if not results:
        return
    log(f'[AutoResearch] === Trial {trial} Summary ===')
    log(f'  Total Phase 2 runs: {len(results)}')
    log(f'  {champion.summary()}')
    sorted_r = sorted(results, key=lambda r: r['mean_reward'], reverse=True)
    log('  Top 5:')
    for r in sorted_r[:5]:
        log(f'    mean_reward={r["mean_reward"]:.4f} params={r["params"]}')
# ---- Main Loop ----
def run_autoresearch(max_trials=50, kappa=UCB_KAPPA, push_every=10):
    log('=' * 60)
    log('[AutoResearch] Phase 2 — Real PPO Training + GP+UCB Optimization')
    log(f'[AutoResearch] Max trials: {max_trials} | kappa: {kappa} | push every: {push_every}')
    log(f'[AutoResearch] Results: {PHASE2_RESULTS}')
    log(f'[AutoResearch] Champion: {CHAMPION_DIR}')
    log('=' * 60)
    results = load_phase2_results()
    champion = ChampionTracker(CHAMPION_DIR)
    log(f'[AutoResearch] Loaded {len(results)} existing Phase 2 results.')
    log(f'[AutoResearch] {champion.summary()}')
    for trial in range(1, max_trials + 1):
        log(f'\n[AutoResearch] ========== Trial {trial}/{max_trials} ==========')
        # 1. Propose params
        proposed = propose_next_params(results, trial, kappa=kappa)
        full_params = {**proposed, **FIXED_PARAMS}
        log(f'[AutoResearch] Proposed: {full_params}')
        # 2. Kill stale jobs
        kill_stale()
        # 3. Launch real training job
        mean_reward, std_reward, model_zip, output, status, elapsed, save_dir = launch_job(full_params, trial)
        # 4. Check for reward hacking before updating champion
        hacked = check_for_reward_hacking(mean_reward, full_params)
        # 5. Update champion (only if not hacking)
        is_champion = False
        if not hacked:
            is_champion = champion.update_if_better(mean_reward, full_params, model_zip, trial)
        # 6. Save result (flag hacked results)
        save_result(trial, full_params, mean_reward, std_reward, model_zip, is_champion, status, elapsed, hacked=hacked)
        # 7. Add to GP data (ONLY if not hacked and the reward is valid)
        if mean_reward is not None and not hacked:
            results.append({'params': full_params, 'mean_reward': mean_reward})
        elif hacked:
            log('[AutoResearch] Hacked result excluded from GP — GP will not optimize toward this region.')
        # 8. Print summary
        print_summary(results, champion, trial)
        # 9. Git push periodically
        if push_every > 0 and trial % push_every == 0:
            git_push(PROJECT_DIR, trial)
        time.sleep(2)
    log('[AutoResearch] All trials complete!')
    print_summary(results, champion, trial=max_trials)
    # Final push
    git_push(PROJECT_DIR, max_trials)
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Phase 2 Autoresearch: Real PPO training + GP+UCB.')
    parser.add_argument('--trials', type=int, default=50, help='Number of trials (default: 50)')
    parser.add_argument('--explore', type=float, default=2.0, help='UCB kappa (default: 2.0)')
    parser.add_argument('--push-every', type=int, default=10, help='Git push every N trials (0=disabled)')
    args = parser.parse_args()
    run_autoresearch(max_trials=args.trials, kappa=args.explore, push_every=args.push_every)