donkeycar-rl-autoresearch/agent/wave3_controller.py

515 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
=================================================================
Wave 3 Autoresearch Controller — Multi-Track Generalization
=================================================================
GP+UCB Bayesian optimization over multi-track training hyperparameters.
Goal: find hyperparameters that maximize ZERO-SHOT generalization —
the model must drive mini_monaco and warren without ever having trained
on them. Only the test score (combined_test_score) feeds the GP.
Track split:
Training : generated_road, generated_track, mountain_track
Test (ZSL): mini_monaco (zero-shot; warren removed — broken done condition)
Search space:
learning_rate — PPO learning rate [5e-5, 1e-3]
steps_per_switch — steps per track segment before switching [2000, 15000]
total_timesteps — total training budget [30000, 150000]
Each trial:
1. GP+UCB proposes hyperparameters
2. Launches multitrack_runner.py (real PPO training across 3 tracks)
3. Parses combined_test_score from stdout
4. Updates GP with (hyperparams → test_score) mapping
5. Updates champion if test_score improved
Results: outerloop-results/autoresearch_results_phase3.jsonl
Champion: models/wave3-champion/model.zip + manifest.json
Usage:
python3 wave3_controller.py --trials 25 --explore 2.0 --push-every 5
Stop with Ctrl+C at any time — resumes from existing results.
=================================================================
"""
import os
import sys
import json
import time
import subprocess
import re
import shutil
import numpy as np
from datetime import datetime
# ---- Paths ----
# All paths are anchored on this file's directory so the controller behaves
# the same regardless of the current working directory.
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.dirname(PROJECT_DIR)  # git repository root (one level up)
RUNNER = os.path.join(PROJECT_DIR, 'multitrack_runner.py')  # inner-loop training script
RESULTS_DIR = os.path.join(PROJECT_DIR, 'outerloop-results')
MODELS_DIR = os.path.join(PROJECT_DIR, 'models')
CHAMPION_DIR = os.path.join(MODELS_DIR, 'wave3-champion')  # best-so-far model + manifest
RESULTS_FILE = os.path.join(RESULTS_DIR, 'autoresearch_results_phase3.jsonl')  # one JSON record per trial
LOG_FILE = os.path.join(RESULTS_DIR, 'autoresearch_phase3_log.txt')
WARM_START = os.path.join(MODELS_DIR, 'champion', 'model.zip') # Phase 2 champion
# Created eagerly at import time so later appends/copies never hit a missing
# directory.
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(CHAMPION_DIR, exist_ok=True)
# ---- Hyperparameter search space ----
# Each entry is one GP search dimension; encode_params/decode_params map
# values to/from the unit hypercube using these min/max bounds.
PARAM_SPACE = {
    'learning_rate': {'type': 'float', 'min': 5e-5, 'max': 1e-3},
    'steps_per_switch': {'type': 'int', 'min': 2000, 'max': 15000},
    'total_timesteps': {'type': 'int', 'min': 30000, 'max': 150000},
}
# Canonical dimension ordering for encode/decode round-trips.
PARAM_KEYS = list(PARAM_SPACE.keys())
# Forwarded to the runner on every trial, but NOT searched over by the GP.
FIXED_PARAMS = {
    'eval_episodes': 3,
}
N_CANDIDATES = 500  # random candidates scored with UCB per proposal
UCB_KAPPA = 2.0  # default exploration weight: ucb = mu + kappa * sigma
MIN_TRIALS_BEFORE_GP = 3  # below this many results, propose randomly
JOB_TIMEOUT = 7200 # 2h — 150k steps on CPU may need time
# ---- Seed trials near Phase 2 champion ----
# GP warm-up: first 2 trials use known-good parameters so GP has real prior data
SEED_PARAMS = [
    # 3 full rotations through all 3 training tracks (~35 min per trial)
    {'learning_rate': 0.000225, 'steps_per_switch': 5000, 'total_timesteps': 45000},
    # Slower switching, more time per track (~45 min per trial)
    {'learning_rate': 0.000225, 'steps_per_switch': 10000, 'total_timesteps': 90000},
]
# ---- Logging ----
def log(msg):
    """Print *msg* with a timestamp prefix and append the same line to LOG_FILE."""
    stamped = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {msg}"
    print(stamped, flush=True)
    with open(LOG_FILE, 'a') as fh:
        fh.write(stamped + '\n')
# ---- Parameter encoding ----
def encode_params(params):
    """Map a params dict onto the unit hypercube (one axis per PARAM_KEYS entry).

    Missing keys fall back to the midpoint 0.5; present values are min-max
    normalized against PARAM_SPACE and clipped into [0, 1].
    """
    coords = []
    for key in PARAM_KEYS:
        if key in params:
            spec = PARAM_SPACE[key]
            frac = (params[key] - spec['min']) / (spec['max'] - spec['min'])
            coords.append(float(np.clip(frac, 0.0, 1.0)))
        else:
            coords.append(0.5)
    return np.array(coords)
def decode_params(vec):
    """Inverse of encode_params: unit-hypercube vector -> concrete params dict.

    Int-typed dimensions are rounded then clamped to [min, max]; float-typed
    dimensions are clipped.
    """
    out = {}
    for idx, key in enumerate(PARAM_KEYS):
        spec = PARAM_SPACE[key]
        lo, hi = spec['min'], spec['max']
        raw = float(vec[idx]) * (hi - lo) + lo
        if spec['type'] == 'int':
            out[key] = max(lo, min(hi, int(round(raw))))
        else:
            out[key] = float(np.clip(raw, lo, hi))
    return out
def random_candidate():
    """Draw one uniformly random point in the unit hypercube."""
    return np.random.uniform(size=len(PARAM_KEYS))
# ---- Gaussian Process ----
class TinyGP:
    """Minimal RBF-kernel GP for surrogate modelling (pure numpy, no sklearn)."""

    def __init__(self, length_scale=0.3, noise=1e-3):
        # Kernel hyperparameters; fitted state starts empty until fit() runs.
        self.ls = length_scale
        self.noise = noise
        self.X = None       # training inputs, shape (n, d)
        self.alpha = None   # K^-1 @ y, cached for fast mean prediction
        self.K_inv = None   # inverse of the noise-regularized kernel matrix

    def _rbf(self, X1, X2):
        """Pairwise squared-exponential kernel matrix between two point sets."""
        sq_dist = np.sum((X1[:, None, :] - X2[None, :, :]) ** 2, axis=-1)
        return np.exp(-0.5 * sq_dist / self.ls ** 2)

    def fit(self, X, y):
        """Condition the GP on observations (X, y)."""
        self.X = np.array(X)
        K = self._rbf(self.X, self.X) + self.noise * np.eye(len(y))
        try:
            self.K_inv = np.linalg.inv(K)
        except np.linalg.LinAlgError:
            # Singular kernel (e.g. duplicated points) -> fall back to pinv.
            self.K_inv = np.linalg.pinv(K)
        self.alpha = self.K_inv @ np.array(y)

    def predict(self, X_new):
        """Return posterior mean and stddev at X_new, each of shape (m,)."""
        X_new = np.atleast_2d(X_new)
        K_s = self._rbf(X_new, self.X)
        mean = K_s @ self.alpha
        # Prior variance for the RBF kernel is k(x,x) + noise = 1 + noise;
        # floor at 1e-9 to keep sqrt numerically safe.
        variance = 1.0 + self.noise - np.sum((K_s @ self.K_inv) * K_s, axis=1)
        return mean, np.sqrt(np.maximum(variance, 1e-9))
# ---- Champion tracker ----
class Wave3ChampionTracker:
    """Persists the best-scoring model so far in champion_dir plus manifest.json.

    The manifest survives restarts, so a resumed controller keeps its champion.
    """

    def __init__(self, champion_dir):
        self.champion_dir = champion_dir
        self.manifest_path = os.path.join(champion_dir, 'manifest.json')
        os.makedirs(champion_dir, exist_ok=True)
        self._best = self._load()

    def _load(self):
        """Load the persisted manifest, or a -inf sentinel if absent/corrupt."""
        if os.path.exists(self.manifest_path):
            try:
                with open(self.manifest_path) as f:
                    return json.load(f)
            except Exception:
                pass  # corrupt manifest -> start fresh rather than crash
        return {'combined_test_score': float('-inf'), 'trial': None}

    @property
    def best_score(self):
        """Best combined test score seen so far (-inf if no champion yet)."""
        return self._best.get('combined_test_score', float('-inf'))

    def update_if_better(self, score, params, model_zip_path, trial,
                         mini_monaco_reward=None):
        """Promote this trial to champion if score beats best. Returns True if promoted.

        Fixes vs previous version:
        - model_path in the manifest is None when no model zip exists (before,
          a stale champion_dir/model.zip path was recorded even if nothing was
          copied there).
        - mini_monaco_reward=None no longer crashes the log line's :.1f format.
        """
        if score <= self.best_score:
            return False
        dest = None
        if model_zip_path and os.path.exists(model_zip_path):
            target = os.path.join(self.champion_dir, 'model.zip')
            try:
                shutil.copy2(model_zip_path, target)
                dest = target
            except Exception as e:
                # Best-effort: keep the original trial path if the copy fails.
                log(f'[Champion] WARNING: copy failed: {e}')
                dest = model_zip_path
        manifest = {
            'trial': trial,
            'timestamp': datetime.now().isoformat(),
            'params': params,
            'combined_test_score': score,
            'mini_monaco_reward': mini_monaco_reward,
            'model_path': dest,
        }
        with open(self.manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        self._best = manifest
        # Guard the float format — mini_monaco_reward may legitimately be None.
        mm = f'{mini_monaco_reward:.1f}' if mini_monaco_reward is not None else 'n/a'
        log(f'[Champion] 🏆 NEW BEST! Trial {trial}: '
            f'score={score:.2f} '
            f'(mini_monaco={mm}) '
            f'params={params}')
        return True

    def summary(self):
        """One-line human-readable description of the current champion."""
        if self._best['trial'] is None:
            return 'No Wave 3 champion yet.'
        return (f"Wave3 Champion: trial={self._best['trial']} "
                f"score={self._best['combined_test_score']:.2f} "
                f"params={self._best['params']}")
# ---- Load existing results ----
def load_results():
    """Read prior trial records from RESULTS_FILE (JSONL), skipping bad lines.

    Returns a list of {'params': ..., 'combined_test_score': float} dicts —
    exactly the fields the GP surrogate consumes. Records without a score,
    blank lines, and malformed JSON are all silently skipped.
    """
    if not os.path.exists(RESULTS_FILE):
        return []
    loaded = []
    with open(RESULTS_FILE) as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                rec = json.loads(raw)
                score = rec.get('combined_test_score')
                if score is not None:
                    loaded.append({
                        'params': rec['params'],
                        'combined_test_score': float(score),
                    })
            except Exception:
                continue
    return loaded
# ---- GP+UCB proposal ----
def propose_next_params(results, trial_num, kappa=UCB_KAPPA):
    """
    For the first SEED_PARAMS trials: use the hardcoded seed list.
    Once GP has enough data: use GP+UCB to propose the next parameters.
    """
    # Seed phase — use known-good starting points
    seed_idx = trial_num - 1  # trial_num is 1-indexed
    if seed_idx < len(SEED_PARAMS):
        log(f'[Wave3] Seed trial {trial_num}/{len(SEED_PARAMS)}: using hardcoded params.')
        return dict(SEED_PARAMS[seed_idx])
    # Not enough data for GP yet — random exploration
    if len(results) < MIN_TRIALS_BEFORE_GP:
        log(f'[Wave3] Only {len(results)} results — using random proposal.')
        return decode_params(random_candidate())
    # Fit the surrogate on z-scored observations (the GP prior is zero-mean).
    X = np.array([encode_params(r['params']) for r in results])
    y = np.array([r['combined_test_score'] for r in results])
    spread = y.std() if y.std() > 0 else 1.0
    gp = TinyGP(length_scale=0.3, noise=1e-3)
    gp.fit(X, (y - y.mean()) / spread)
    # Score a random candidate cloud with the UCB acquisition function.
    candidates = np.random.uniform(0, 1, (N_CANDIDATES, len(PARAM_KEYS)))
    mu, sigma = gp.predict(candidates)
    ucb = mu + kappa * sigma
    log(f'[Wave3] GP UCB top-5 proposals:')
    for idx in np.argsort(ucb)[-5:][::-1]:
        p = decode_params(candidates[idx])
        log(f' UCB={ucb[idx]:.4f} mu={mu[idx]:.4f} σ={sigma[idx]:.4f} params={p}')
    return decode_params(candidates[np.argmax(ucb)])
# ---- Utility: parse multitrack_runner output ----
# Signed decimal with optional fraction and exponent, e.g. "123.4", "-7",
# "1.5e2". The previous pattern ([+-]?[\d.]+) silently truncated scientific
# notation at the 'e' (parsing "1.5e2" as 1.5).
_FLOAT_RE = r'([+-]?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)'
def parse_runner_output(output):
    """
    Extract test metrics from multitrack_runner.py stdout.
    Looks for:
    [W3 Runner][TEST] combined_test_score=<float>
    [W3 Runner][TEST] mini_monaco_reward=<float>
    Returns (combined, mini_monaco); either is None if its line is absent.
    """
    combined = None
    mini_monaco = None
    m = re.search(r'\[W3 Runner\]\[TEST\]\s+combined_test_score=' + _FLOAT_RE, output)
    if m:
        combined = float(m.group(1))
    m = re.search(r'\[W3 Runner\]\[TEST\]\s+mini_monaco_reward=' + _FLOAT_RE, output)
    if m:
        mini_monaco = float(m.group(1))
    return combined, mini_monaco
# ---- Job launcher ----
def kill_stale():
    """Kill any zombie multitrack_runner or donkeycar_sb3_runner processes."""
    for pattern in ('multitrack_runner.py', 'donkeycar_sb3_runner.py'):
        subprocess.run(['pkill', '-9', '-f', pattern], check=False)
    # Give the killed processes a moment to release GPUs/ports/files.
    time.sleep(2)
def launch_trial(params, trial_num):
    """
    Launch multitrack_runner.py as a subprocess with the given hyperparameters.

    Blocks until the runner exits or JOB_TIMEOUT is reached.

    Returns: (combined_test_score, mini_monaco_reward,
              model_zip_path, output, status, elapsed_sec, save_dir)
    where the two scores may be None if the runner died before printing its
    [TEST] lines, model_zip_path is None if no model.zip was produced, and
    status is 'ok', 'error_rc<N>', or 'timeout'.
    """
    # Per-trial model output directory, e.g. models/wave3-trial-0007.
    save_dir = os.path.join(MODELS_DIR, f'wave3-trial-{trial_num:04d}')
    os.makedirs(save_dir, exist_ok=True)
    cmd = [
        'python3', RUNNER,
        '--total-timesteps', str(int(params['total_timesteps'])),
        '--steps-per-switch', str(int(params['steps_per_switch'])),
        '--learning-rate', str(float(params['learning_rate'])),
        '--eval-episodes', str(FIXED_PARAMS['eval_episodes']),
        '--save-dir', save_dir,
    ]
    # Always warm-start from Phase 2 champion if available
    if os.path.exists(WARM_START):
        cmd += ['--warm-start', WARM_START]
    log(f'[Wave3] Launching trial {trial_num}: {params}')
    log(f'[Wave3] Command: {" ".join(cmd)}')
    start = time.time()
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=JOB_TIMEOUT,
        )
        elapsed = time.time() - start
        # stderr is appended so tracebacks survive into the parsed/saved output.
        output = proc.stdout + '\n' + proc.stderr
        status = 'ok' if proc.returncode == 0 else f'error_rc{proc.returncode}'
        log(f'[Wave3] Trial {trial_num} finished in {elapsed:.1f}s, rc={proc.returncode}')
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start
        output = f'[TIMEOUT after {elapsed:.1f}s]'
        status = 'timeout'
        log(f'[Wave3] Trial {trial_num} TIMED OUT after {elapsed:.1f}s')
    # Always print tail of output
    print('\n--- Multitrack Runner Output (tail) ---', flush=True)
    print(output[-3000:], flush=True)  # last 3000 chars only — full logs can be huge
    print('--- End Runner Output ---\n', flush=True)
    # Parse results
    combined, mini_monaco = parse_runner_output(output)
    log(f'[Wave3] Parsed: combined={combined} mini_monaco={mini_monaco}')
    model_zip = os.path.join(save_dir, 'model.zip')
    if not os.path.exists(model_zip):
        model_zip = None  # runner died before saving — signal "no model"
    return combined, mini_monaco, model_zip, output, status, elapsed, save_dir
# ---- Result saving ----
def save_result(trial, params, combined, mini_monaco,
                model_path, is_champion, status, elapsed):
    """Append one trial record to the JSONL results file."""
    record = dict(
        trial=trial,
        timestamp=datetime.now().isoformat(),
        params=params,
        combined_test_score=combined,
        mini_monaco_reward=mini_monaco,
        model_path=model_path,
        champion=is_champion,
        run_status=status,
        elapsed_sec=elapsed,
    )
    with open(RESULTS_FILE, 'a') as fh:
        fh.write(json.dumps(record) + '\n')
# ---- Git push ----
def git_push(trial_num):
    """Stage, commit, and push the whole repo; failures are logged, never raised."""
    commit_msg = (f'wave3: autoresearch trial {trial_num} results\n\n'
                  f'Agent: pi\nTests: N/A\nTests-Added: 0\nTypeScript: N/A')
    try:
        for step in (
            ['git', '-C', REPO_ROOT, 'add', '-A'],
            ['git', '-C', REPO_ROOT, 'commit', '-m', commit_msg],
            ['git', '-C', REPO_ROOT, 'push'],
        ):
            subprocess.run(step, check=True, capture_output=True)
        log(f'[Wave3] ✅ Git push complete after trial {trial_num}')
    except subprocess.CalledProcessError as e:
        log(f'[Wave3] ⚠️ Git push failed: {e}')
# ---- Summary ----
def print_summary(results, champion, trial):
    """Log a snapshot of the search: GP data size, champion, and top-5 trials."""
    if not results:
        return
    log(f'[Wave3] ===== Trial {trial} Summary =====')
    log(f' GP data points : {len(results)}')
    log(f' {champion.summary()}')
    ranked = sorted(results, key=lambda r: r['combined_test_score'], reverse=True)
    log(f' Top 5:')
    for entry in ranked[:5]:
        log(f' score={entry["combined_test_score"]:.2f} params={entry["params"]}')
# ---- Main loop ----
def run_wave3(max_trials=25, kappa=UCB_KAPPA, push_every=5):
    """Outer research loop: propose -> train -> score -> update champion/GP.

    max_trials — total trial count, including any already on disk;
    kappa      — UCB exploration weight forwarded to propose_next_params;
    push_every — git-push cadence in trials (0 disables pushing).
    Resumes from existing results: trial numbering continues where the
    results file left off.
    """
    log('=' * 65)
    log('[Wave3] Multi-Track Autoresearch — GP+UCB Generalization Search')
    log(f'[Wave3] Training tracks : generated_road, generated_track, mountain_track')
    log(f'[Wave3] Test tracks : mini_monaco only (zero-shot; warren removed — broken done condition)')
    log(f'[Wave3] Max trials : {max_trials} | kappa={kappa} | push every {push_every}')
    log(f'[Wave3] Results file : {RESULTS_FILE}')
    log(f'[Wave3] Champion dir : {CHAMPION_DIR}')
    log(f'[Wave3] Warm start : {WARM_START}')
    log('=' * 65)
    results = load_results()
    champion = Wave3ChampionTracker(CHAMPION_DIR)
    log(f'[Wave3] Loaded {len(results)} existing Phase 3 results.')
    log(f'[Wave3] {champion.summary()}')
    # Determine starting trial number (resume from existing results)
    start_trial = len(results) + 1
    log(f'[Wave3] Starting from trial {start_trial}.')
    for trial in range(start_trial, max_trials + 1):
        log(f'\n[Wave3] ========== Trial {trial}/{max_trials} ==========')
        # 1. Propose parameters
        proposed = propose_next_params(results, trial, kappa=kappa)
        # NOTE(review): full_params is never used below — launch_trial reads
        # FIXED_PARAMS directly; kept for backward compatibility.
        full_params = {**proposed, **FIXED_PARAMS}
        log(f'[Wave3] Proposed params: {proposed}')
        # 2. Kill stale processes
        kill_stale()
        # 3. Launch training + eval
        combined, mini_monaco, model_zip, output, status, elapsed, save_dir = \
            launch_trial(proposed, trial)
        # 4. Guard against None results (timeout / crash)
        if combined is None:
            log(f'[Wave3] ⚠️ No test score parsed — defaulting to 0.0')
            combined = 0.0
            mini_monaco = mini_monaco or 0.0
        # 5. Update champion
        is_champion = champion.update_if_better(
            combined, proposed, model_zip, trial,
            mini_monaco_reward=mini_monaco or 0.0,
        )
        # 6. Save result
        save_result(trial, proposed, combined, mini_monaco,
                    model_zip, is_champion, status, elapsed)
        # 7. Update GP data
        if combined > 0:
            # Only add valid runs to GP (zero means crash/timeout — not useful)
            results.append({'params': proposed, 'combined_test_score': combined})
        else:
            log(f'[Wave3] combined_test_score=0 — excluded from GP (crash/timeout).')
        # 8. Summary
        print_summary(results, champion, trial)
        # 9. Periodic git push
        if push_every > 0 and trial % push_every == 0:
            git_push(trial)
        time.sleep(2)
    log(f'\n[Wave3] ===== All {max_trials} trials complete! =====')
    print_summary(results, champion, trial=max_trials)
    git_push(max_trials)
if __name__ == '__main__':
    import argparse
    # CLI entry point: thin argparse wrapper around run_wave3().
    cli = argparse.ArgumentParser(
        description='Wave 3: GP+UCB autoresearch for multi-track generalization.')
    cli.add_argument('--trials', type=int, default=25,
                     help='Number of trials (default: 25)')
    cli.add_argument('--explore', type=float, default=2.0,
                     help='UCB kappa — higher = more exploration (default: 2.0)')
    cli.add_argument('--push-every', type=int, default=5,
                     help='Git push every N trials (0=disabled)')
    opts = cli.parse_args()
    run_wave3(max_trials=opts.trials, kappa=opts.explore, push_every=opts.push_every)