"""
Tests for multitrack_runner.py and wave3_controller.py — no live simulator required.
Uses mocked gym environments and subprocess output.
"""

import json
import os
import sys
import tempfile
from unittest.mock import patch, MagicMock, call

import pytest
import numpy as np
import gymnasium as gym

# Make the sibling ``agent`` directory importable; the modules under test are
# imported lazily inside each test so this must run first.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))


# ─────────────────────────────────────────────────────────────────────────────
# Shared mock environment
# ─────────────────────────────────────────────────────────────────────────────

class MockGymEnv(gym.Env):
    """Minimal mock of a DonkeyCar environment (same as other test files)."""

    metadata = {'render_modes': []}

    def __init__(self, max_steps=30):
        """Create the mock; an episode terminates after ``max_steps`` steps."""
        super().__init__()
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(80, 160, 3), dtype=np.uint8
        )
        # Action is [steering, throttle]; throttle low bound is already 0.2.
        self.action_space = gym.spaces.Box(
            low=np.array([-1.0, 0.2]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32,
        )
        self._step_count = 0
        self._max_steps = max_steps
        self._closed = False

    def reset(self, seed=None, **kwargs):
        """Reset the step counter and return a black frame plus empty info."""
        self._step_count = 0
        return np.zeros((80, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        """Advance one step; return a random frame/reward and fixed telemetry."""
        self._step_count += 1
        obs = np.random.randint(0, 255, (80, 160, 3), dtype=np.uint8)
        reward = float(np.random.uniform(0.5, 2.0))
        terminated = self._step_count >= self._max_steps
        info = {'speed': 2.0, 'cte': 0.3, 'pos': [0.0, 0.0, float(self._step_count)]}
        return obs, reward, terminated, False, info

    def close(self):
        """Record that close() was called so tests can assert on it."""
        self._closed = True


# ─────────────────────────────────────────────────────────────────────────────
# multitrack_runner — module-level tests
# ─────────────────────────────────────────────────────────────────────────────

def test_multitrack_runner_no_syntax_errors():
    """multitrack_runner.py must compile without syntax errors."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'multitrack_runner.py')
    # Explicit encoding: the source may contain non-ASCII characters and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(path, encoding='utf-8') as f:
        src = f.read()
    compile(src, path, 'exec')  # raises SyntaxError if broken


def test_multitrack_runner_training_tracks_defined():
    """Wave 4: TRAINING_TRACKS = generated_track + mountain_track.

    generated_road excluded — visually too similar to generated_track, and the
    Phase-2 warm-start caused catastrophic forgetting of all other tracks.
    """
    from multitrack_runner import TRAINING_TRACKS
    assert len(TRAINING_TRACKS) == 2
    names = [t[0] for t in TRAINING_TRACKS]
    envids = [t[1] for t in TRAINING_TRACKS]
    assert 'generated_track' in names
    assert 'mountain_track' in names
    assert 'generated_road' not in names, 'generated_road must be excluded from Wave 4'
    for eid in envids:
        assert eid.startswith('donkey-'), f'Unexpected env ID: {eid}'


def test_multitrack_runner_test_tracks_defined():
    """TEST_TRACKS must contain exactly 1 entry: mini_monaco.

    Warren was removed: CTE-based episode termination does not fire when the
    car crosses the inside edge, so scores on Warren are unreliable.
    """
    from multitrack_runner import TEST_TRACKS
    assert len(TEST_TRACKS) == 1
    names = [t[0] for t in TEST_TRACKS]
    assert 'mini_monaco' in names
    assert 'warren' not in names, 'Warren removed — broken episode termination on inside edge'


def test_multitrack_runner_no_model_save_before_definition():
    """ADR-005: model.save() must never appear before model is defined."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'multitrack_runner.py')
    with open(path, encoding='utf-8') as f:
        source = f.read()

    lines = source.split('\n')
    model_defined_at = None
    in_docstring = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Toggle triple-quote docstring state: an odd number of delimiters on a
        # line means a docstring opens or closes here.
        if stripped.count('"""') % 2 == 1:
            in_docstring = not in_docstring
            continue
        if in_docstring or stripped.startswith('#'):
            continue
        if 'model = PPO(' in line or 'model = create_or_load_model' in line:
            model_defined_at = i
        if 'model.save(' in line and model_defined_at is None:
            pytest.fail(f'model.save() before model defined at line {i+1}: {line}')


def test_wrap_env_applies_throttle_clamp():
    """wrap_env() should apply ThrottleClampWrapper so throttle low bound = 0.2."""
    from multitrack_runner import wrap_env
    raw = MockGymEnv()
    wrapped = wrap_env(raw)
    # Action space low[1] should be 0.2 (throttle min)
    assert float(wrapped.action_space.low[1]) == pytest.approx(0.2)


def test_wrap_env_returns_valid_action_space():
    """Wrapped env should have 2D continuous action space."""
    from multitrack_runner import wrap_env
    raw = MockGymEnv()
    wrapped = wrap_env(raw)
    assert hasattr(wrapped.action_space, 'shape')
    assert wrapped.action_space.shape == (2,)


def test_create_or_load_model_no_warm_start():
    """Without warm-start path, create_or_load_model() returns a fresh PPO."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    with patch('multitrack_runner.PPO') as MockPPO:
        mock_model = MagicMock()
        MockPPO.return_value = mock_model
        result = create_or_load_model(mock_env, learning_rate=0.0003, warm_start_path=None)
        MockPPO.assert_called_once()
        assert result is mock_model


def test_create_or_load_model_missing_warm_start_falls_back():
    """If warm-start file does not exist, should create fresh model."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    with patch('multitrack_runner.PPO') as MockPPO:
        mock_model = MagicMock()
        MockPPO.return_value = mock_model
        result = create_or_load_model(
            mock_env,
            learning_rate=0.001,
            warm_start_path='/nonexistent/path/model.zip',
        )
        MockPPO.assert_called_once()
        assert result is mock_model


def test_create_or_load_model_warm_start_load_failure_falls_back():
    """If PPO.load() raises, create_or_load_model() falls back to fresh PPO."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    # A temporary directory gives an existing dummy file (so os.path.exists()
    # returns True) and guarantees cleanup without a manual try/finally.
    with tempfile.TemporaryDirectory() as tmpdir:
        warm_path = os.path.join(tmpdir, 'model.zip')
        with open(warm_path, 'w') as fh:
            fh.write('not a real model')

        with patch('multitrack_runner.PPO') as MockPPO:
            MockPPO.load.side_effect = Exception('incompatible obs space')
            fresh_model = MagicMock()
            MockPPO.return_value = fresh_model
            result = create_or_load_model(
                mock_env, learning_rate=0.001, warm_start_path=warm_path
            )
            # Should fall back to fresh PPO
            MockPPO.assert_called_once()
            assert result is fresh_model


def test_close_and_switch_calls_env_close():
    """close_and_switch() must call env.close() on the old env (ADR-006)."""
    from multitrack_runner import close_and_switch
    old_env = MockGymEnv()
    # Give it a fake viewer so _send_exit_scene has something to call
    mock_viewer = MagicMock()
    old_env.unwrapped.viewer = mock_viewer

    with patch('multitrack_runner.gym.make', return_value=MockGymEnv()), \
            patch('multitrack_runner.wrap_env', side_effect=lambda e: e), \
            patch('time.sleep'):
        close_and_switch(old_env, 'donkey-generated-track-v0', verbose=False)

    assert old_env._closed, 'env.close() should have been called before track switch'


def test_close_and_switch_returns_new_env():
    """close_and_switch() should return a new wrapped env."""
    from multitrack_runner import close_and_switch
    new_env = MockGymEnv()

    with patch('multitrack_runner.gym.make', return_value=new_env), \
            patch('multitrack_runner.wrap_env', side_effect=lambda e: e), \
            patch('time.sleep'):
        result = close_and_switch(None, 'donkey-generated-track-v0', verbose=False)

    assert result is new_env


def test_close_and_switch_uses_viewer_not_raw_socket():
    """exit_scene must be sent via env.unwrapped.viewer, not a new raw connection."""
    from multitrack_runner import close_and_switch
    old_env = MockGymEnv()
    mock_viewer = MagicMock()
    old_env.unwrapped.viewer = mock_viewer  # attach mock viewer

    with patch('multitrack_runner.gym.make', return_value=MockGymEnv()), \
            patch('multitrack_runner.wrap_env', side_effect=lambda e: e), \
            patch('time.sleep'):
        close_and_switch(old_env, 'donkey-generated-track-v0', verbose=False)

    # A real assert so the explanatory message is actually reported on failure.
    assert mock_viewer.exit_scene.call_count == 1, \
        'exit_scene must be called on existing viewer (not a new raw socket)'


def test_evaluate_test_tracks_returns_dict():
    """evaluate_test_tracks() should return a dict keyed by track names."""
    from multitrack_runner import evaluate_test_tracks, TEST_TRACKS
    mock_model = MagicMock()
    mock_model.predict.return_value = (np.array([0.0, 0.5]), None)
    new_env = MockGymEnv(max_steps=10)

    with patch('multitrack_runner.close_and_switch', return_value=new_env), \
            patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=1
        )

    track_names = [t[0] for t in TEST_TRACKS]
    for name in track_names:
        assert name in test_results, f'Missing test result for {name}'
    assert isinstance(combined, float)


def test_evaluate_test_tracks_combined_score_is_sum():
    """combined_test_score should equal the sum of individual test track rewards."""
    from multitrack_runner import evaluate_test_tracks
    mock_model = MagicMock()
    mock_model.predict.return_value = (np.array([0.0, 0.5]), None)
    new_env = MockGymEnv(max_steps=5)

    with patch('multitrack_runner.close_and_switch', return_value=new_env), \
            patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=2
        )

    expected = sum(r['mean_reward'] for r in test_results.values())
    assert combined == pytest.approx(expected, rel=1e-5)


def test_evaluate_test_tracks_connection_failure_gives_zero():
    """If we cannot connect to a test track, its score should be 0.0."""
    from multitrack_runner import evaluate_test_tracks
    mock_model = MagicMock()

    with patch('multitrack_runner.close_and_switch', side_effect=Exception('sim dead')), \
            patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=1
        )

    for metrics in test_results.values():
        assert metrics['mean_reward'] == 0.0
    assert combined == 0.0


# ─────────────────────────────────────────────────────────────────────────────
# wave3_controller — unit tests
# ─────────────────────────────────────────────────────────────────────────────

def test_wave3_controller_no_syntax_errors():
    """wave3_controller.py must compile without syntax errors."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'wave3_controller.py')
    with open(path, encoding='utf-8') as f:
        src = f.read()
    compile(src, path, 'exec')


def test_wave3_encode_decode_round_trip():
    """encode_params → decode_params should round-trip within ±5% for each param."""
    from wave3_controller import encode_params, decode_params, PARAM_SPACE
    original = {
        'learning_rate': 0.000225,
        'steps_per_switch': 5000,
        'total_timesteps': 90000,
    }
    vec = encode_params(original)
    recovered = decode_params(vec)
    for k in original:
        spec = PARAM_SPACE[k]
        rng = spec['max'] - spec['min']
        assert abs(recovered[k] - original[k]) < 0.05 * rng, \
            f'Round-trip error for {k}: {original[k]} → {recovered[k]}'


def test_wave3_decode_clamps_to_bounds():
    """decode_params() should clamp output to [min, max] even if vec is out-of-range."""
    from wave3_controller import decode_params, PARAM_SPACE
    # vec values outside [0,1]
    vec = np.array([-0.5, 1.5, 2.0])
    params = decode_params(vec)
    for k, v in params.items():
        spec = PARAM_SPACE[k]
        assert spec['min'] <= v <= spec['max'], \
            f'{k}={v} outside [{spec["min"]}, {spec["max"]}]'


def test_wave3_tinygp_predict_shape():
    """TinyGP.predict() should return (mu, sigma) with shape (N,) for N candidates."""
    from wave3_controller import TinyGP
    gp = TinyGP()
    X_train = np.random.rand(5, 3)
    y_train = np.random.rand(5)
    gp.fit(X_train, y_train)

    X_test = np.random.rand(10, 3)
    mu, sigma = gp.predict(X_test)
    assert mu.shape == (10,), f'Expected (10,), got {mu.shape}'
    assert sigma.shape == (10,), f'Expected (10,), got {sigma.shape}'
    assert np.all(sigma >= 0), 'Sigma must be non-negative'


def test_wave3_tinygp_ucb_selects_high_value():
    """GP should assign higher UCB to regions near high-reward training points."""
    from wave3_controller import TinyGP
    gp = TinyGP(length_scale=0.2)
    # Point at 0.9 has reward 10, point at 0.1 has reward 0
    X_train = np.array([[0.1, 0.1, 0.1], [0.9, 0.9, 0.9]])
    y_train = np.array([0.0, 10.0])
    gp.fit(X_train, y_train)

    # Predict at two test points
    X_test = np.array([[0.1, 0.1, 0.1], [0.9, 0.9, 0.9]])
    mu, _ = gp.predict(X_test)
    assert mu[1] > mu[0], 'GP should predict higher value near the high-reward training point'


def test_wave3_propose_uses_seed_for_first_trials():
    """For trial 1 and 2, propose_next_params() returns the hardcoded seed params."""
    from wave3_controller import propose_next_params, SEED_PARAMS
    results = []  # No prior data
    for i, seed in enumerate(SEED_PARAMS, start=1):
        proposed = propose_next_params(results, trial_num=i)
        for k, v in seed.items():
            assert proposed[k] == v, f'Trial {i}: {k} should be {v}, got {proposed[k]}'


def test_wave3_propose_random_when_few_results():
    """With fewer than MIN_TRIALS_BEFORE_GP results, should use random proposal."""
    # MIN_TRIALS_BEFORE_GP is imported only to confirm the constant exists.
    from wave3_controller import propose_next_params, PARAM_SPACE, MIN_TRIALS_BEFORE_GP, SEED_PARAMS
    # Put trial_num beyond seed phase but with too few results for GP
    trial_num = len(SEED_PARAMS) + 1
    results = []  # Empty — below threshold
    proposed = propose_next_params(results, trial_num=trial_num)
    for k, spec in PARAM_SPACE.items():
        assert spec['min'] <= proposed[k] <= spec['max'], \
            f'{k}={proposed[k]} out of bounds [{spec["min"]}, {spec["max"]}]'


def test_wave3_parse_runner_output_combined_score():
    """parse_runner_output() should extract combined_test_score and mini_monaco correctly."""
    from wave3_controller import parse_runner_output
    # Built with explicit newlines so the log lines carry no source indentation.
    output = (
        '\n'
        '[12:34:56] [W3 Runner][TEST] track=mini_monaco mean_reward=1234.56 '
        'mean_steps=450.0 ✅ DRIVES\n'
        '[12:34:57] [W3 Runner][TEST] mini_monaco_reward=1234.5600\n'
        '[12:34:57] [W3 Runner][TEST] combined_test_score=1234.5600\n'
    )
    combined, mini_monaco = parse_runner_output(output)
    assert combined == pytest.approx(1234.56, rel=1e-4)
    assert mini_monaco == pytest.approx(1234.56, rel=1e-4)


def test_wave3_parse_runner_output_missing_returns_none():
    """parse_runner_output() returns None for each metric if not found."""
    from wave3_controller import parse_runner_output
    output = 'Training started... timeout'
    combined, mini_monaco = parse_runner_output(output)
    assert combined is None
    assert mini_monaco is None


def test_wave3_champion_tracker_update_and_load():
    """Wave3ChampionTracker should update champion and persist to disk."""
    from wave3_controller import Wave3ChampionTracker
    with tempfile.TemporaryDirectory() as tmpdir:
        tracker = Wave3ChampionTracker(tmpdir)
        assert tracker.best_score == float('-inf')

        updated = tracker.update_if_better(
            score=1500.0,
            params={'learning_rate': 0.0002, 'steps_per_switch': 8000, 'total_timesteps': 150000},
            model_zip_path=None,
            trial=3,
            mini_monaco_reward=1500.0,
        )
        assert updated is True
        assert tracker.best_score == pytest.approx(1500.0)

        # Reload from disk
        tracker2 = Wave3ChampionTracker(tmpdir)
        assert tracker2.best_score == pytest.approx(1500.0)
        assert tracker2._best['trial'] == 3


def test_wave3_champion_tracker_does_not_regress():
    """Champion should not be updated if new score is lower."""
    from wave3_controller import Wave3ChampionTracker
    with tempfile.TemporaryDirectory() as tmpdir:
        tracker = Wave3ChampionTracker(tmpdir)
        tracker.update_if_better(2000.0, {}, None, 1, mini_monaco_reward=2000.0)
        updated = tracker.update_if_better(1500.0, {}, None, 2, mini_monaco_reward=1500.0)
        assert updated is False
        assert tracker.best_score == pytest.approx(2000.0)


def test_wave3_results_appended_not_overwritten():
    """Saving results should append to JSONL file, never overwrite."""
    from wave3_controller import save_result
    with tempfile.TemporaryDirectory() as tmpdir:
        import wave3_controller
        # Redirect the module-level results path into the temp dir and restore
        # it afterwards so other tests see the original value.
        original_path = wave3_controller.RESULTS_FILE
        wave3_controller.RESULTS_FILE = os.path.join(tmpdir, 'phase3_results.jsonl')
        try:
            for i in range(3):
                save_result(
                    trial=i + 1,
                    params={
                        'learning_rate': 0.0002,
                        'steps_per_switch': 5000,
                        'total_timesteps': 100000,
                    },
                    combined=float(i * 100),
                    mini_monaco=float(i * 100),
                    model_path=None,
                    is_champion=(i == 2),
                    status='ok',
                    elapsed=120.0 * (i + 1),
                )
            with open(wave3_controller.RESULTS_FILE) as f:
                lines = [raw.strip() for raw in f if raw.strip()]
            assert len(lines) == 3
            for line in lines:
                rec = json.loads(line)
                assert 'combined_test_score' in rec
                assert 'params' in rec
        finally:
            wave3_controller.RESULTS_FILE = original_path


def test_wave3_zero_score_excluded_from_gp():
    """
    Trials with combined_test_score=0 should not be added to the GP data list
    (they indicate crashes/timeouts, not useful signal).
    """
    # Simulate the logic in run_wave3: only append if combined > 0.
    # NOTE(review): this exercises a local copy of the filter, not run_wave3
    # itself, so it cannot detect a regression in the real controller.
    results = []
    for score in [0.0, 1500.0, 0.0, 800.0]:
        if score > 0:
            results.append({'params': {}, 'combined_test_score': score})
    assert len(results) == 2, 'Only non-zero scores should feed the GP'
    assert all(r['combined_test_score'] > 0 for r in results)


def test_wave3_param_space_covers_phase2_champion():
    """The Phase 3 search space must contain the Phase 2 champion's parameters."""
    from wave3_controller import PARAM_SPACE
    # Phase 2 champion: lr=0.000225, which falls in [5e-5, 1e-3]
    assert PARAM_SPACE['learning_rate']['min'] <= 0.000225 <= PARAM_SPACE['learning_rate']['max']
    # Moderate switching schedule
    assert PARAM_SPACE['steps_per_switch']['min'] <= 5000 <= PARAM_SPACE['steps_per_switch']['max']
    # Phase 3 needs more than Phase 2's 13k; check min >= 20k
    assert PARAM_SPACE['total_timesteps']['min'] >= 20000


def test_wave3_seed_params_within_space():
    """All seed parameter sets must be within the defined search space."""
    from wave3_controller import SEED_PARAMS, PARAM_SPACE
    for i, seed in enumerate(SEED_PARAMS):
        for k, v in seed.items():
            spec = PARAM_SPACE[k]
            assert spec['min'] <= v <= spec['max'], \
                f'Seed {i}: {k}={v} outside [{spec["min"]}, {spec["max"]}]'


def test_health_check_callback_stops_on_stuck():
    """HealthCheckCallback should return False when speed stays near zero."""
    from multitrack_runner import HealthCheckCallback
    cb = HealthCheckCallback(max_stuck_steps=5, min_speed=0.1)
    cb.health._stuck_count = 4  # One more step will trigger

    # Simulate a callback step with very low speed
    cb.locals = {
        'infos': [{'speed': 0.0}],
        'new_obs': None,
    }
    result = cb._on_step()
    assert result is False, 'Callback should stop training when sim is stuck'