"""
=============================================================
 DonkeyCar RL Autoresearch Controller

 Karpathy-style meta-agent that:
   1. Loads base sweep data
   2. Builds a surrogate model (Gaussian Process) of reward landscape
   3. Uses Upper Confidence Bound (UCB) acquisition to propose next params
   4. Launches RL jobs via robust runner
   5. Records results and iterates autonomously
=============================================================
 Usage:
   python3 autoresearch_controller.py [--trials N] [--explore K]

 All results are appended to:
   outerloop-results/autoresearch_results.jsonl
   outerloop-results/autoresearch_log.txt

 Stop at any time with Ctrl+C. Restart and it picks up from existing data.
=============================================================
"""

import itertools
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime

import numpy as np

# ---- Paths ----
# Everything is resolved relative to the directory containing this script.
PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
RUNNER_SCRIPT = os.path.join(PROJECT_DIR, 'donkeycar_sb3_runner.py')
RESULTS_DIR = os.path.join(PROJECT_DIR, 'outerloop-results')

# Input: results of the base sweep.  Outputs: this controller's results + log.
BASE_DATA_FILE = os.path.join(RESULTS_DIR, 'clean_sweep_results.jsonl')
AUTORESEARCH_RESULTS = os.path.join(RESULTS_DIR, 'autoresearch_results.jsonl')
AUTORESEARCH_LOG = os.path.join(RESULTS_DIR, 'autoresearch_log.txt')

# The results directory must exist before the first log line is written.
os.makedirs(RESULTS_DIR, exist_ok=True)

# ---- Parameter Space Definition ----
# These define the bounds for the autoresearch to explore.
# Autoresearch can propose any value within these continuous ranges.
PARAM_SPACE = {
    'n_steer':       {'type': 'int',   'min': 3,       'max': 9},
    'n_throttle':    {'type': 'int',   'min': 2,       'max': 5},
    'learning_rate': {'type': 'float', 'min': 0.00005, 'max': 0.005},
}

# Fixed params for all runs
FIXED_PARAMS = {
    'timesteps': 2000,
    'eval_episodes': 3,
}

# How many candidate proposals to sample when searching for next best
N_CANDIDATES = 500

# UCB exploration constant (higher = more exploration)
UCB_KAPPA = 2.0

# Job timeout seconds
JOB_TIMEOUT = 360

# Reward line printed by the runner.  The number may be signed and may use
# exponent notation; a bare `[\d.]+` would drop negative rewards entirely.
MEAN_REWARD_RE = re.compile(
    r'\[SB3 Runner\]\[TEST\] mean_reward='
    r'([-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)'
)


# ---- Logging ----
def log(msg):
    """Print a timestamped line and append the same line to AUTORESEARCH_LOG."""
    ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{ts}] {msg}'
    print(line, flush=True)
    with open(AUTORESEARCH_LOG, 'a') as f:
        f.write(line + '\n')


# ---- Parameter Encoding (for surrogate model) ----
PARAM_KEYS = list(PARAM_SPACE.keys())


def encode_params(params):
    """Encode a params dict into a normalized numpy vector [0,1] for the GP.

    Only the keys listed in PARAM_KEYS are read; each value is min-max scaled
    using the bounds in PARAM_SPACE.  Raises KeyError if a key is missing.
    """
    vec = []
    for k in PARAM_KEYS:
        spec = PARAM_SPACE[k]
        vec.append((params[k] - spec['min']) / (spec['max'] - spec['min']))
    return np.array(vec)


def decode_params(vec):
    """Decode a normalized numpy vector back to a params dict.

    Integer parameters are rounded to the nearest int; every value is clamped
    to the [min, max] bounds declared in PARAM_SPACE.
    """
    params = {}
    for i, k in enumerate(PARAM_KEYS):
        spec = PARAM_SPACE[k]
        v = vec[i] * (spec['max'] - spec['min']) + spec['min']
        v = int(round(v)) if spec['type'] == 'int' else float(v)
        params[k] = max(spec['min'], min(spec['max'], v))
    return params


