"""Outer-loop hyperparameter sweep driver for the DonkeyCar SB3 runner.

For every combination in ``grid`` (repeated ``REPEATS`` times) this script
kills stale runner processes, launches ``donkeycar_sb3_runner.py`` as a
subprocess, captures its output, extracts the reported mean reward and appends
one JSON record per run to ``sweep_results.jsonl``.
"""

import itertools
import json
import os
import signal
import subprocess
import time

# Location of the inner RL runner script and of all sweep artifacts.
RUNNER_SCRIPT = '/home/paulh/.pi/agent/donkeycar_sb3_runner.py'
DEFAULT_OUT_DIR = '/home/paulh/.pi/agent/outerloop-results'

# Wall-clock limit for a single inner run (6 minutes).
RUN_TIMEOUT_SEC = 360

# Number of leading characters of the captured output kept in each record.
RAW_OUTPUT_LIMIT = 1000


def kill_old_rl():
    """Best-effort SIGKILL of any process whose command line matches the runner.

    Failures to launch ``pkill`` are reported and otherwise ignored so that a
    missing/unusable ``pkill`` never aborts the sweep.
    """
    print('[Outer Loop] Killing any stalled RL jobs…', flush=True)
    try:
        subprocess.run(['pkill', '-9', '-f', 'donkeycar_sb3_runner.py'], check=False)
    except (OSError, subprocess.SubprocessError) as e:
        print(f'[Outer Loop] pkill failed: {e}', flush=True)


# Parameter grid for the sweep
grid = {
    'n_steer': [3, 5, 7],
    'n_throttle': [2, 3],
    'learning_rate': [0.001, 0.0005, 0.0001],
    'timesteps': [2000],
    'eval_episodes': [3],
}
REPEATS = 3  # robust trials per unique config


def build_param_combinations(grid):
    """Yield one ``{name: value}`` dict per point of the Cartesian product.

    Args:
        grid: Mapping of parameter name to an iterable of candidate values.

    Yields:
        Dicts whose keys follow the key order of ``grid``; combinations are
        produced in ``itertools.product`` order (last key varies fastest).
    """
    keys = list(grid.keys())
    vals = [grid[k] for k in keys]
    for v in itertools.product(*vals):
        yield dict(zip(keys, v))


def parse_mean_reward(output):
    """Extract the mean reward reported by the runner from captured output.

    Only lines containing ``'[SB3 Runner] Eval episodes='`` are considered.
    The value is the first whitespace-delimited token after ``mean_reward=``,
    with trailing ``,``/``;`` stripped. If several lines match, the last one
    decides the result (including resetting it to ``None`` when unparsable).

    Args:
        output: Combined stdout/stderr text of one run.

    Returns:
        The reward as ``float``, or ``None`` when no parsable value is found.
    """
    mean_reward = None
    for line in output.split('\n'):
        if '[SB3 Runner] Eval episodes=' not in line:
            continue
        pieces = line.split('mean_reward=')
        tokens = pieces[1].split() if len(pieces) > 1 else []
        if not tokens:
            mean_reward = None
            continue
        try:
            mean_reward = float(tokens[0].rstrip(',;'))
        except ValueError:
            mean_reward = None
    return mean_reward


def _decode_output(val):
    """Return ``val`` as text; ``None`` becomes ``''`` and bytes are decoded.

    ``TimeoutExpired`` may carry partial output as ``bytes`` or ``None``.
    """
    if val is None:
        return ''
    if isinstance(val, bytes):
        return val.decode('utf-8', errors='replace')
    return val


def _append_monitor_log(monitor_path, message, leading_blank=False):
    """Append one timestamped ``[MONITOR ...]`` line to the monitor log."""
    prefix = '\n' if leading_blank else ''
    with open(monitor_path, 'a') as mlog:
        mlog.write(f"{prefix}[MONITOR {time.ctime()}] {message}\n")
        mlog.flush()


def _build_runner_command(params, log_dir):
    """Build the argv list that launches the inner RL runner for ``params``."""
    return [
        'python3', RUNNER_SCRIPT,
        '--agent', 'dqn',
        '--env', 'donkey-generated-roads-v0',
        '--timesteps', str(params['timesteps']),
        '--eval-episodes', str(params['eval_episodes']),
        '--n-steer', str(params['n_steer']),
        '--n-throttle', str(params['n_throttle']),
        '--log-dir', log_dir,
    ]


