""" ================================================================= Wave 3 Autoresearch Controller — Multi-Track Generalization ================================================================= GP+UCB Bayesian optimization over multi-track training hyperparameters. Goal: find hyperparameters that maximize ZERO-SHOT generalization — the model must drive mini_monaco and warren without ever having trained on them. Only the test score (combined_test_score) feeds the GP. Track split: Training : generated_road, generated_track, mountain_track Test (ZSL): mini_monaco, warren (never seen during training) Search space: learning_rate — PPO learning rate [5e-5, 1e-3] steps_per_switch — steps per track segment before switching [2000, 25000] total_timesteps — total training budget [80000, 400000] Each trial: 1. GP+UCB proposes hyperparameters 2. Launches multitrack_runner.py (real PPO training across 3 tracks) 3. Parses combined_test_score from stdout 4. Updates GP with (hyperparams → test_score) mapping 5. Updates champion if test_score improved Results: outerloop-results/autoresearch_results_phase3.jsonl Champion: models/wave3-champion/model.zip + manifest.json Usage: python3 wave3_controller.py --trials 25 --explore 2.0 --push-every 5 Stop with Ctrl+C at any time — resumes from existing results. 
=================================================================
"""

import os
import sys
import json
import time
import subprocess
import re
import shutil
import numpy as np
from datetime import datetime

# ---- Paths ----
# Everything is resolved relative to this file so the controller can be
# launched from any working directory.
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.dirname(PROJECT_DIR)
RUNNER = os.path.join(PROJECT_DIR, 'multitrack_runner.py')
RESULTS_DIR = os.path.join(PROJECT_DIR, 'outerloop-results')
MODELS_DIR = os.path.join(PROJECT_DIR, 'models')
CHAMPION_DIR = os.path.join(MODELS_DIR, 'wave3-champion')
RESULTS_FILE = os.path.join(RESULTS_DIR, 'autoresearch_results_phase3.jsonl')
LOG_FILE = os.path.join(RESULTS_DIR, 'autoresearch_phase3_log.txt')
WARM_START = os.path.join(MODELS_DIR, 'champion', 'model.zip')  # Phase 2 champion

os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(CHAMPION_DIR, exist_ok=True)

# ---- Hyperparameter search space ----
# Each entry defines the type and [min, max] box for one dimension of the
# GP's unit-cube search space (see encode_params / decode_params).
PARAM_SPACE = {
    'learning_rate': {'type': 'float', 'min': 5e-5, 'max': 1e-3},
    'steps_per_switch': {'type': 'int', 'min': 2000, 'max': 15000},
    'total_timesteps': {'type': 'int', 'min': 30000, 'max': 150000},
}
PARAM_KEYS = list(PARAM_SPACE.keys())

# Parameters passed to every trial but NOT searched over.
FIXED_PARAMS = {
    'eval_episodes': 3,
}

N_CANDIDATES = 500          # random candidates scored by the GP per proposal
UCB_KAPPA = 2.0             # exploration weight in UCB = mu + kappa * sigma
MIN_TRIALS_BEFORE_GP = 3    # pure random search until this many results exist
JOB_TIMEOUT = 7200  # 2h — 400k steps on CPU may need time

# ---- Seed trials near Phase 2 champion ----
# GP warm-up: first 2 trials use known-good parameters so GP has real prior data
SEED_PARAMS = [
    # Low LR (same as Phase 2 champion) — baseline, ~35 min per trial
    {'learning_rate': 0.000225, 'steps_per_switch': 5000, 'total_timesteps': 45000},
    # High LR — tests whether faster adaptation generalises better, ~35 min
    {'learning_rate': 0.001000, 'steps_per_switch': 5000, 'total_timesteps': 45000},
]


# ---- Logging ----
def log(msg):
    """Print *msg* with a timestamp and append the same line to LOG_FILE."""
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    # Re-opened per call: slower, but survives crashes without losing lines.
    with open(LOG_FILE, 'a') as f:
        f.write(line + '\n')


