"""
|
||
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).
|
||
"""
|
||
|
||
import math
import os
import sys

import gymnasium as gym
import numpy as np
import pytest

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper

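# SpeedRewardWrapper parameters exercised below (semantics inferred from these
# tests rather than from reward_wrapper itself): speed_scale scales the v4
# speed bonus, window_size bounds the position history behind the efficiency
# term, and min_lap_time is the v5 short-lap penalty threshold.
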
# ---- Mock Environments ----

class MockEnv(gym.Env):
    """Configurable mock gymnasium.Env."""
    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p): self._pos = list(p)
    def set_cte(self, c): self._cte = c

    def reset(self, seed=None, **kwargs):
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this
        sim_reward = 999.0  # Deliberately bogus — wrapper must not use this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if self._use_5tuple:
            return obs, sim_reward, self._done, False, info
        return obs, sim_reward, self._done, info

    def close(self): pass

def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0):
    """Set the mock's state, then advance the wrapped env by one step."""
    env.set_pos(pos)
    env.set_cte(cte)
    env._speed = speed
    return wrapped_env.step(0)

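# Reference sketch of the per-step formula the tests below encode (a
# hypothetical helper inferred from the expected values in the tests, not
# imported from reward_wrapper): reward = speed_norm * cte_quality with
# max_speed=10 and max_cte=8.
def expected_step_reward(speed, cte, max_speed=10.0, max_cte=8.0):
    """Hedged reference, e.g. speed=5, cte=0.5 -> 0.5 * 0.9375 = 0.469."""
    return (speed / max_speed) * (1.0 - abs(cte) / max_cte)
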
# ---- Core Reward Properties ----

def test_sim_reward_is_completely_ignored():
    """
    The wrapper must NOT use the sim's reward (999.0).
    v4 computes reward from scratch using CTE/pos/speed only.
    """
    env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward != 999.0, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"

def test_circling_at_zero_cte_earns_formula_reward():
    """
    v5: circling protection is handled by lap-time penalty + StuckTermination,
    NOT by the reward formula. A circling car at CTE=0 with speed CAN earn
    reward per step. This test verifies the formula works as designed:
    reward = speed_norm * cte_quality. Circling is stopped by other mechanisms.
    """
    env = MockEnv(speed=3.0, cte=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()

    # At CTE=0 and speed=3, expected reward = (3/10) * 1.0 = 0.3
    _, r, _, _, _ = wrapped.step(0)
    expected = (3.0 / 10.0) * 1.0
    assert abs(r - expected) < 0.05, (
        f"v5: reward at CTE=0, speed=3 should be ~{expected:.2f}, got {r:.4f}")

def test_forward_driving_earns_positive_reward():
    """Straight-line driving at low CTE and reasonable speed earns positive reward."""
    env = MockEnv(speed=5.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    _, r, _, _, _ = wrapped.step(0)
    # reward = (5/10) * (1 - 0.5/8) = 0.5 * 0.9375 = 0.469
    assert r > 0.3, f"Forward driving should earn >0.3 reward, got {r:.4f}"

def test_on_track_beats_off_track_by_large_margin():
    """
    v5: forward driving at moderate CTE should beat driving with high CTE.
    The reward directly penalises being off-centre.
    """
    # On track (CTE=1m) at speed=5
    env_on = MockEnv(speed=5.0, cte=1.0)
    wrapped_on = SpeedRewardWrapper(env_on, speed_scale=0.1)
    wrapped_on.reset()
    _, r_on, _, _, _ = wrapped_on.step(0)

    # Off track (CTE=7m) at same speed
    env_off = MockEnv(speed=5.0, cte=7.0)
    wrapped_off = SpeedRewardWrapper(env_off, speed_scale=0.1)
    wrapped_off.reset()
    _, r_off, _, _, _ = wrapped_off.step(0)

    assert r_on > r_off * 3, (
        f"On-track ({r_on:.2f}) should beat off-track ({r_off:.2f}) by 3x")

def test_crash_gives_negative_reward():
    """Episode termination (done=True) must always give -1.0."""
    env = MockEnv(speed=5.0, cte=0.0, done=True)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"

def test_high_cte_reduces_reward():
    """Higher CTE should reduce reward (closer to track edge = lower base)."""
    env_low = MockEnv(speed=2.0, cte=0.5)
    env_high = MockEnv(speed=2.0, cte=4.0)

    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
    wrapped_low.reset()
    wrapped_high.reset()

    # Drive straight so efficiency fills up
    for i in range(10):
        env_low.set_pos([i * 0.3, 0., 0.])
        env_high.set_pos([i * 0.3, 0., 0.])
        _, r_low, _, _, _ = wrapped_low.step(0)
        _, r_high, _, _, _ = wrapped_high.step(0)

    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"

def test_speed_bonus_increases_reward_when_on_track():
    """Faster forward driving earns more reward than slower forward driving."""
    env_slow = MockEnv(speed=0.5, cte=1.0)
    env_fast = MockEnv(speed=3.0, cte=1.0)

    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
    wrapped_slow.reset()
    wrapped_fast.reset()

    for i in range(15):
        env_slow.set_pos([i * 0.1, 0., 0.])
        env_fast.set_pos([i * 0.3, 0., 0.])  # Fast car covers more ground
        _, r_slow, _, _, _ = wrapped_slow.step(0)
        _, r_fast, _, _, _ = wrapped_fast.step(0)

    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"

def test_theoretical_max_per_step():
    """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10."""
    env = MockEnv()
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)

def test_4tuple_step_compatibility():
    """Wrapper must handle 4-tuple step() return (old gym API)."""
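    # Old gym API: step() -> (obs, reward, done, info);
    # gymnasium API: step() -> (obs, reward, terminated, truncated, info).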
    env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    env.set_pos([0., 0., 0.])
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    result = wrapped.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, done, info = result
    assert isinstance(reward, float)
    assert reward != 999.0, "Should not use sim reward"

def test_reward_resets_on_episode_reset():
    """After reset, position history clears so efficiency recalculates cleanly."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Fill with circular data
    for i in range(15):
        angle = 2 * math.pi * i / 12
        env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)])
        wrapped.step(0)

    # After reset, start fresh straight
    wrapped.reset()
    rewards = []
    for i in range(5):
        env.set_pos([i * 0.3, 0., 0.])
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Should get reasonable reward after fresh start
    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"

# ---------------------------------------------------------------------------
# Short-lap exploit patch tests
# ---------------------------------------------------------------------------

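# Hedged sketch of the bookkeeping the tests below assume (hypothetical
# helper; the real logic lives in SpeedRewardWrapper._compute_reward_and_done,
# and the -10.0 magnitude is inferred from the assertions): the penalty fires
# exactly once, on the step where lap_count increments with
# last_lap_time < min_lap_time.
def sketch_lap_penalty(last_lap_count, info, min_lap_time=5.0, penalty=-10.0):
    """Return (penalty_or_None, updated_lap_count)."""
    lap_count = info.get('lap_count', 0)
    if lap_count > last_lap_count and info.get('last_lap_time', 0.0) < min_lap_time:
        return penalty, lap_count
    return None, lap_count
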
def test_short_lap_triggers_penalty():
    """
    A lap completed faster than min_lap_time must return a large penalty,
    not a positive reward. This closes the start/finish circle exploit.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Simulate a step where a new lap completes in 1 second (exploit)
    info = {'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
            'lap_count': 1, 'last_lap_time': 1.0}
    reward, _ = wrapper._compute_reward_and_done(done=False, info=info)
    assert reward < 0, f'Short lap (1s) should penalise, got reward={reward}'
    assert reward <= -10.0, f'Short lap penalty should be large (<= -10), got {reward}'

def test_legitimate_lap_not_penalised():
    """
    A lap completed above min_lap_time must NOT trigger the penalty.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # First step — no lap yet
    info_no_lap = {'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
                   'lap_count': 0, 'last_lap_time': 0.0}
    wrapper._compute_reward_and_done(done=False, info=info_no_lap)

    # Legitimate lap at 12 seconds
    info = {'cte': 0.2, 'speed': 3.0, 'pos': (1.0, 0.0, 0.0),
            'lap_count': 1, 'last_lap_time': 12.0}
    reward, _ = wrapper._compute_reward_and_done(done=False, info=info)
    assert reward >= 0, f'Legitimate lap (12s) should not be penalised, got {reward}'

def test_lap_count_not_double_penalised():
    """
    Penalty fires exactly once per short lap, not on every subsequent step.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # The penalty fires on the step where lap_count increments
    info_lap = {'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
                'lap_count': 1, 'last_lap_time': 1.5}
    r1, _ = wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert r1 < 0

    # Next step, same lap_count — should get normal reward, not another penalty
    info_next = {'cte': 0.0, 'speed': 3.0, 'pos': (0.1, 0.0, 0.0),
                 'lap_count': 1, 'last_lap_time': 1.5}
    r2, _ = wrapper._compute_reward_and_done(done=False, info=info_next)
    assert r2 >= 0, f'Penalty should not repeat on same lap_count, got r2={r2}'

def test_lap_count_resets_on_episode_reset():
    """lap_count tracker must reset when the episode resets."""
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Complete a short lap
    info_lap = {'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
                'lap_count': 1, 'last_lap_time': 1.0}
    wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert wrapper._last_lap_count == 1

    # Reset episode — counter must go back to 0
    wrapper.reset()
    assert wrapper._last_lap_count == 0