def _execute_run(cmd, env, monitor_path, timeout=RUN_TIMEOUT_SEC):
    """Run one inner job and return ``(status, output, elapsed_seconds)``.

    ``status`` is ``'timeout'`` when the job exceeded ``timeout``, ``'error'``
    when it could not be launched or exited non-zero, and ``'ok'`` otherwise.
    """
    start = time.time()
    try:
        # errors='replace' keeps a stray non-UTF-8 byte in the child's output
        # from raising and discarding everything that was captured.
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors='replace',
            timeout=timeout,
            env=env,
        )
    except subprocess.TimeoutExpired as e:
        elapsed = time.time() - start
        output = (
            f"[TIMEOUT] Experiment timed out after {elapsed:.1f}s. Partial output below:\n"
            + _decode_output(e.stdout) + '\n' + _decode_output(e.stderr)
        )
        print('[OUTER MONITOR ALERT] RL runner timed out and was killed.', flush=True)
        _append_monitor_log(monitor_path, f"RL runner timed out after {elapsed:.1f}s")
        return 'timeout', output, elapsed
    except Exception as e:
        # Sweep boundary: a launch failure is recorded for this run and the
        # sweep continues with the next configuration.
        elapsed = time.time() - start
        output = f"[ERROR] Experiment errored: {str(e)}"
        print('[OUTER MONITOR ALERT] EXCEPTION LAUNCHING RL RUNNER:', str(e), flush=True)
        _append_monitor_log(monitor_path, f"Exception launching RL runner: {str(e)}")
        return 'error', output, elapsed

    elapsed = time.time() - start
    output = proc.stdout + '\n' + proc.stderr
    print('[Outer Loop MONITOR] RL job ran, returncode:', proc.returncode, flush=True)
    # -- Show FULL output to terminal for each run --
    print('--------- RL Runner Output (begin) ---------')
    print(output)
    print('--------- RL Runner Output (end) ---------')
    _append_monitor_log(
        monitor_path,
        f"RL job returncode={proc.returncode} after {elapsed:.1f}s",
    )
    if proc.returncode != 0:
        print('[OUTER MONITOR ALERT] RL runner exited with error, see results/log file.', flush=True)
        return 'error', output, elapsed
    return 'ok', output, elapsed


def run_sweep(param_grid=None, repeats=None, out_dir=None):
    """Run the full sweep and return the list of per-run result records.

    Args:
        param_grid: Parameter grid to sweep; defaults to the module-level
            ``grid``. Must provide the keys ``n_steer``, ``n_throttle``,
            ``learning_rate``, ``timesteps`` and ``eval_episodes``.
        repeats: Trials per configuration; defaults to ``REPEATS``.
        out_dir: Directory for ``sweep_results.jsonl``, ``outer_monitor.log``
            and the per-config model directories; defaults to
            ``DEFAULT_OUT_DIR``. Created if missing.

    Returns:
        A list of dicts with keys ``run_id``, ``config_id``, ``repeat``,
        ``params``, ``mean_reward``, ``elapsed_sec``, ``run_status`` and
        ``raw_output``. Each record is also appended to the JSONL file as
        soon as its run finishes.
    """
    if param_grid is None:
        param_grid = grid
    if repeats is None:
        repeats = REPEATS
    if out_dir is None:
        out_dir = DEFAULT_OUT_DIR

    os.makedirs(out_dir, exist_ok=True)
    log_file = os.path.join(out_dir, 'sweep_results.jsonl')
    monitor_path = os.path.join(out_dir, 'outer_monitor.log')

    results = []
    run_id = 0
    for i, params in enumerate(build_param_combinations(param_grid)):
        for r in range(repeats):
            run_id += 1
            print(f"\n[Outer Loop] Running config {i+1} repeat {r+1}/{repeats}: {params}")
            _append_monitor_log(
                monitor_path,
                f"Starting config {i+1} repeat {r+1}/{repeats}: {params}",
                leading_blank=True,
            )

            kill_old_rl()
            print(f'[Outer Loop MONITOR] Sleeping 2s after RL job kill to ensure teardown, {time.ctime()}', flush=True)
            time.sleep(2)
            print('[Outer Loop MONITOR] Launching inner RL job now...', flush=True)
            _append_monitor_log(
                monitor_path,
                f"Launching inner RL job for config {i+1} repeat {r+1}",
            )

            # NOTE(review): every repeat of a config shares the same log dir,
            # so later repeats write into the earlier one's directory —
            # confirm this is intended.
            cmd = _build_runner_command(params, os.path.join(out_dir, f'model-{i:03d}'))

            # The learning rate is handed to the runner through an environment
            # variable (the original note says the script otherwise uses a
            # fixed SB3 DQN default). It is set on a copy of the environment
            # so the value does not leak into this process.
            child_env = dict(os.environ, SB3_DQN_LR=str(params['learning_rate']))

            status, output, elapsed = _execute_run(cmd, child_env, monitor_path)

            # Try to extract mean_reward from output
            # (parsed from the '[SB3 Runner] Eval episodes=...' line).
            mean_reward = parse_mean_reward(output)
            result = {
                'run_id': run_id,
                'config_id': i,
                'repeat': r,
                'params': params,
                'mean_reward': mean_reward,
                'elapsed_sec': elapsed,
                'run_status': status,
                'raw_output': output[:RAW_OUTPUT_LIMIT],
            }
            results.append(result)
            with open(log_file, 'a') as f:
                f.write(json.dumps(result) + '\n')
            print(f"[Outer Loop] Finished {i+1} repeat {r+1}/{repeats}: status={result['run_status']} mean_reward={mean_reward} time={elapsed:.1f}s")

    print(f"\n[Outer Loop] Sweep done. Results saved in: {log_file}")
    return results


if __name__ == "__main__":
    run_sweep()