def random_candidate():
    """Sample a random candidate (normalized vector) in the parameter space."""
    return np.random.uniform(0, 1, len(PARAM_KEYS))


# ---- Gaussian Process Surrogate Model (pure numpy, no sklearn needed) ----
class TinyGP:
    """
    Minimal Gaussian Process regressor (RBF kernel) for surrogate modelling.
    Predicts mean and std of reward for any parameter vector.
    """

    def __init__(self, length_scale=0.3, noise=1e-3):
        self.ls = length_scale
        self.noise = noise
        self.X = None
        self.y = None
        self.K_inv = None
        # Set by fit(); declared here so the attribute always exists.
        self.alpha = None

    def _rbf(self, X1, X2):
        """RBF kernel matrix between the rows of X1 and the rows of X2."""
        diff = X1[:, np.newaxis, :] - X2[np.newaxis, :, :]
        sq = np.sum(diff**2, axis=-1)
        return np.exp(-sq / (2 * self.ls**2))

    def fit(self, X, y):
        """Store the training data and precompute K^-1 and K^-1 @ y."""
        self.X = np.array(X)
        self.y = np.array(y)
        n = len(y)
        K = self._rbf(self.X, self.X) + self.noise * np.eye(n)
        try:
            self.K_inv = np.linalg.inv(K)
        except np.linalg.LinAlgError:
            # Singular kernel matrix: fall back to the pseudo-inverse.
            self.K_inv = np.linalg.pinv(K)
        self.alpha = self.K_inv @ self.y

    def predict(self, X_new):
        """Returns (mean, std) arrays for each row in X_new."""
        X_new = np.atleast_2d(X_new)
        K_s = self._rbf(X_new, self.X)
        mean = K_s @ self.alpha
        # Prior variance of the RBF kernel at any point is 1 (plus noise).
        K_ss = np.ones(len(X_new)) + self.noise
        var = K_ss - np.sum((K_s @ self.K_inv) * K_s, axis=1)
        # Guard against tiny negative values from round-off before the sqrt.
        var = np.maximum(var, 1e-9)
        return mean, np.sqrt(var)


# ---- Load All Available Data (base sweep + autoresearch results) ----
def load_all_results():
    """Load all param-reward pairs from base sweep and any autoresearch runs.

    Records whose 'mean_reward' is null/absent are skipped.  Lines that cannot
    be parsed into a usable record are skipped too, and the number skipped per
    file is logged instead of being silently discarded.
    """
    results = []
    for fpath in [BASE_DATA_FILE, AUTORESEARCH_RESULTS]:
        if not os.path.exists(fpath):
            continue
        bad_lines = 0
        with open(fpath) as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                    mr = rec.get('mean_reward')
                    if mr is not None:
                        results.append({'params': rec['params'],
                                        'mean_reward': float(mr)})
                except (ValueError, KeyError, TypeError, AttributeError):
                    # Bad JSON, non-dict record, missing 'params', or a
                    # non-numeric reward: best-effort skip.
                    bad_lines += 1
        if bad_lines:
            log(f'[AutoResearch] WARNING: skipped {bad_lines} malformed line(s) in {fpath}')
    return results


