""" Tests for autoresearch_controller.py — no simulator required. """ import json import os import sys import pytest import numpy as np import tempfile sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent')) # Patch paths before import so the controller doesn't try to read/write real files import autoresearch_controller as ctrl # ---- Param Encoding Tests ---- def test_param_encode_decode_roundtrip(): """encode → decode should reproduce original values (within int rounding).""" params = {'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.002, 'timesteps': 3000} vec = ctrl.encode_params(params) recovered = ctrl.decode_params(vec) assert recovered['n_steer'] == params['n_steer'] assert recovered['n_throttle'] == params['n_throttle'] assert abs(recovered['learning_rate'] - params['learning_rate']) < 1e-6 assert recovered['timesteps'] == params['timesteps'] def test_param_encode_normalizes_to_unit_cube(): """Encoded values should all be in [0, 1].""" params = {'n_steer': 9, 'n_throttle': 5, 'learning_rate': 0.005, 'timesteps': 30000} vec = ctrl.encode_params(params) assert all(0.0 <= v <= 1.0 for v in vec), f"Encoded values out of range: {vec}" def test_param_decode_min_values(): """Zero vector should decode to min values.""" vec = np.zeros(len(ctrl.PARAM_KEYS)) params = ctrl.decode_params(vec) for k in ctrl.PARAM_KEYS: spec = ctrl.PARAM_SPACE[k] assert params[k] == spec['min'] or abs(params[k] - spec['min']) < 1e-6, \ f"{k}: expected {spec['min']}, got {params[k]}" def test_param_decode_max_values(): """Ones vector should decode to max values.""" vec = np.ones(len(ctrl.PARAM_KEYS)) params = ctrl.decode_params(vec) for k in ctrl.PARAM_KEYS: spec = ctrl.PARAM_SPACE[k] assert params[k] == spec['max'] or abs(params[k] - spec['max']) < 1e-6, \ f"{k}: expected {spec['max']}, got {params[k]}" def test_param_decode_clamps_out_of_range(): """Values outside [0,1] should be clamped to valid range.""" vec = np.array([1.5, -0.5, 2.0, 0.5]) params = ctrl.decode_params(vec) for k in ctrl.PARAM_KEYS: spec = ctrl.PARAM_SPACE[k] assert spec['min'] <= params[k] <= spec['max'], \ f"{k}: {params[k]} out of [{spec['min']}, {spec['max']}]" # ---- Gaussian Process Tests ---- def test_gp_fit_predict_shape(): """GP predict should return arrays with correct shape.""" gp = ctrl.TinyGP() X = np.random.uniform(0, 1, (10, 4)) y = np.random.uniform(0, 1, 10) gp.fit(X, y) X_new = np.random.uniform(0, 1, (5, 4)) mu, sigma = gp.predict(X_new) assert mu.shape == (5,) assert sigma.shape == (5,) def test_gp_sigma_positive(): """GP uncertainty (sigma) should be strictly positive.""" gp = ctrl.TinyGP() X = np.random.uniform(0, 1, (10, 4)) y = np.random.uniform(0, 1, 10) gp.fit(X, y) X_new = np.random.uniform(0, 1, (20, 4)) mu, sigma = gp.predict(X_new) assert np.all(sigma > 0), f"Some sigma values non-positive: {sigma.min()}" def test_gp_higher_uncertainty_far_from_data(): """GP should be more uncertain far from training data than near it.""" gp = ctrl.TinyGP(length_scale=0.1) X_train = np.array([[0.1, 0.1, 0.1, 0.1]]) y_train = np.array([1.0]) gp.fit(X_train, y_train) near = np.array([[0.1, 0.1, 0.1, 0.1]]) far = np.array([[0.9, 0.9, 0.9, 0.9]]) _, sigma_near = gp.predict(near) _, sigma_far = gp.predict(far) assert sigma_far[0] > sigma_near[0], \ f"Expected higher uncertainty far from data: near={sigma_near[0]:.4f}, far={sigma_far[0]:.4f}" def test_ucb_proposal_prefers_high_reward_region(): """ GP+UCB should propose params near the high-reward region. Known: n_steer=8, n_throttle=5, lr~0.002 → high reward (from 300 trial history) """ np.random.seed(42) # Synthesize training data: high reward at high n_steer + moderate lr results = [] for n_steer in [3, 5, 7, 8, 9]: for lr in [0.0001, 0.001, 0.002, 0.004]: reward = n_steer * 5.0 + (1.0 - abs(lr - 0.002) / 0.002) * 20.0 results.append({ 'params': {'n_steer': n_steer, 'n_throttle': 3, 'learning_rate': lr, 'timesteps': 10000}, 'mean_reward': reward }) proposed = ctrl.propose_next_params(results, trial_num=20, kappa=2.0) # Best n_steer is 9 (highest in space), best lr is 0.002 assert proposed['n_steer'] >= 7, f"Expected high n_steer proposal, got {proposed['n_steer']}" assert 0.001 <= proposed['learning_rate'] <= 0.004, \ f"Expected moderate lr proposal, got {proposed['learning_rate']}" # ---- Champion Tracker Tests ---- def test_champion_tracker_updates_on_better_reward(): """Champion should update when a better reward is found.""" with tempfile.TemporaryDirectory() as tmpdir: tracker = ctrl.ChampionTracker(tmpdir) assert tracker.best_reward == float('-inf') updated = tracker.update_if_better(50.0, {'n_steer': 5}, None, trial=1) assert updated is True assert tracker.best_reward == 50.0 def test_champion_tracker_no_update_on_worse_reward(): """Champion should NOT update when a worse reward is found.""" with tempfile.TemporaryDirectory() as tmpdir: tracker = ctrl.ChampionTracker(tmpdir) tracker.update_if_better(80.0, {'n_steer': 7}, None, trial=1) updated = tracker.update_if_better(60.0, {'n_steer': 5}, None, trial=2) assert updated is False assert tracker.best_reward == 80.0 def test_champion_tracker_sequence(): """Champion sequence: [50, 80, 60, 90, 70] → updates at indices 0, 1, 3.""" with tempfile.TemporaryDirectory() as tmpdir: tracker = ctrl.ChampionTracker(tmpdir) rewards = [50, 80, 60, 90, 70] champions = [] for i, r in enumerate(rewards): if tracker.update_if_better(float(r), {'r': r}, None, trial=i): champions.append(i) assert champions == [0, 1, 3], f"Expected [0,1,3], got {champions}" assert tracker.best_reward == 90.0 def test_champion_tracker_manifest_persists(): """Champion manifest should persist across tracker instances.""" with tempfile.TemporaryDirectory() as tmpdir: tracker1 = ctrl.ChampionTracker(tmpdir) tracker1.update_if_better(75.0, {'n_steer': 8}, None, trial=5) tracker2 = ctrl.ChampionTracker(tmpdir) assert tracker2.best_reward == 75.0 def test_champion_tracker_handles_none_reward(): """Champion tracker should handle None reward gracefully (failed trial).""" with tempfile.TemporaryDirectory() as tmpdir: tracker = ctrl.ChampionTracker(tmpdir) updated = tracker.update_if_better(None, {}, None, trial=1) assert updated is False assert tracker.best_reward == float('-inf') # ---- Random Proposal Fallback ---- def test_random_proposal_when_insufficient_data(): """With < MIN_TRIALS_BEFORE_GP results, should use random proposal (not crash).""" results = [ {'params': {'n_steer': 5, 'n_throttle': 3, 'learning_rate': 0.001, 'timesteps': 10000}, 'mean_reward': 50.0} ] # Should not raise even with 1 result proposed = ctrl.propose_next_params(results, trial_num=1, kappa=2.0) assert 'n_steer' in proposed assert 'learning_rate' in proposed