# donkeycar-rl-autoresearch/tests/test_wave3.py
"""
Tests for multitrack_runner.py and wave3_controller.py — no live simulator required.
Uses mocked gym environments and subprocess output.
"""
import os
import sys
import json
import tempfile
import pytest
import numpy as np
import gymnasium as gym
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
# ─────────────────────────────────────────────────────────────────────────────
# Shared mock environment
# ─────────────────────────────────────────────────────────────────────────────
class MockGymEnv(gym.Env):
    """Minimal mock of a DonkeyCar environment (same as other test files)."""
    metadata = {'render_modes': []}

    def __init__(self, max_steps=30):
        super().__init__()
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(80, 160, 3), dtype=np.uint8
        )
        self.action_space = gym.spaces.Box(
            low=np.array([-1.0, 0.2]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )
        self._step_count = 0
        self._max_steps = max_steps
        self._closed = False

    def reset(self, seed=None, **kwargs):
        self._step_count = 0
        return np.zeros((80, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        self._step_count += 1
        obs = np.random.randint(0, 255, (80, 160, 3), dtype=np.uint8)
        reward = float(np.random.uniform(0.5, 2.0))
        terminated = self._step_count >= self._max_steps
        info = {'speed': 2.0, 'cte': 0.3, 'pos': [0.0, 0.0, float(self._step_count)]}
        return obs, reward, terminated, False, info

    def close(self):
        self._closed = True


# ─────────────────────────────────────────────────────────────────────────────
# multitrack_runner — module-level tests
# ─────────────────────────────────────────────────────────────────────────────
def test_multitrack_runner_no_syntax_errors():
    """multitrack_runner.py must compile without syntax errors."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'multitrack_runner.py')
    with open(path) as f:
        src = f.read()
    compile(src, path, 'exec')  # raises SyntaxError if broken


def test_multitrack_runner_training_tracks_defined():
    """TRAINING_TRACKS must contain exactly 3 entries with valid env IDs."""
    from multitrack_runner import TRAINING_TRACKS
    assert len(TRAINING_TRACKS) == 3
    names = [t[0] for t in TRAINING_TRACKS]
    envids = [t[1] for t in TRAINING_TRACKS]
    assert 'generated_road' in names
    assert 'generated_track' in names
    assert 'mountain_track' in names
    for eid in envids:
        assert eid.startswith('donkey-'), f'Unexpected env ID: {eid}'


def test_multitrack_runner_test_tracks_defined():
    """TEST_TRACKS must contain exactly 2 entries: mini_monaco and warren."""
    from multitrack_runner import TEST_TRACKS
    assert len(TEST_TRACKS) == 2
    names = [t[0] for t in TEST_TRACKS]
    assert 'mini_monaco' in names
    assert 'warren' in names


def test_multitrack_runner_no_model_save_before_definition():
    """ADR-005: model.save() must never appear before model is defined."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'multitrack_runner.py')
    with open(path) as f:
        source = f.read()
    lines = source.split('\n')
    model_defined_at = None
    in_docstring = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Toggle triple-quote docstring state
        if stripped.count('"""') % 2 == 1:
            in_docstring = not in_docstring
            continue
        if in_docstring or stripped.startswith('#'):
            continue
        if 'model = PPO(' in line or 'model = create_or_load_model' in line:
            model_defined_at = i
        if 'model.save(' in line and model_defined_at is None:
            pytest.fail(f'model.save() before model defined at line {i + 1}: {line}')


def test_wrap_env_applies_throttle_clamp():
    """wrap_env() should apply ThrottleClampWrapper so throttle low bound = 0.2."""
    from multitrack_runner import wrap_env
    raw = MockGymEnv()
    wrapped = wrap_env(raw)
    # Action space low[1] should be 0.2 (throttle min)
    assert float(wrapped.action_space.low[1]) == pytest.approx(0.2)


def test_wrap_env_returns_valid_action_space():
    """Wrapped env should have 2D continuous action space."""
    from multitrack_runner import wrap_env
    raw = MockGymEnv()
    wrapped = wrap_env(raw)
    assert hasattr(wrapped.action_space, 'shape')
    assert wrapped.action_space.shape == (2,)


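# A minimal sketch of the kind of action wrapper the two wrap_env tests above
# imply. The real ThrottleClampWrapper lives in the agent package; this class
# name, its constructor, and the clamping strategy are illustrative assumptions.
class _SketchThrottleClamp(gym.ActionWrapper):
    """Hypothetical wrapper: raise the throttle lower bound to 0.2."""

    def __init__(self, env, throttle_min=0.2):
        super().__init__(env)
        low = env.action_space.low.copy()
        low[1] = throttle_min  # index 1 = throttle channel
        self.action_space = gym.spaces.Box(
            low=low, high=env.action_space.high, dtype=np.float32
        )

    def action(self, action):
        # Clamp the agent's raw action into the tightened bounds.
        return np.clip(action, self.action_space.low, self.action_space.high)

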
def test_create_or_load_model_no_warm_start():
    """Without warm-start path, create_or_load_model() returns a fresh PPO."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    with patch('multitrack_runner.PPO') as MockPPO:
        mock_model = MagicMock()
        MockPPO.return_value = mock_model
        result = create_or_load_model(mock_env, learning_rate=0.0003,
                                      warm_start_path=None)
        MockPPO.assert_called_once()
        assert result is mock_model


def test_create_or_load_model_missing_warm_start_falls_back():
    """If warm-start file does not exist, should create fresh model."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    with patch('multitrack_runner.PPO') as MockPPO:
        mock_model = MagicMock()
        MockPPO.return_value = mock_model
        result = create_or_load_model(
            mock_env, learning_rate=0.001,
            warm_start_path='/nonexistent/path/model.zip'
        )
        MockPPO.assert_called_once()
        assert result is mock_model


def test_create_or_load_model_warm_start_load_failure_falls_back():
    """If PPO.load() raises, create_or_load_model() falls back to fresh PPO."""
    from multitrack_runner import create_or_load_model
    mock_env = MockGymEnv()
    # Create a real file on disk so os.path.exists() returns True
    with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as f:
        f.write(b'not a real model')
        warm_path = f.name
    try:
        with patch('multitrack_runner.PPO') as MockPPO:
            MockPPO.load.side_effect = Exception('incompatible obs space')
            fresh_model = MagicMock()
            MockPPO.return_value = fresh_model
            result = create_or_load_model(mock_env, learning_rate=0.001,
                                          warm_start_path=warm_path)
            # Should fall back to constructing a fresh PPO
            MockPPO.assert_called_once()
            assert result is fresh_model
    finally:
        if os.path.exists(warm_path):
            os.remove(warm_path)


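# The three create_or_load_model tests above all pin down the same contract:
# attempt a warm start from a saved checkpoint, and on any failure (missing
# file, incompatible model) fall back to a fresh PPO. A hypothetical sketch of
# that contract; the 'CnnPolicy' choice and exact signature are assumptions:
def _sketch_create_or_load_model(env, learning_rate, warm_start_path=None):
    from stable_baselines3 import PPO  # deferred so this test module stays light
    if warm_start_path and os.path.exists(warm_start_path):
        try:
            return PPO.load(warm_start_path, env=env)
        except Exception:
            pass  # corrupt or incompatible checkpoint: fall through
    return PPO('CnnPolicy', env, learning_rate=learning_rate)

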
def test_close_and_switch_calls_env_close():
    """close_and_switch() must call env.close() on the old env (ADR-006)."""
    from multitrack_runner import close_and_switch
    old_env = MockGymEnv()
    with patch('multitrack_runner.send_exit_scene_raw', return_value=True), \
         patch('multitrack_runner.gym.make', return_value=MockGymEnv()), \
         patch('multitrack_runner.wrap_env', side_effect=lambda e: e), \
         patch('time.sleep'):
        close_and_switch(old_env, 'donkey-generated-track-v0', verbose=False)
    assert old_env._closed, 'env.close() should have been called before track switch'


def test_close_and_switch_returns_new_env():
    """close_and_switch() should return a new wrapped env."""
    from multitrack_runner import close_and_switch
    new_env = MockGymEnv()
    with patch('multitrack_runner.send_exit_scene_raw', return_value=True), \
         patch('multitrack_runner.gym.make', return_value=new_env), \
         patch('multitrack_runner.wrap_env', side_effect=lambda e: e), \
         patch('time.sleep'):
        result = close_and_switch(None, 'donkey-generated-track-v0', verbose=False)
    assert result is new_env


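# A sketch of the ordering that the two close_and_switch tests above enforce
# (ADR-006): close the old env before the scene switch, then build and wrap a
# fresh one. send_exit_scene_raw and wrap_env are the real module's helpers;
# they are passed in as callables here so the sketch stays import-free.
def _sketch_close_and_switch(old_env, env_id, exit_scene, wrap, settle=2.0):
    import time
    if old_env is not None:
        old_env.close()            # 1. drop the old TCP connection first
    exit_scene()                   # 2. ask the simulator to leave the scene
    time.sleep(settle)             # 3. give the simulator time to unload
    return wrap(gym.make(env_id))  # 4. connect to the new track and re-wrap

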
def test_evaluate_test_tracks_returns_dict_with_track_keys():
    """evaluate_test_tracks() should return a dict keyed by track names."""
    from multitrack_runner import evaluate_test_tracks, TEST_TRACKS
    mock_model = MagicMock()
    mock_model.predict.return_value = (np.array([0.0, 0.5]), None)
    new_env = MockGymEnv(max_steps=10)
    with patch('multitrack_runner.close_and_switch', return_value=new_env), \
         patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=1
        )
    track_names = [t[0] for t in TEST_TRACKS]
    for name in track_names:
        assert name in test_results, f'Missing test result for {name}'
    assert isinstance(combined, float)


def test_evaluate_test_tracks_combined_score_is_sum():
    """combined_test_score should equal the sum of individual test track rewards."""
    from multitrack_runner import evaluate_test_tracks
    mock_model = MagicMock()
    mock_model.predict.return_value = (np.array([0.0, 0.5]), None)
    new_env = MockGymEnv(max_steps=5)
    with patch('multitrack_runner.close_and_switch', return_value=new_env), \
         patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=2
        )
    expected = sum(r['mean_reward'] for r in test_results.values())
    assert combined == pytest.approx(expected, rel=1e-5)


def test_evaluate_test_tracks_connection_failure_gives_zero():
    """If we cannot connect to a test track, its score should be 0.0."""
    from multitrack_runner import evaluate_test_tracks
    mock_model = MagicMock()
    with patch('multitrack_runner.close_and_switch', side_effect=Exception('sim dead')), \
         patch('time.sleep'):
        test_results, combined, _ = evaluate_test_tracks(
            mock_model, current_env=MockGymEnv(), eval_episodes=1
        )
    for metrics in test_results.values():
        assert metrics['mean_reward'] == 0.0
    assert combined == 0.0


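# The three evaluate_test_tracks tests above assume this aggregation shape:
# per-track mean episode reward, 0.0 for tracks that could not be reached, and
# a combined score that is the plain sum. A dependency-free sketch (the helper
# name and the input format are assumptions):
def _sketch_combine_test_scores(per_track_rewards):
    """per_track_rewards: {track_name: list of episode rewards, or None}."""
    results = {}
    for name, rewards in per_track_rewards.items():
        mean = float(np.mean(rewards)) if rewards else 0.0
        results[name] = {'mean_reward': mean}
    combined = sum(r['mean_reward'] for r in results.values())
    return results, combined

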
# ─────────────────────────────────────────────────────────────────────────────
# wave3_controller — unit tests
# ─────────────────────────────────────────────────────────────────────────────
def test_wave3_controller_no_syntax_errors():
    """wave3_controller.py must compile without syntax errors."""
    path = os.path.join(os.path.dirname(__file__), '..', 'agent', 'wave3_controller.py')
    with open(path) as f:
        src = f.read()
    compile(src, path, 'exec')


def test_wave3_encode_decode_round_trip():
    """encode_params → decode_params should round-trip within ±5% for each param."""
    from wave3_controller import encode_params, decode_params, PARAM_SPACE
    original = {
        'learning_rate': 0.000225,
        'steps_per_switch': 10000,
        'total_timesteps': 200000,
    }
    vec = encode_params(original)
    recovered = decode_params(vec)
    for k in original:
        spec = PARAM_SPACE[k]
        rng = spec['max'] - spec['min']
        assert abs(recovered[k] - original[k]) < 0.05 * rng, \
            f'Round-trip error for {k}: {original[k]} -> {recovered[k]}'


def test_wave3_decode_clamps_to_bounds():
    """decode_params() should clamp output to [min, max] even if vec is out-of-range."""
    from wave3_controller import decode_params, PARAM_SPACE
    # vec values outside [0, 1]
    vec = np.array([-0.5, 1.5, 2.0])
    params = decode_params(vec)
    for k, v in params.items():
        spec = PARAM_SPACE[k]
        assert spec['min'] <= v <= spec['max'], \
            f'{k}={v} outside [{spec["min"]}, {spec["max"]}]'


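# The round-trip and clamping tests above are both consistent with a plain
# linear min-max mapping between the unit cube and PARAM_SPACE. A hypothetical
# reference decoder (assumes vec components follow PARAM_SPACE insertion
# order; the real decode_params may also round integer-valued params):
def _sketch_decode_params(vec, param_space):
    params = {}
    for i, (name, spec) in enumerate(param_space.items()):
        u = float(np.clip(vec[i], 0.0, 1.0))  # clamp into [0, 1] first
        params[name] = spec['min'] + u * (spec['max'] - spec['min'])
    return params

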
def test_wave3_tinygp_predict_shape():
    """TinyGP.predict() should return (mu, sigma) with shape (N,) for N candidates."""
    from wave3_controller import TinyGP
    gp = TinyGP()
    X_train = np.random.rand(5, 3)
    y_train = np.random.rand(5)
    gp.fit(X_train, y_train)
    X_test = np.random.rand(10, 3)
    mu, sigma = gp.predict(X_test)
    assert mu.shape == (10,), f'Expected (10,), got {mu.shape}'
    assert sigma.shape == (10,), f'Expected (10,), got {sigma.shape}'
    assert np.all(sigma >= 0), 'Sigma must be non-negative'


def test_wave3_tinygp_ucb_selects_high_value():
    """GP should assign higher UCB to regions near high-reward training points."""
    from wave3_controller import TinyGP
    gp = TinyGP(length_scale=0.2)
    # Point at 0.9 has reward 10, point at 0.1 has reward 0
    X_train = np.array([[0.1, 0.1, 0.1], [0.9, 0.9, 0.9]])
    y_train = np.array([0.0, 10.0])
    gp.fit(X_train, y_train)
    # Predict at two test points
    X_test = np.array([[0.1, 0.1, 0.1], [0.9, 0.9, 0.9]])
    mu, _ = gp.predict(X_test)
    assert mu[1] > mu[0], 'GP should predict higher value near the high-reward training point'


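# A minimal sketch of the GP behavior the two TinyGP tests above rely on:
# RBF-kernel regression whose posterior mean is pulled toward nearby training
# targets and whose posterior sigma is non-negative. Illustrative only; the
# actual noise term and hyperparameters live in wave3_controller.py.
def _sketch_gp_predict(X_train, y_train, X_test, length_scale=0.2, noise=1e-6):
    def rbf(A, B):
        # Squared Euclidean distances between every pair of rows.
        d2 = ((A[:, None, :] - B[None, :, :]) ** 2).sum(-1)
        return np.exp(-0.5 * d2 / length_scale ** 2)

    K = rbf(X_train, X_train) + noise * np.eye(len(X_train))
    K_s = rbf(X_test, X_train)
    mu = K_s @ np.linalg.solve(K, y_train)
    # Diagonal of the posterior covariance, floored at zero for stability.
    var = 1.0 - np.einsum('ij,ji->i', K_s, np.linalg.solve(K, K_s.T))
    return mu, np.sqrt(np.maximum(var, 0.0))

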
def test_wave3_propose_uses_seed_for_first_trials():
    """For trial 1 and 2, propose_next_params() returns the hardcoded seed params."""
    from wave3_controller import propose_next_params, SEED_PARAMS
    results = []  # No prior data
    for i, seed in enumerate(SEED_PARAMS, start=1):
        proposed = propose_next_params(results, trial_num=i)
        for k, v in seed.items():
            assert proposed[k] == v, f'Trial {i}: {k} should be {v}, got {proposed[k]}'


def test_wave3_propose_random_when_few_results():
    """With fewer than MIN_TRIALS_BEFORE_GP results, should use random proposal."""
    from wave3_controller import propose_next_params, PARAM_SPACE, SEED_PARAMS
    # Put trial_num beyond the seed phase but with too few results for the GP
    trial_num = len(SEED_PARAMS) + 1
    results = []  # Empty — below threshold
    proposed = propose_next_params(results, trial_num=trial_num)
    for k, spec in PARAM_SPACE.items():
        assert spec['min'] <= proposed[k] <= spec['max'], \
            f'{k}={proposed[k]} out of bounds [{spec["min"]}, {spec["max"]}]'


def test_wave3_parse_runner_output_combined_score():
    """parse_runner_output() should extract combined_test_score correctly."""
    from wave3_controller import parse_runner_output
    output = """
[12:34:56] [W3 Runner][TEST] track=mini_monaco mean_reward=1234.56 mean_steps=450.0 ✅ DRIVES
[12:34:57] [W3 Runner][TEST] track=warren mean_reward=789.01 mean_steps=310.0 ✅ DRIVES
[12:34:57] [W3 Runner][TEST] mini_monaco_reward=1234.5600
[12:34:57] [W3 Runner][TEST] warren_reward=789.0100
[12:34:57] [W3 Runner][TEST] combined_test_score=2023.5700
"""
    combined, mini_monaco, warren = parse_runner_output(output)
    assert combined == pytest.approx(2023.57, rel=1e-4)
    assert mini_monaco == pytest.approx(1234.56, rel=1e-4)
    assert warren == pytest.approx(789.01, rel=1e-4)


def test_wave3_parse_runner_output_missing_returns_none():
    """parse_runner_output() returns None for each metric if not found."""
    from wave3_controller import parse_runner_output
    output = 'Training started... timeout'
    combined, mini_monaco, warren = parse_runner_output(output)
    assert combined is None
    assert mini_monaco is None
    assert warren is None


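# The two parsing tests above imply simple line-oriented extraction of the
# "key=value" metrics the runner prints. A hypothetical sketch with the same
# return contract (None for any metric that never appears in the output):
def _sketch_parse_runner_output(output):
    import re

    def grab(key):
        m = re.search(rf'{key}=([0-9]+\.[0-9]+)', output)
        return float(m.group(1)) if m else None

    return (grab('combined_test_score'),
            grab('mini_monaco_reward'),
            grab('warren_reward'))

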
def test_wave3_champion_tracker_update_and_load():
    """Wave3ChampionTracker should update champion and persist to disk."""
    from wave3_controller import Wave3ChampionTracker
    with tempfile.TemporaryDirectory() as tmpdir:
        tracker = Wave3ChampionTracker(tmpdir)
        assert tracker.best_score == float('-inf')
        updated = tracker.update_if_better(
            score=1500.0,
            params={'learning_rate': 0.0002, 'steps_per_switch': 8000, 'total_timesteps': 150000},
            model_zip_path=None,
            trial=3,
            mini_monaco_reward=900.0,
            warren_reward=600.0,
        )
        assert updated is True
        assert tracker.best_score == pytest.approx(1500.0)
        # Reload from disk
        tracker2 = Wave3ChampionTracker(tmpdir)
        assert tracker2.best_score == pytest.approx(1500.0)
        assert tracker2._best['trial'] == 3


def test_wave3_champion_tracker_does_not_regress():
    """Champion should not be updated if new score is lower."""
    from wave3_controller import Wave3ChampionTracker
    with tempfile.TemporaryDirectory() as tmpdir:
        tracker = Wave3ChampionTracker(tmpdir)
        tracker.update_if_better(2000.0, {}, None, 1,
                                 mini_monaco_reward=1200.0, warren_reward=800.0)
        updated = tracker.update_if_better(1500.0, {}, None, 2,
                                           mini_monaco_reward=900.0, warren_reward=600.0)
        assert updated is False
        assert tracker.best_score == pytest.approx(2000.0)


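# The two tracker tests above assume strictly-greater update semantics plus a
# JSON snapshot that survives re-instantiation. A hypothetical minimal core
# (the champion.json filename and the reduced signature are assumptions):
class _SketchChampionTracker:
    def __init__(self, out_dir):
        self._path = os.path.join(out_dir, 'champion.json')
        self._best = {'score': float('-inf')}
        if os.path.exists(self._path):
            with open(self._path) as f:
                self._best = json.load(f)

    @property
    def best_score(self):
        return self._best['score']

    def update_if_better(self, score, params, trial):
        if score <= self.best_score:
            return False  # never regress the champion
        self._best = {'score': score, 'params': params, 'trial': trial}
        with open(self._path, 'w') as f:
            json.dump(self._best, f)
        return True

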
def test_wave3_results_appended_not_overwritten():
    """Saving results should append to JSONL file, never overwrite."""
    from wave3_controller import save_result
    with tempfile.TemporaryDirectory() as tmpdir:
        # Monkey-patch the RESULTS_FILE path
        import wave3_controller
        original_path = wave3_controller.RESULTS_FILE
        wave3_controller.RESULTS_FILE = os.path.join(tmpdir, 'phase3_results.jsonl')
        try:
            # Write 3 records
            for i in range(3):
                save_result(
                    trial=i + 1,
                    params={'learning_rate': 0.0002, 'steps_per_switch': 5000,
                            'total_timesteps': 100000},
                    combined=float(i * 100),
                    mini_monaco=float(i * 60),
                    warren_rwd=float(i * 40),
                    model_path=None,
                    is_champion=(i == 2),
                    status='ok',
                    elapsed=120.0 * (i + 1),
                )
            # Should have 3 lines
            with open(wave3_controller.RESULTS_FILE) as f:
                lines = [line.strip() for line in f if line.strip()]
            assert len(lines) == 3, f'Expected 3 result lines, got {len(lines)}'
            # All should be valid JSON
            for line in lines:
                rec = json.loads(line)
                assert 'combined_test_score' in rec
                assert 'params' in rec
        finally:
            wave3_controller.RESULTS_FILE = original_path


def test_wave3_zero_score_excluded_from_gp():
    """
    Trials with combined_test_score=0 should not be added to the GP data list
    (they indicate crashes/timeouts, not useful signal).
    """
    # Simulate the logic in run_wave3: only append if combined > 0
    results = []
    for score in [0.0, 1500.0, 0.0, 800.0]:
        if score > 0:
            results.append({'params': {}, 'combined_test_score': score})
    assert len(results) == 2, 'Only non-zero scores should feed the GP'
    assert all(r['combined_test_score'] > 0 for r in results)


def test_wave3_param_space_covers_phase2_champion():
    """The Phase 3 search space must contain the Phase 2 champion's parameters."""
    from wave3_controller import PARAM_SPACE
    # Phase 2 champion: lr=0.000225, which falls in [5e-5, 1e-3]
    assert PARAM_SPACE['learning_rate']['min'] <= 0.000225 <= PARAM_SPACE['learning_rate']['max']
    # Moderate switching schedule
    assert PARAM_SPACE['steps_per_switch']['min'] <= 10000 <= PARAM_SPACE['steps_per_switch']['max']
    # Phase 2 had 13k timesteps — Phase 3 needs much more; check min >= 50k
    assert PARAM_SPACE['total_timesteps']['min'] >= 50000


def test_wave3_seed_params_within_space():
    """All seed parameter sets must be within the defined search space."""
    from wave3_controller import SEED_PARAMS, PARAM_SPACE
    for i, seed in enumerate(SEED_PARAMS):
        for k, v in seed.items():
            spec = PARAM_SPACE[k]
            assert spec['min'] <= v <= spec['max'], \
                f'Seed {i}: {k}={v} outside [{spec["min"]}, {spec["max"]}]'


def test_health_check_callback_stops_on_stuck():
    """HealthCheckCallback should return False when speed stays near zero."""
    from multitrack_runner import HealthCheckCallback
    cb = HealthCheckCallback(max_stuck_steps=5, min_speed=0.1)
    cb.health._stuck_count = 4  # One more step will trigger
    # Simulate a callback step with very low speed
    cb.locals = {
        'infos': [{'speed': 0.0}],
        'new_obs': None,
    }
    result = cb._on_step()
    assert result is False, 'Callback should stop training when sim is stuck'


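# A minimal sketch of the stuck-detection the final test exercises: count
# consecutive low-speed steps and abort once the budget is exhausted. The real
# HealthCheckCallback hooks into stable-baselines3's BaseCallback; this
# standalone counter only illustrates the core logic, and its names are
# assumptions for illustration.
class _SketchStuckDetector:
    def __init__(self, max_stuck_steps=5, min_speed=0.1):
        self.max_stuck_steps = max_stuck_steps
        self.min_speed = min_speed
        self._stuck_count = 0

    def on_step(self, info):
        """Return False (stop training) once the car has been stuck too long."""
        if info.get('speed', 0.0) < self.min_speed:
            self._stuck_count += 1
        else:
            self._stuck_count = 0  # any real movement resets the counter
        return self._stuck_count < self.max_stuck_steps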