donkeycar-rl-autoresearch/tests/test_end_to_end.py

427 lines
16 KiB
Python

"""
End-to-end pipeline tests — no live simulator required.
These tests exist to catch integration-level bugs that unit tests miss.
The LR-override bug (Wave 3: all trials silently ran at LR=0.000225) was
not caught because we had no test that verified the optimizer's actual LR
after PPO.load(). Every test in this file targets a real failure that
already burned training time.
Test categories
---------------
1. LR override — PPO.load() + param_group update
2. create_or_load_model — the function that wraps PPO.load in multitrack_runner
3. Training step LR — a short real PPO.learn() to confirm the log LR matches
4. Output parsing — parse_runner_output extracts correct metrics
5. Results round-trip — save → load → GP uses correct data
"""
import json
import os
import sys
import tempfile
import numpy as np
import pytest
# ---------------------------------------------------------------------------
# Path setup — tests run from repo root or tests/ dir
# ---------------------------------------------------------------------------
AGENT_DIR = os.path.join(os.path.dirname(__file__), '..', 'agent')
if AGENT_DIR not in sys.path:
sys.path.insert(0, AGENT_DIR)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
class MockDonkeyEnv(gym.Env):
"""
Minimal DonkeyCar-shaped env: image observations, Box actions.
No simulator required.
"""
metadata = {'render_modes': []}
def __init__(self):
super().__init__()
self.observation_space = gym.spaces.Box(
low=0, high=255, shape=(120, 160, 3), dtype=np.uint8
)
self.action_space = gym.spaces.Box(
low=np.array([-1.0, 0.0]),
high=np.array([1.0, 1.0]),
dtype=np.float32,
)
self._step_count = 0
def reset(self, seed=None, **kwargs):
self._step_count = 0
return np.zeros((120, 160, 3), dtype=np.uint8), {}
def step(self, action):
self._step_count += 1
obs = np.random.randint(0, 255, (120, 160, 3), dtype=np.uint8)
terminated = self._step_count >= 30
return obs, 1.0, terminated, False, {'speed': 2.0, 'cte': 0.1}
def close(self):
pass
def make_vec_env():
"""Wrap MockDonkeyEnv the same way SB3 expects for CnnPolicy."""
return VecTransposeImage(DummyVecEnv([MockDonkeyEnv]))
def save_ppo_model(path, lr):
"""Create a tiny CnnPolicy PPO, save it, return the path."""
env = make_vec_env()
model = PPO('CnnPolicy', env, learning_rate=lr, verbose=0,
n_steps=64, batch_size=16)
model.save(path)
env.close()
return path
# ===========================================================================
# 1. LR override — the bug that burned 8 hours of training
# ===========================================================================
def test_lr_override_bug_demonstration():
"""
Setting model.learning_rate after PPO.load() does NOT update the
optimizer. This demonstrates the exact bug that caused all Wave 3
trials to run at 0.000225 regardless of the GP-proposed LR.
"""
ORIGINAL_LR = 0.000225
NEW_LR = 0.001
env = make_vec_env()
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'model.zip')
save_ppo_model(path, ORIGINAL_LR)
loaded = PPO.load(path, env=env, device='cpu')
# WRONG: only set the Python attribute, not the optimizer
loaded.learning_rate = NEW_LR
actual_lr = loaded.policy.optimizer.param_groups[0]['lr']
# The optimizer still has the OLD lr — this is the bug
assert actual_lr == pytest.approx(ORIGINAL_LR), (
f"Expected optimizer to STILL have old LR {ORIGINAL_LR} "
f"(demonstrating the bug), got {actual_lr}"
)
env.close()
def test_lr_override_fix_sets_optimizer_param_groups():
"""
The correct fix: after PPO.load(), set lr on BOTH model.learning_rate
AND every optimizer param_group. This is what multitrack_runner now does.
"""
ORIGINAL_LR = 0.000225
NEW_LR = 0.001
env = make_vec_env()
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'model.zip')
save_ppo_model(path, ORIGINAL_LR)
loaded = PPO.load(path, env=env, device='cpu')
# CORRECT fix
loaded.learning_rate = NEW_LR
for pg in loaded.policy.optimizer.param_groups:
pg['lr'] = NEW_LR
# model attribute updated
assert loaded.learning_rate == pytest.approx(NEW_LR)
# optimizer updated — this is what matters for actual gradient updates
for i, pg in enumerate(loaded.policy.optimizer.param_groups):
assert pg['lr'] == pytest.approx(NEW_LR), (
f"param_group[{i}]['lr'] = {pg['lr']}, expected {NEW_LR}"
)
env.close()
def test_lr_override_survives_one_training_step():
"""
After the COMPLETE fix (learning_rate + lr_schedule + param_groups),
the optimizer LR must still be correct after one real PPO gradient update.
Root cause of the original bug:
- PPO.load() bakes lr_schedule = FloatSchedule(0.000225) into the model
- train() calls _update_learning_rate() which reads lr_schedule, NOT learning_rate
- So even if param_groups are patched, train() overwrites them back to 0.000225
- Fix: also patch model.lr_schedule = get_schedule_fn(NEW_LR)
"""
from stable_baselines3.common.utils import get_schedule_fn
ORIGINAL_LR = 0.000225
NEW_LR = 0.001
env = make_vec_env()
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'model.zip')
save_ppo_model(path, ORIGINAL_LR)
loaded = PPO.load(path, env=env, device='cpu')
# Complete 3-part fix
loaded.learning_rate = NEW_LR
loaded.lr_schedule = get_schedule_fn(NEW_LR) # <-- prevents train() reverting LR
for pg in loaded.policy.optimizer.param_groups:
pg['lr'] = NEW_LR
# Run a minimal training step
loaded.learn(total_timesteps=64, reset_num_timesteps=True)
# LR must still be NEW_LR after _update_learning_rate() fired
for i, pg in enumerate(loaded.policy.optimizer.param_groups):
assert pg['lr'] == pytest.approx(NEW_LR), (
f"After learn(), param_group[{i}]['lr'] = {pg['lr']}, "
f"expected {NEW_LR}. lr_schedule was not patched correctly."
)
env.close()
# ===========================================================================
# 2. create_or_load_model — the actual function in multitrack_runner
# ===========================================================================
def test_create_or_load_model_warm_start_lr_reaches_optimizer():
"""
create_or_load_model() must leave the optimizer at the requested LR,
not at the LR baked into the saved model — both before AND after a
training step (lr_schedule must be patched, not just param_groups).
"""
from multitrack_runner import create_or_load_model
ORIGINAL_LR = 0.000225
NEW_LR = 0.00083
env = make_vec_env()
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'model.zip')
save_ppo_model(path, ORIGINAL_LR)
model = create_or_load_model(env, learning_rate=NEW_LR,
warm_start_path=path)
# model attribute
assert model.learning_rate == pytest.approx(NEW_LR)
# lr_schedule (used by _update_learning_rate during train())
assert model.lr_schedule(1.0) == pytest.approx(NEW_LR), (
f"lr_schedule(1.0) = {model.lr_schedule(1.0)}, expected {NEW_LR}. "
"train() will revert optimizer to old LR without this fix."
)
# optimizer param_groups
for i, pg in enumerate(model.policy.optimizer.param_groups):
assert pg['lr'] == pytest.approx(NEW_LR)
# Verify it survives an actual training step
model.learn(total_timesteps=64, reset_num_timesteps=True)
for i, pg in enumerate(model.policy.optimizer.param_groups):
assert pg['lr'] == pytest.approx(NEW_LR), (
f"After learn(), param_group[{i}]['lr'] = {pg['lr']}, expected {NEW_LR}"
)
env.close()
def test_create_or_load_model_fresh_model_uses_correct_lr():
"""
When warm_start_path is None/missing, create_or_load_model() must
create a fresh PPO with the requested LR.
"""
from multitrack_runner import create_or_load_model
LR = 0.00075
env = make_vec_env()
model = create_or_load_model(env, learning_rate=LR, warm_start_path=None)
assert model.learning_rate == pytest.approx(LR)
for pg in model.policy.optimizer.param_groups:
assert pg['lr'] == pytest.approx(LR)
env.close()
def test_create_or_load_model_falls_back_to_fresh_on_bad_path():
"""
If the warm_start_path doesn't exist, create_or_load_model() must
fall back to a fresh model (not crash).
"""
from multitrack_runner import create_or_load_model
LR = 0.0005
env = make_vec_env()
model = create_or_load_model(env, learning_rate=LR,
warm_start_path='/nonexistent/model.zip')
assert model is not None
assert model.learning_rate == pytest.approx(LR)
env.close()
# ===========================================================================
# 3. Output parsing — parse_runner_output extracts correct metrics
# ===========================================================================
def test_parse_runner_output_full_success():
"""parse_runner_output correctly extracts all metrics from a full run."""
from wave3_controller import parse_runner_output
output = """
[12:00:01] [W3 Runner][TRAIN] track=generated_road segment_reward=2409.70
[12:08:00] [W3 Runner][TRAIN] track=generated_track segment_reward=112.30
[12:15:00] [W3 Runner] Switching to TEST track: mini_monaco
[12:15:30] [W3 Runner][TEST] track=mini_monaco mean_reward=843.21 mean_steps=980.0 ✅ DRIVES
[12:15:30] [W3 Runner][TEST] mini_monaco_reward=843.2100
[12:15:30] [W3 Runner][TEST] combined_test_score=843.2100
"""
combined, mini_monaco = parse_runner_output(output)
assert combined == pytest.approx(843.21, rel=1e-4)
assert mini_monaco == pytest.approx(843.21, rel=1e-4)
def test_parse_runner_output_crash():
"""parse_runner_output handles a crash/timeout (no test score lines)."""
from wave3_controller import parse_runner_output
output = "[TIMEOUT after 7200s]"
combined, mini_monaco = parse_runner_output(output)
assert combined is None
assert mini_monaco is None
def test_parse_runner_output_partial():
"""parse_runner_output handles missing combined but present mini_monaco."""
from wave3_controller import parse_runner_output
output = "[W3 Runner][TEST] mini_monaco_reward=55.5\n"
combined, mini_monaco = parse_runner_output(output)
assert combined is None
assert mini_monaco == pytest.approx(55.5, rel=1e-4)
# ===========================================================================
# 4. Results round-trip — save → load → GP uses the data
# ===========================================================================
def test_results_round_trip_gp_sees_correct_lr():
"""
After save_result() writes a trial, load_results() must return it, and
the GP must receive the correct params (including the actual LR used).
This ensures GP data is not silently corrupted.
"""
import wave3_controller
TRIAL_LR = 0.00083
TRIAL_SCORE = 250.0
with tempfile.TemporaryDirectory() as tmpdir:
original_path = wave3_controller.RESULTS_FILE
wave3_controller.RESULTS_FILE = os.path.join(tmpdir, 'results.jsonl')
try:
from wave3_controller import save_result, load_results
save_result(
trial=1,
params={'learning_rate': TRIAL_LR,
'steps_per_switch': 8000,
'total_timesteps': 45000},
combined=TRIAL_SCORE,
mini_monaco=TRIAL_SCORE,
model_path=None,
is_champion=True,
status='ok',
elapsed=1200.0,
)
results = load_results()
assert len(results) == 1
assert results[0]['params']['learning_rate'] == pytest.approx(TRIAL_LR)
assert results[0]['combined_test_score'] == pytest.approx(TRIAL_SCORE)
finally:
wave3_controller.RESULTS_FILE = original_path
def test_results_gp_data_never_includes_zero_score_trials():
"""
Zero-score trials (crash/timeout) must NOT be added to GP data.
If they were, the GP would learn that certain params are bad even
when the failure was actually a simulator glitch.
"""
import wave3_controller
with tempfile.TemporaryDirectory() as tmpdir:
original_path = wave3_controller.RESULTS_FILE
wave3_controller.RESULTS_FILE = os.path.join(tmpdir, 'results.jsonl')
try:
from wave3_controller import save_result, load_results
# Write a zero-score (crash) trial
save_result(
trial=1,
params={'learning_rate': 0.001, 'steps_per_switch': 5000,
'total_timesteps': 30000},
combined=0.0,
mini_monaco=0.0,
model_path=None,
is_champion=False,
status='timeout',
elapsed=7200.0,
)
# Write a valid trial
save_result(
trial=2,
params={'learning_rate': 0.0005, 'steps_per_switch': 8000,
'total_timesteps': 45000},
combined=300.0,
mini_monaco=300.0,
model_path=None,
is_champion=True,
status='ok',
elapsed=1800.0,
)
results = load_results()
# load_results only returns trials with non-None score; the
# wave3 main loop further filters out score==0 before adding to GP
scores = [r['combined_test_score'] for r in results]
assert 300.0 in scores
# The zero-score trial IS in the file but the main loop guards it
assert 0.0 in scores # it's saved
finally:
wave3_controller.RESULTS_FILE = original_path
# ===========================================================================
# 5. Seed params sanity — seed trials cover the important LR range
# ===========================================================================
def test_seed_params_cover_both_low_and_high_lr():
"""
SEED_PARAMS must include at least one low-LR trial (≤ 3e-4) and
at least one higher-LR trial (≥ 5e-4) so the GP starts with data
across the search space, not just at one corner.
"""
from wave3_controller import SEED_PARAMS
lrs = [p['learning_rate'] for p in SEED_PARAMS]
assert min(lrs) <= 3e-4, f"No low-LR seed trial: {lrs}"
assert max(lrs) >= 5e-4, f"No high-LR seed trial: {lrs}"
def test_seed_params_lr_is_not_all_identical():
"""SEED_PARAMS must not all have the same LR — that killed Wave 3 v1."""
from wave3_controller import SEED_PARAMS
lrs = [p['learning_rate'] for p in SEED_PARAMS]
assert len(set(lrs)) > 1, (
f"All seed params have the same LR ({lrs[0]}). "
"The GP needs diverse starting data to explore the LR dimension."
)