# ---- UCB Acquisition: Propose Next Best Parameters ----
def propose_next_params(results, n_candidates=N_CANDIDATES, kappa=UCB_KAPPA):
    """
    Fit GP on existing results, then maximize UCB acquisition over
    random candidate samples to propose the next params to try.

    Results that do not contain every key in PARAM_KEYS cannot be encoded and
    are left out of the fit.  With fewer than two usable results a uniformly
    random proposal is returned instead.

    Returns: proposed params dict
    """
    usable = [r for r in results if all(k in r['params'] for k in PARAM_KEYS)]
    if len(usable) < len(results):
        log(f'[AutoResearch] Ignoring {len(results) - len(usable)} result(s) '
            f'missing tuned params for the GP fit.')

    if len(usable) < 2:
        log('[AutoResearch] Not enough data for GP yet, using random proposal.')
        return decode_params(random_candidate())

    X = np.array([encode_params(r['params']) for r in usable])
    y = np.array([r['mean_reward'] for r in usable])

    # Normalize y for numerical stability
    y_mean = y.mean()
    y_std = y.std()
    if not y_std > 0:
        y_std = 1.0
    y_norm = (y - y_mean) / y_std

    gp = TinyGP(length_scale=0.3, noise=1e-3)
    gp.fit(X, y_norm)

    # Sample candidates
    candidates = np.random.uniform(0, 1, (n_candidates, len(PARAM_KEYS)))

    # Compute UCB acquisition
    mu, sigma = gp.predict(candidates)
    ucb = mu + kappa * sigma

    best_idx = np.argmax(ucb)
    proposed = decode_params(candidates[best_idx])

    # Log the GP's top predictions
    top5_idx = np.argsort(ucb)[-5:][::-1]
    log(f'[AutoResearch] GP UCB top-5 candidates:')
    for idx in top5_idx:
        p = decode_params(candidates[idx])
        log(f'  UCB={ucb[idx]:.4f} mu={mu[idx]:.4f} sigma={sigma[idx]:.4f} params={p}')

    return proposed


# ---- Kill Stale Jobs ----
def kill_stale():
    """Force-kill any process whose command line matches the runner script."""
    subprocess.run(['pkill', '-9', '-f', 'donkeycar_sb3_runner.py'], check=False)
    time.sleep(2)


# ---- Parse Runner Output ----
def parse_mean_reward(output):
    """Return the mean reward reported in runner output, or None if absent.

    Accepts signed values and exponent notation (e.g. '-12.5', '1.5e2').
    """
    m = MEAN_REWARD_RE.search(output)
    return float(m.group(1)) if m else None


# ---- Launch RL Job with Proposed Params ----
def launch_job(params):
    """Launch a single RL runner job.

    Returns a 4-tuple (mean_reward, output, status, elapsed):
      mean_reward -- float parsed from the runner output, or None if not found
      output      -- combined stdout/stderr text (or a timeout marker)
      status      -- 'ok', 'error' (non-zero return code) or 'timeout'
      elapsed     -- wall-clock seconds spent in the subprocess call
    """
    # NOTE(review): `learning_rate` is proposed by the GP and logged below,
    # but it is not forwarded on the command line here -- confirm whether the
    # runner accepts a learning-rate flag and add it if so.
    cmd = [
        'python3', RUNNER_SCRIPT,
        '--agent', 'dqn',
        '--env', 'donkey-generated-roads-v0',
        '--timesteps', str(params.get('timesteps', FIXED_PARAMS['timesteps'])),
        '--eval-episodes', str(params.get('eval_episodes', FIXED_PARAMS['eval_episodes'])),
        '--n-steer', str(params['n_steer']),
        '--n-throttle', str(params['n_throttle']),
    ]

    log(f'[AutoResearch] Launching job: n_steer={params["n_steer"]} n_throttle={params["n_throttle"]} lr={params["learning_rate"]:.6f}')

    start = time.time()
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=JOB_TIMEOUT)
        elapsed = time.time() - start
        output = proc.stdout + '\n' + proc.stderr
        status = 'ok' if proc.returncode == 0 else 'error'
        log(f'[AutoResearch] Job finished in {elapsed:.1f}s, returncode={proc.returncode}')
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start
        output = f'[TIMEOUT after {elapsed:.1f}s]'
        status = 'timeout'
        log(f'[AutoResearch] Job TIMED OUT after {elapsed:.1f}s')

    # Parse mean_reward from output
    mean_reward = parse_mean_reward(output)
    if mean_reward is not None:
        log(f'[AutoResearch] mean_reward={mean_reward}')

    # Print full runner output for transparency
    print('--- Runner Output ---')
    print(output[-3000:])  # last 3000 chars
    print('--- End Runner Output ---')

    return mean_reward, output, status, elapsed