# ---- Parameter encoding ----
def encode_params(params): vec = [] for k in PARAM_KEYS: if k not in params: vec.append(0.5) continue spec = PARAM_SPACE[k] v = params[k] norm = (v - spec['min']) / (spec['max'] - spec['min']) vec.append(float(np.clip(norm, 0.0, 1.0))) return np.array(vec) def decode_params(vec): params = {} for i, k in enumerate(PARAM_KEYS): spec = PARAM_SPACE[k] v = float(vec[i]) * (spec['max'] - spec['min']) + spec['min'] if spec['type'] == 'int': v = int(round(v)) v = max(spec['min'], min(spec['max'], v)) else: v = float(np.clip(v, spec['min'], spec['max'])) params[k] = v return params def random_candidate(): return np.random.uniform(0, 1, len(PARAM_KEYS)) # ---- Gaussian Process ---- class TinyGP: """Minimal RBF-kernel GP for surrogate modelling (pure numpy, no sklearn).""" def __init__(self, length_scale=0.3, noise=1e-3): self.ls = length_scale self.noise = noise self.X = None self.alpha = None self.K_inv = None def _rbf(self, X1, X2): diff = X1[:, np.newaxis, :] - X2[np.newaxis, :, :] sq = np.sum(diff ** 2, axis=-1) return np.exp(-sq / (2 * self.ls ** 2)) def fit(self, X, y): self.X = np.array(X) n = len(y) K = self._rbf(self.X, self.X) + self.noise * np.eye(n) try: self.K_inv = np.linalg.inv(K) except np.linalg.LinAlgError: self.K_inv = np.linalg.pinv(K) self.alpha = self.K_inv @ np.array(y) def predict(self, X_new): X_new = np.atleast_2d(X_new) K_s = self._rbf(X_new, self.X) mu = K_s @ self.alpha var = np.maximum( 1.0 + self.noise - np.sum((K_s @ self.K_inv) * K_s, axis=1), 1e-9 ) return mu, np.sqrt(var) # ---- Champion tracker ---- class Wave3ChampionTracker: def __init__(self, champion_dir): self.champion_dir = champion_dir self.manifest_path = os.path.join(champion_dir, 'manifest.json') os.makedirs(champion_dir, exist_ok=True) self._best = self._load() def _load(self): if os.path.exists(self.manifest_path): try: with open(self.manifest_path) as f: return json.load(f) except Exception: pass return {'combined_test_score': float('-inf'), 'trial': None} @property def 
best_score(self): return self._best.get('combined_test_score', float('-inf')) def update_if_better(self, score, params, model_zip_path, trial, mini_monaco_reward=None): if score <= self.best_score: return False dest = os.path.join(self.champion_dir, 'model.zip') if model_zip_path and os.path.exists(model_zip_path): try: shutil.copy2(model_zip_path, dest) except Exception as e: log(f'[Champion] WARNING: copy failed: {e}') dest = model_zip_path manifest = { 'trial': trial, 'timestamp': datetime.now().isoformat(), 'params': params, 'combined_test_score': score, 'mini_monaco_reward': mini_monaco_reward, 'model_path': dest, } with open(self.manifest_path, 'w') as f: json.dump(manifest, f, indent=2) self._best = manifest log(f'[Champion] 🏆 NEW BEST! Trial {trial}: ' f'score={score:.2f} ' f'(mini_monaco={mini_monaco_reward:.1f}) ' f'params={params}') return True def summary(self): if self._best['trial'] is None: return 'No Wave 3 champion yet.' return (f"Wave3 Champion: trial={self._best['trial']} " f"score={self._best['combined_test_score']:.2f} " f"params={self._best['params']}") # ---- Load existing results ---- def load_results(): results = [] if not os.path.exists(RESULTS_FILE): return results with open(RESULTS_FILE) as f: for line in f: line = line.strip() if not line: continue try: rec = json.loads(line) score = rec.get('combined_test_score') if score is not None: results.append({ 'params': rec['params'], 'combined_test_score': float(score), }) except Exception: pass return results # ---- GP+UCB proposal ---- def propose_next_params(results, trial_num, kappa=UCB_KAPPA): """ For the first SEED_PARAMS trials: use the hardcoded seed list. Once GP has enough data: use GP+UCB to propose the next parameters. 
""" # Seed phase — use known-good starting points seed_idx = trial_num - 1 # trial_num is 1-indexed if seed_idx < len(SEED_PARAMS): log(f'[Wave3] Seed trial {trial_num}/{len(SEED_PARAMS)}: using hardcoded params.') return dict(SEED_PARAMS[seed_idx]) # Not enough data for GP yet — random exploration if len(results) < MIN_TRIALS_BEFORE_GP: log(f'[Wave3] Only {len(results)} results — using random proposal.') return decode_params(random_candidate()) # GP+UCB X = np.array([encode_params(r['params']) for r in results]) y = np.array([r['combined_test_score'] for r in results]) y_mean = y.mean() y_std = y.std() if y.std() > 0 else 1.0 y_norm = (y - y_mean) / y_std gp = TinyGP(length_scale=0.3, noise=1e-3) gp.fit(X, y_norm) candidates = np.random.uniform(0, 1, (N_CANDIDATES, len(PARAM_KEYS))) mu, sigma = gp.predict(candidates) ucb = mu + kappa * sigma top5_idx = np.argsort(ucb)[-5:][::-1] log(f'[Wave3] GP UCB top-5 proposals:') for idx in top5_idx: p = decode_params(candidates[idx]) log(f' UCB={ucb[idx]:.4f} mu={mu[idx]:.4f} σ={sigma[idx]:.4f} params={p}') return decode_params(candidates[np.argmax(ucb)]) # ---- Utility: parse multitrack_runner output ---- def parse_runner_output(output): """ Extract test metrics from multitrack_runner.py stdout. 
Looks for: [W3 Runner][TEST] combined_test_score= [W3 Runner][TEST] mini_monaco_reward= """ combined = None mini_monaco = None m = re.search(r'\[W3 Runner\]\[TEST\]\s+combined_test_score=([+-]?[\d.]+)', output) if m: combined = float(m.group(1)) m = re.search(r'\[W3 Runner\]\[TEST\]\s+mini_monaco_reward=([+-]?[\d.]+)', output) if m: mini_monaco = float(m.group(1)) return combined, mini_monaco # ---- Job launcher ---- def kill_stale(): """Kill any zombie multitrack_runner or donkeycar_sb3_runner processes.""" subprocess.run(['pkill', '-9', '-f', 'multitrack_runner.py'], check=False) subprocess.run(['pkill', '-9', '-f', 'donkeycar_sb3_runner.py'], check=False) time.sleep(2) def launch_trial(params, trial_num): """ Launch multitrack_runner.py as a subprocess with the given hyperparameters. Returns: (combined_test_score, mini_monaco_reward, model_zip_path, output, status, elapsed_sec, save_dir) """ save_dir = os.path.join(MODELS_DIR, f'wave3-trial-{trial_num:04d}') os.makedirs(save_dir, exist_ok=True) cmd = [ 'python3', RUNNER, '--total-timesteps', str(int(params['total_timesteps'])), '--steps-per-switch', str(int(params['steps_per_switch'])), '--learning-rate', str(float(params['learning_rate'])), '--eval-episodes', str(FIXED_PARAMS['eval_episodes']), '--save-dir', save_dir, ] # Always warm-start from Phase 2 champion if available if os.path.exists(WARM_START): cmd += ['--warm-start', WARM_START] log(f'[Wave3] Launching trial {trial_num}: {params}') log(f'[Wave3] Command: {" ".join(cmd)}') start = time.time() output_lines = [] try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # merge stderr into stdout text=True, bufsize=1, # line-buffered ) deadline = start + JOB_TIMEOUT for line in proc.stdout: line = line.rstrip('\n') output_lines.append(line) print(line, flush=True) # streams straight to nohup log if time.time() > deadline: proc.kill() log(f'[Wave3] Trial {trial_num} TIMED OUT — killing runner.') output_lines.append(f'[TIMEOUT 
after {JOB_TIMEOUT}s]') break proc.wait() elapsed = time.time() - start status = 'ok' if proc.returncode == 0 else f'error_rc{proc.returncode}' log(f'[Wave3] Trial {trial_num} finished in {elapsed:.1f}s, rc={proc.returncode}') except Exception as exc: elapsed = time.time() - start output_lines.append(f'[EXCEPTION: {exc}]') status = 'exception' log(f'[Wave3] Trial {trial_num} raised exception: {exc}') output = '\n'.join(output_lines) # Parse results combined, mini_monaco = parse_runner_output(output) log(f'[Wave3] Parsed: combined={combined} mini_monaco={mini_monaco}') model_zip = os.path.join(save_dir, 'model.zip') if not os.path.exists(model_zip): model_zip = None return combined, mini_monaco, model_zip, output, status, elapsed, save_dir # ---- Result saving ---- def save_result(trial, params, combined, mini_monaco, model_path, is_champion, status, elapsed): rec = { 'trial': trial, 'timestamp': datetime.now().isoformat(), 'params': params, 'combined_test_score': combined, 'mini_monaco_reward': mini_monaco, 'model_path': model_path, 'champion': is_champion, 'run_status': status, 'elapsed_sec': elapsed, } with open(RESULTS_FILE, 'a') as f: f.write(json.dumps(rec) + '\n') # ---- Git push ---- def git_push(trial_num): try: subprocess.run(['git', '-C', REPO_ROOT, 'add', '-A'], check=True, capture_output=True) subprocess.run([ 'git', '-C', REPO_ROOT, 'commit', '-m', f'wave3: autoresearch trial {trial_num} results\n\n' f'Agent: pi\nTests: N/A\nTests-Added: 0\nTypeScript: N/A' ], check=True, capture_output=True) subprocess.run(['git', '-C', REPO_ROOT, 'push'], check=True, capture_output=True) log(f'[Wave3] ✅ Git push complete after trial {trial_num}') except subprocess.CalledProcessError as e: log(f'[Wave3] ⚠️ Git push failed: {e}') # ---- Summary ---- def print_summary(results, champion, trial): if not results: return log(f'[Wave3] ===== Trial {trial} Summary =====') log(f' GP data points : {len(results)}') log(f' {champion.summary()}') sorted_r = sorted(results, 
key=lambda r: r['combined_test_score'], reverse=True) log(f' Top 5:') for r in sorted_r[:5]: log(f' score={r["combined_test_score"]:.2f} params={r["params"]}') # ---- Main loop ---- def run_wave3(max_trials=25, kappa=UCB_KAPPA, push_every=5): log('=' * 65) log('[Wave3] Multi-Track Autoresearch — GP+UCB Generalization Search') log(f'[Wave3] Training tracks : generated_road, generated_track, mountain_track') log(f'[Wave3] Test tracks : mini_monaco only (zero-shot; warren removed — broken done condition)') log(f'[Wave3] Max trials : {max_trials} | kappa={kappa} | push every {push_every}') log(f'[Wave3] Results file : {RESULTS_FILE}') log(f'[Wave3] Champion dir : {CHAMPION_DIR}') log(f'[Wave3] Warm start : {WARM_START}') log('=' * 65) results = load_results() champion = Wave3ChampionTracker(CHAMPION_DIR) log(f'[Wave3] Loaded {len(results)} existing Phase 3 results.') log(f'[Wave3] {champion.summary()}') # Determine starting trial number (resume from existing results) start_trial = len(results) + 1 log(f'[Wave3] Starting from trial {start_trial}.') for trial in range(start_trial, max_trials + 1): log(f'\n[Wave3] ========== Trial {trial}/{max_trials} ==========') # 1. Propose parameters proposed = propose_next_params(results, trial, kappa=kappa) full_params = {**proposed, **FIXED_PARAMS} log(f'[Wave3] Proposed params: {proposed}') # 2. Kill stale processes kill_stale() # 3. Launch training + eval combined, mini_monaco, model_zip, output, status, elapsed, save_dir = \ launch_trial(proposed, trial) # 4. Guard against None results (timeout / crash) if combined is None: log(f'[Wave3] ⚠️ No test score parsed — defaulting to 0.0') combined = 0.0 mini_monaco = mini_monaco or 0.0 # 5. Update champion is_champion = champion.update_if_better( combined, proposed, model_zip, trial, mini_monaco_reward=mini_monaco or 0.0, ) # 6. Save result save_result(trial, proposed, combined, mini_monaco, model_zip, is_champion, status, elapsed) # 7. 
Update GP data if combined > 0: # Only add valid runs to GP (zero means crash/timeout — not useful) results.append({'params': proposed, 'combined_test_score': combined}) else: log(f'[Wave3] combined_test_score=0 — excluded from GP (crash/timeout).') # 8. Summary print_summary(results, champion, trial) # 9. Periodic git push if push_every > 0 and trial % push_every == 0: git_push(trial) time.sleep(2) log(f'\n[Wave3] ===== All {max_trials} trials complete! =====') print_summary(results, champion, trial=max_trials) git_push(max_trials) if __name__ == '__main__': import argparse parser = argparse.ArgumentParser( description='Wave 3: GP+UCB autoresearch for multi-track generalization.') parser.add_argument('--trials', type=int, default=25, help='Number of trials (default: 25)') parser.add_argument('--explore', type=float, default=2.0, help='UCB kappa — higher = more exploration (default: 2.0)') parser.add_argument('--push-every', type=int, default=5, help='Git push every N trials (0=disabled)') args = parser.parse_args() run_wave3(max_trials=args.trials, kappa=args.explore, push_every=args.push_every)