"""
|
||
=================================================================
|
||
Wave 4 Autoresearch Controller — Multi-Track Generalization (Scratch)
|
||
=================================================================
|
||
GP+UCB Bayesian optimization over multi-track training hyperparameters.
|
||
|
||
Key changes from Wave 3:
|
||
- NO warm-start: each trial trains from random weights.
|
||
The Phase-2 champion CNN was biased toward generated_road and caused
|
||
catastrophic forgetting. Starting fresh forces genuine generalisation.
|
||
- Training tracks: generated_track + mountain_track only.
|
||
Visually distinct (trees vs mountain/barriers) — model must learn
|
||
features that ignore background and follow road markings.
|
||
- Test track (zero-shot): mini_monaco (never seen during training).
|
||
|
||
Search space:
|
||
learning_rate — PPO learning rate [1e-4, 2e-3] (wider for scratch)
|
||
steps_per_switch — steps per track segment [3000, 20000]
|
||
total_timesteps — total training budget [60000, 250000]
|
||
|
||
Each trial:
|
||
1. GP+UCB proposes hyperparameters
|
||
2. Launches multitrack_runner.py (fresh PPO, 2 training tracks)
|
||
3. Parses combined_test_score from stdout
|
||
4. Updates GP with (hyperparams → test_score) mapping
|
||
5. Updates Wave 4 champion if test_score improved
|
||
|
||
Results: outerloop-results/autoresearch_results_phase4.jsonl
|
||
Champion: models/wave4-champion/model.zip + manifest.json
|
||
|
||
Usage:
|
||
python3 wave4_controller.py --trials 25 --explore 2.0 --push-every 5
|
||
|
||
Stop with Ctrl+C at any time — resumes from existing results.
|
||
=================================================================
|
||
"""
|
||
|
||
import os
import sys
import json
import time
import subprocess
import re
import shutil
import numpy as np
from datetime import datetime

# ---- Paths ----
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.dirname(PROJECT_DIR)
RUNNER = os.path.join(PROJECT_DIR, 'multitrack_runner.py')
RESULTS_DIR = os.path.join(PROJECT_DIR, 'outerloop-results')
MODELS_DIR = os.path.join(PROJECT_DIR, 'models')
CHAMPION_DIR = os.path.join(MODELS_DIR, 'wave4-champion')

RESULTS_FILE = os.path.join(RESULTS_DIR, 'autoresearch_results_phase4.jsonl')
LOG_FILE = os.path.join(RESULTS_DIR, 'autoresearch_phase4_log.txt')

WARM_START = None  # Wave 4: always train from scratch (no Phase 2 champion warm-start)

os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(CHAMPION_DIR, exist_ok=True)

# ---- Hyperparameter search space ----
# Wider LR range for scratch training (no warm-start prior to anchor it).
# More total_timesteps needed — fresh model requires more steps to converge.
PARAM_SPACE = {
    'learning_rate': {'type': 'float', 'min': 1e-4, 'max': 2e-3},
    'steps_per_switch': {'type': 'int', 'min': 3000, 'max': 20000},
    'total_timesteps': {'type': 'int', 'min': 60000, 'max': 250000},
}
PARAM_KEYS = list(PARAM_SPACE.keys())

FIXED_PARAMS = {
    'eval_episodes': 3,
}

N_CANDIDATES = 500
UCB_KAPPA = 2.0
MIN_TRIALS_BEFORE_GP = 3
JOB_TIMEOUT = 7200  # 2h — 250k steps (the search-space max) on CPU may need time

# ---- Seed trials (hyperparameters near the Phase 2 champion) ----
# GP warm-up: first 2 trials use known-good parameters so the GP has real prior data.
SEED_PARAMS = [
    # Low-mid LR — stable convergence from scratch (~67 min)
    {'learning_rate': 0.0003, 'steps_per_switch': 6000, 'total_timesteps': 80000},
    # High LR — faster adaptation, tests if the scratch model benefits from an aggressive LR (~67 min)
    {'learning_rate': 0.001, 'steps_per_switch': 6000, 'total_timesteps': 80000},
]


# ---- Logging ----
def log(msg):
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    with open(LOG_FILE, 'a') as f:
        f.write(line + '\n')


# ---- Parameter encoding ----
def encode_params(params):
    vec = []
    for k in PARAM_KEYS:
        if k not in params:
            vec.append(0.5)
            continue
        spec = PARAM_SPACE[k]
        v = params[k]
        norm = (v - spec['min']) / (spec['max'] - spec['min'])
        vec.append(float(np.clip(norm, 0.0, 1.0)))
    return np.array(vec)


def decode_params(vec):
    params = {}
    for i, k in enumerate(PARAM_KEYS):
        spec = PARAM_SPACE[k]
        v = float(vec[i]) * (spec['max'] - spec['min']) + spec['min']
        if spec['type'] == 'int':
            v = int(round(v))
            v = max(spec['min'], min(spec['max'], v))
        else:
            v = float(np.clip(v, spec['min'], spec['max']))
        params[k] = v
    return params
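

# Round-trip example: learning_rate=1e-3 encodes to
# (1e-3 - 1e-4) / (2e-3 - 1e-4) ≈ 0.474 in the unit cube; decode_params maps
# unit-cube samples back, rounding int-typed params and clipping to [min, max].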
def random_candidate():
    return np.random.uniform(0, 1, len(PARAM_KEYS))


# ---- Gaussian Process ----
class TinyGP:
    """Minimal RBF-kernel GP for surrogate modelling (pure numpy, no sklearn)."""

    def __init__(self, length_scale=0.3, noise=1e-3):
        self.ls = length_scale
        self.noise = noise
        self.X = None
        self.alpha = None
        self.K_inv = None

    def _rbf(self, X1, X2):
        diff = X1[:, np.newaxis, :] - X2[np.newaxis, :, :]
        sq = np.sum(diff ** 2, axis=-1)
        return np.exp(-sq / (2 * self.ls ** 2))
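
    # GP-regression posterior with unit prior variance (the RBF kernel gives k(x, x) = 1):
    #   mu(x*)  = K(x*, X) @ K_inv @ y   (alpha caches K_inv @ y)
    #   var(x*) = 1 + noise - K(x*, X) @ K_inv @ K(X, x*)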
    def fit(self, X, y):
        self.X = np.array(X)
        n = len(y)
        K = self._rbf(self.X, self.X) + self.noise * np.eye(n)
        try:
            self.K_inv = np.linalg.inv(K)
        except np.linalg.LinAlgError:
            self.K_inv = np.linalg.pinv(K)
        self.alpha = self.K_inv @ np.array(y)

    def predict(self, X_new):
        X_new = np.atleast_2d(X_new)
        K_s = self._rbf(X_new, self.X)
        mu = K_s @ self.alpha
        var = np.maximum(
            1.0 + self.noise - np.sum((K_s @ self.K_inv) * K_s, axis=1),
            1e-9
        )
        return mu, np.sqrt(var)


# ---- Champion tracker ----
class Wave4ChampionTracker:
    def __init__(self, champion_dir):
        self.champion_dir = champion_dir
        self.manifest_path = os.path.join(champion_dir, 'manifest.json')
        os.makedirs(champion_dir, exist_ok=True)
        self._best = self._load()

    def _load(self):
        if os.path.exists(self.manifest_path):
            try:
                with open(self.manifest_path) as f:
                    return json.load(f)
            except Exception:
                pass
        return {'combined_test_score': float('-inf'), 'trial': None}

    @property
    def best_score(self):
        return self._best.get('combined_test_score', float('-inf'))

    def update_if_better(self, score, params, model_zip_path, trial,
                         mini_monaco_reward=None):
        if score <= self.best_score:
            return False

        dest = os.path.join(self.champion_dir, 'model.zip')
        if model_zip_path and os.path.exists(model_zip_path):
            try:
                shutil.copy2(model_zip_path, dest)
            except Exception as e:
                log(f'[Champion] WARNING: copy failed: {e}')
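                # Fall back to pointing the manifest at the trial's own model copy.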
                dest = model_zip_path

        manifest = {
            'trial': trial,
            'timestamp': datetime.now().isoformat(),
            'params': params,
            'combined_test_score': score,
            'mini_monaco_reward': mini_monaco_reward,
            'model_path': dest,
        }
        with open(self.manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        self._best = manifest
        log(f'[Champion] 🏆 NEW BEST! Trial {trial}: '
            f'score={score:.2f} '
            f'(mini_monaco={mini_monaco_reward:.1f}) '
            f'params={params}')
        return True

    def summary(self):
        if self._best['trial'] is None:
            return 'No Wave 4 champion yet.'
        return (f"Wave4 Champion: trial={self._best['trial']} "
                f"score={self._best['combined_test_score']:.2f} "
                f"params={self._best['params']}")


# ---- Load existing results ----
def load_results():
    results = []
    if not os.path.exists(RESULTS_FILE):
        return results
    with open(RESULTS_FILE) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
                score = rec.get('combined_test_score')
                if score is not None:
                    results.append({
                        'params': rec['params'],
                        'combined_test_score': float(score),
                    })
            except Exception:
                pass
    return results


# ---- GP+UCB proposal ----
def propose_next_params(results, trial_num, kappa=UCB_KAPPA):
    """
    For the first len(SEED_PARAMS) trials: use the hardcoded seed list.
    Once the GP has enough data: use GP+UCB to propose the next parameters.
    """
    # Seed phase — use known-good starting points
    seed_idx = trial_num - 1  # trial_num is 1-indexed
    if seed_idx < len(SEED_PARAMS):
        log(f'[Wave4] Seed trial {trial_num}/{len(SEED_PARAMS)}: using hardcoded params.')
        return dict(SEED_PARAMS[seed_idx])

    # Not enough data for GP yet — random exploration
    if len(results) < MIN_TRIALS_BEFORE_GP:
        log(f'[Wave4] Only {len(results)} results — using random proposal.')
        return decode_params(random_candidate())

    # GP+UCB
    X = np.array([encode_params(r['params']) for r in results])
    y = np.array([r['combined_test_score'] for r in results])
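    # Standardize scores to match the GP's zero-mean, unit-variance prior.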
    y_mean = y.mean()
    y_std = y.std() if y.std() > 0 else 1.0
    y_norm = (y - y_mean) / y_std

    gp = TinyGP(length_scale=0.3, noise=1e-3)
    gp.fit(X, y_norm)
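
    # UCB acquisition: mu + kappa * sigma. kappa trades exploitation (high
    # predicted mean) against exploration (high posterior uncertainty).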
    candidates = np.random.uniform(0, 1, (N_CANDIDATES, len(PARAM_KEYS)))
    mu, sigma = gp.predict(candidates)
    ucb = mu + kappa * sigma

    top5_idx = np.argsort(ucb)[-5:][::-1]
    log('[Wave4] GP UCB top-5 proposals:')
    for idx in top5_idx:
        p = decode_params(candidates[idx])
        log(f'    UCB={ucb[idx]:.4f}  mu={mu[idx]:.4f}  σ={sigma[idx]:.4f}  params={p}')

    return decode_params(candidates[np.argmax(ucb)])


# ---- Utility: parse multitrack_runner output ----
def parse_runner_output(output):
    """
    Extract test metrics from multitrack_runner.py stdout.

    Looks for:
        [W4 Runner][TEST] combined_test_score=<float>
        [W4 Runner][TEST] mini_monaco_reward=<float>
    """
    combined = None
    mini_monaco = None

    m = re.search(r'\[W4 Runner\]\[TEST\]\s+combined_test_score=([+-]?[\d.]+)', output)
    if m:
        combined = float(m.group(1))

    m = re.search(r'\[W4 Runner\]\[TEST\]\s+mini_monaco_reward=([+-]?[\d.]+)', output)
    if m:
        mini_monaco = float(m.group(1))

    return combined, mini_monaco


# ---- Job launcher ----
def kill_stale():
    """Kill any zombie multitrack_runner or donkeycar_sb3_runner processes."""
    subprocess.run(['pkill', '-9', '-f', 'multitrack_runner.py'], check=False)
    subprocess.run(['pkill', '-9', '-f', 'donkeycar_sb3_runner.py'], check=False)
    time.sleep(2)


def launch_trial(params, trial_num):
    """
    Launch multitrack_runner.py as a subprocess with the given hyperparameters.
    Returns: (combined_test_score, mini_monaco_reward,
              model_zip_path, output, status, elapsed_sec, save_dir)
    """
    save_dir = os.path.join(MODELS_DIR, f'wave4-trial-{trial_num:04d}')
    os.makedirs(save_dir, exist_ok=True)

    cmd = [
        'python3', RUNNER,
        '--total-timesteps', str(int(params['total_timesteps'])),
        '--steps-per-switch', str(int(params['steps_per_switch'])),
        '--learning-rate', str(float(params['learning_rate'])),
        '--eval-episodes', str(FIXED_PARAMS['eval_episodes']),
        '--save-dir', save_dir,
    ]
    # Wave 4: NO warm-start — train from random weights every trial.
    # (WARM_START is None; passing --warm-start is intentionally omitted.)

    log(f'[Wave4] Launching trial {trial_num}: {params}')
    log(f'[Wave4] Command: {" ".join(cmd)}')
    start = time.time()
    output_lines = []

    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into stdout
            text=True,
            bufsize=1,  # line-buffered
        )

        deadline = start + JOB_TIMEOUT
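        # NOTE: the deadline is only checked when the runner emits a line of
        # output, so a fully silent hang can outlive JOB_TIMEOUT.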
        for line in proc.stdout:
            line = line.rstrip('\n')
            output_lines.append(line)
            print(line, flush=True)  # streams straight to nohup log
            if time.time() > deadline:
                proc.kill()
                log(f'[Wave4] Trial {trial_num} TIMED OUT — killing runner.')
                output_lines.append(f'[TIMEOUT after {JOB_TIMEOUT}s]')
                break

        proc.wait()
        elapsed = time.time() - start
        status = 'ok' if proc.returncode == 0 else f'error_rc{proc.returncode}'
        log(f'[Wave4] Trial {trial_num} finished in {elapsed:.1f}s, rc={proc.returncode}')

    except Exception as exc:
        elapsed = time.time() - start
        output_lines.append(f'[EXCEPTION: {exc}]')
        status = 'exception'
        log(f'[Wave4] Trial {trial_num} raised exception: {exc}')

    output = '\n'.join(output_lines)

    # Parse results
    combined, mini_monaco = parse_runner_output(output)
    log(f'[Wave4] Parsed: combined={combined} mini_monaco={mini_monaco}')

    model_zip = os.path.join(save_dir, 'model.zip')
    if not os.path.exists(model_zip):
        model_zip = None

    return combined, mini_monaco, model_zip, output, status, elapsed, save_dir


# ---- Result saving ----
def save_result(trial, params, combined, mini_monaco,
                model_path, is_champion, status, elapsed):
    rec = {
        'trial': trial,
        'timestamp': datetime.now().isoformat(),
        'params': params,
        'combined_test_score': combined,
        'mini_monaco_reward': mini_monaco,
        'model_path': model_path,
        'champion': is_champion,
        'run_status': status,
        'elapsed_sec': elapsed,
    }
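    # One JSON object per line (JSONL); load_results() replays these on resume.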
    with open(RESULTS_FILE, 'a') as f:
        f.write(json.dumps(rec) + '\n')


# ---- Git push ----
def git_push(trial_num):
    try:
        subprocess.run(['git', '-C', REPO_ROOT, 'add', '-A'],
                       check=True, capture_output=True)
        subprocess.run([
            'git', '-C', REPO_ROOT, 'commit', '-m',
            f'wave4: autoresearch trial {trial_num} results\n\n'
            f'Agent: pi\nTests: N/A\nTests-Added: 0\nTypeScript: N/A'
        ], check=True, capture_output=True)
        subprocess.run(['git', '-C', REPO_ROOT, 'push'],
                       check=True, capture_output=True)
        log(f'[Wave4] ✅ Git push complete after trial {trial_num}')
    except subprocess.CalledProcessError as e:
        log(f'[Wave4] ⚠️ Git push failed: {e}')


# ---- Summary ----
def print_summary(results, champion, trial):
    if not results:
        return
    log(f'[Wave4] ===== Trial {trial} Summary =====')
    log(f'  GP data points : {len(results)}')
    log(f'  {champion.summary()}')
    sorted_r = sorted(results, key=lambda r: r['combined_test_score'], reverse=True)
    log('  Top 5:')
    for r in sorted_r[:5]:
        log(f'    score={r["combined_test_score"]:.2f} params={r["params"]}')


# ---- Main loop ----
def run_wave4(max_trials=25, kappa=UCB_KAPPA, push_every=5):
    log('=' * 65)
    log('[Wave4] Multi-Track Autoresearch — GP+UCB Generalization Search')
    log('[Wave4] Training tracks : generated_track, mountain_track (no generated_road, no warm-start)')
    log('[Wave4] Test tracks     : mini_monaco only (zero-shot; warren removed — broken done condition)')
    log(f'[Wave4] Max trials      : {max_trials} | kappa={kappa} | push every {push_every}')
    log(f'[Wave4] Results file    : {RESULTS_FILE}')
    log(f'[Wave4] Champion dir    : {CHAMPION_DIR}')
    log('[Wave4] Warm start      : NONE (training from scratch each trial)')
    log('=' * 65)

    results = load_results()
    champion = Wave4ChampionTracker(CHAMPION_DIR)

    log(f'[Wave4] Loaded {len(results)} existing Phase 4 results.')
    log(f'[Wave4] {champion.summary()}')

    # Determine starting trial number (resume from existing results)
    start_trial = len(results) + 1
    log(f'[Wave4] Starting from trial {start_trial}.')

    for trial in range(start_trial, max_trials + 1):
        log(f'\n[Wave4] ========== Trial {trial}/{max_trials} ==========')

        # 1. Propose parameters
        proposed = propose_next_params(results, trial, kappa=kappa)
        log(f'[Wave4] Proposed params: {proposed}')

        # 2. Kill stale processes
        kill_stale()

        # 3. Launch training + eval
        combined, mini_monaco, model_zip, output, status, elapsed, save_dir = \
            launch_trial(proposed, trial)

        # 4. Guard against None results (timeout / crash)
        if combined is None:
            log('[Wave4] ⚠️ No test score parsed — defaulting to 0.0')
            combined = 0.0
            mini_monaco = mini_monaco or 0.0

        # 5. Update champion
        is_champion = champion.update_if_better(
            combined, proposed, model_zip, trial,
            mini_monaco_reward=mini_monaco or 0.0,
        )

        # 6. Save result
        save_result(trial, proposed, combined, mini_monaco,
                    model_zip, is_champion, status, elapsed)

        # 7. Update GP data
        if combined > 0:
            # Only add valid runs to GP (zero means crash/timeout — not useful)
            results.append({'params': proposed, 'combined_test_score': combined})
        else:
            log('[Wave4] combined_test_score=0 — excluded from GP (crash/timeout).')
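            # (Failed trials are still written to the JSONL with score 0.0,
            # so start_trial stays consistent across restarts.)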

        # 8. Summary
        print_summary(results, champion, trial)

        # 9. Periodic git push
        if push_every > 0 and trial % push_every == 0:
            git_push(trial)

        time.sleep(2)

    log(f'\n[Wave4] ===== All {max_trials} trials complete! =====')
    print_summary(results, champion, trial=max_trials)
    git_push(max_trials)


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        description='Wave 4: GP+UCB autoresearch, scratch training, 2-track generalisation.')
    parser.add_argument('--trials', type=int, default=25,
                        help='Number of trials (default: 25)')
    parser.add_argument('--explore', type=float, default=2.0,
                        help='UCB kappa — higher = more exploration (default: 2.0)')
    parser.add_argument('--push-every', type=int, default=5,
                        help='Git push every N trials (0=disabled)')
    args = parser.parse_args()

    run_wave4(max_trials=args.trials, kappa=args.explore, push_every=args.push_every)