# ---- Save Result ----
def save_result(trial, params, mean_reward, status, elapsed):
    """Append one trial record as a JSON line to AUTORESEARCH_RESULTS."""
    rec = {
        'trial': trial,
        'timestamp': datetime.now().isoformat(),
        'params': params,
        'mean_reward': mean_reward,
        'run_status': status,
        'elapsed_sec': elapsed,
    }
    with open(AUTORESEARCH_RESULTS, 'a') as f:
        f.write(json.dumps(rec) + '\n')


# ---- Print Current Best ----
def print_summary(results, trial):
    """Log the best result and the top five results so far (no-op if empty)."""
    if not results:
        return
    best = max(results, key=lambda r: r['mean_reward'])
    log(f'[AutoResearch] === Trial {trial} Summary ===')
    log(f'  Total runs in history: {len(results)}')
    log(f'  Best so far: mean_reward={best["mean_reward"]:.4f} params={best["params"]}')

    # Top 5
    sorted_r = sorted(results, key=lambda r: r['mean_reward'], reverse=True)
    log(f'  Top 5 results:')
    for r in sorted_r[:5]:
        log(f'    mean_reward={r["mean_reward"]:.4f} params={r["params"]}')


# ---- Main Autoresearch Loop ----
def run_autoresearch(max_trials=100, kappa=None):
    """Run the propose -> launch -> record loop for `max_trials` trials.

    kappa is the UCB exploration constant handed to propose_next_params.
    When None, the module-level UCB_KAPPA is read at call time, so a value
    assigned after import (e.g. from the command line) is honoured.
    """
    if kappa is None:
        kappa = UCB_KAPPA

    log('=' * 60)
    log('[AutoResearch] Starting Karpathy-style autoresearch controller')
    log(f'[AutoResearch] Max trials: {max_trials}')
    log(f'[AutoResearch] Runner: {RUNNER_SCRIPT}')
    log(f'[AutoResearch] Results: {AUTORESEARCH_RESULTS}')
    log('=' * 60)

    # Load all existing data (base sweep + prior autoresearch runs)
    results = load_all_results()
    log(f'[AutoResearch] Loaded {len(results)} existing result(s) from base sweep + history.')
    print_summary(results, trial=0)

    for trial in range(1, max_trials + 1):
        log(f'\n[AutoResearch] ========== Trial {trial}/{max_trials} ==========')

        # 1. Propose next params using GP+UCB
        proposed = propose_next_params(results, kappa=kappa)
        full_params = {**proposed, **FIXED_PARAMS}
        log(f'[AutoResearch] Proposed params: {full_params}')

        # 2. Kill any stale jobs
        kill_stale()

        # 3. Launch job
        mean_reward, output, status, elapsed = launch_job(full_params)

        # 4. Save result
        save_result(trial, full_params, mean_reward, status, elapsed)

        # 5. If we got a valid reward, add to results for next GP fit
        if mean_reward is not None:
            results.append({'params': full_params, 'mean_reward': mean_reward})
        else:
            log(f'[AutoResearch] WARNING: No valid mean_reward from this trial.')

        # 6. Print running summary
        print_summary(results, trial)

        # 7. Brief pause between trials
        time.sleep(2)

    log('[AutoResearch] All trials complete!')
    print_summary(results, trial=max_trials)


# ---- Entry Point ----
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Karpathy-style autoresearch controller for DonkeyCar RL.')
    parser.add_argument('--trials', type=int, default=100,
                        help='Number of autoresearch trials to run (default: 100)')
    parser.add_argument('--explore', type=float, default=2.0,
                        help='UCB exploration constant kappa (default: 2.0, higher=more explore)')
    args = parser.parse_args()

    UCB_KAPPA = args.explore
    # Pass kappa explicitly: propose_next_params bound its default when it was
    # defined, so rebinding UCB_KAPPA alone would not change its behaviour.
    run_autoresearch(max_trials=args.trials, kappa=args.explore)