"""
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).

NOTE(review): several tests below describe their expectations as "v5"
(reward = speed_norm * cte_quality) while this header and
``test_theoretical_max_per_step`` still quote the v4 formula — confirm which
wrapper version this suite is meant to track.
"""
import sys
import os
import math
import pytest
import numpy as np
import gymnasium as gym
from collections import deque

# Make the sibling ``agent`` directory importable before pulling in the wrapper.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper


# ---- Mock Environments ----

class MockEnv(gym.Env):
    """Minimal gymnasium.Env stand-in whose telemetry is dictated by the test.

    Every ``step`` returns a blank 120x160x3 uint8 frame, a deliberately bogus
    reward of 999.0 and an info dict with the currently configured ``speed``,
    ``cte`` and ``pos``.

    Args:
        speed: value reported under ``info['speed']``.
        cte: value reported under ``info['cte']``.
        pos: position reported under ``info['pos']`` (stored as a list copy).
        done: value returned in the done/terminated slot of ``step``.
        use_5tuple: when true ``step`` returns the 5-tuple
            ``(obs, reward, done, False, info)``; otherwise the 4-tuple
            ``(obs, reward, done, info)``.
    """

    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self.action_space = gym.spaces.Discrete(5)
        self._use_5tuple = use_5tuple
        self._done = done
        self._pos = list(pos)
        self._cte = cte
        self._speed = speed

    @staticmethod
    def _blank_frame():
        """Return an all-zero observation frame."""
        return np.zeros((120, 160, 3), dtype=np.uint8)

    def set_pos(self, p):
        """Replace the reported position with a list copy of ``p``."""
        self._pos = list(p)

    def set_cte(self, c):
        """Replace the reported cross-track error."""
        self._cte = c

    def reset(self, seed=None, **kwargs):
        """Return a blank frame and an empty info dict; arguments are ignored."""
        return self._blank_frame(), {}

    def step(self, action):
        """Return a blank frame, the bogus reward and the configured telemetry.

        The action is ignored.
        """
        frame = self._blank_frame()
        # The sim reward is based on forward_vel (exploitable); 999.0 is
        # deliberately bogus so any pass-through by the wrapper is obvious.
        bogus_reward = 999.0
        telemetry = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if not self._use_5tuple:
            return frame, bogus_reward, self._done, telemetry
        return frame, bogus_reward, self._done, False, telemetry

    def close(self):
        """Nothing to release."""
        pass


def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0):
    """Configure ``env``'s telemetry, then step ``wrapped_env`` with action 0.

    Returns whatever ``wrapped_env.step(0)`` returns.
    """
    env._speed = speed
    env.set_cte(cte)
    env.set_pos(pos)
    return wrapped_env.step(0)


def _first_step_reward(speed, cte, done=False, **wrapper_kwargs):
    """Wrap a fresh MockEnv, reset it, take one step and return the reward.

    Unpacks exactly five values from ``step`` so a wrong tuple length fails
    the calling test.
    """
    mock = MockEnv(speed=speed, cte=cte, done=done)
    wrapper = SpeedRewardWrapper(mock, **wrapper_kwargs)
    wrapper.reset()
    _obs, step_reward, _terminated, _truncated, _info = wrapper.step(0)
    return step_reward


# ---- Core v4 Properties ----

def test_sim_reward_is_completely_ignored():
    """
    The wrapped reward must not be the sim's 999.0 and must stay below 10.

    The mock env always emits 999.0, so seeing that value (or anything near
    it) means the wrapper forwarded the sim reward.
    """
    reward = _first_step_reward(2.0, 0.5, speed_scale=0.1)
    assert reward != 999.0, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"


def test_circling_at_zero_cte_gives_near_zero_reward():
    """
    Expect roughly (3/10) * 1.0 = 0.3 for a single step at CTE=0, speed=3.

    Per the original author's v5 note, circling protection is delegated to the
    lap-time penalty and StuckTermination rather than the reward formula, so a
    car at CTE=0 with speed is expected to earn reward per step.

    NOTE(review): the test name predates that change — it asserts ~0.3, not a
    near-zero reward.
    """
    expected = (3.0 / 10.0) * 1.0
    r = _first_step_reward(3.0, 0.0, speed_scale=0.1, window_size=20)
    assert abs(r - expected) < 0.05, (
        f"v5: reward at CTE=0, speed=3 should be ~{expected:.2f}, got {r:.4f}")


def test_forward_driving_earns_positive_reward():
    """One step at speed=5, CTE=0.5 should earn more than 0.3."""
    # Author's expectation: (5/10) * (1 - 0.5/8) = 0.5 * 0.9375 = 0.469
    r = _first_step_reward(5.0, 0.5, speed_scale=0.1, window_size=10)
    assert r > 0.3, f"Forward driving should earn >0.3 reward, got {r:.4f}"


def test_forward_beats_circling_by_large_margin():
    """
    At equal speed, CTE=1 must earn more than three times the CTE=7 reward.

    NOTE(review): despite the name, this compares on-track with off-track
    driving, not forward driving with circling.
    """
    # Near the centre line (CTE=1m) at speed=5
    r_on = _first_step_reward(5.0, 1.0, speed_scale=0.1)
    # Far off-centre (CTE=7m) at the same speed
    r_off = _first_step_reward(5.0, 7.0, speed_scale=0.1)
    assert r_on > r_off * 3, (
        f"On-track ({r_on:.2f}) should beat off-track ({r_off:.2f}) by 3x")


def test_crash_gives_negative_reward():
    """A step that reports done=True must yield exactly -1.0."""
    reward = _first_step_reward(5.0, 0.0, done=True, speed_scale=0.2)
    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"


def test_high_cte_reduces_reward():
    """After ten straight steps, CTE=0.5 must out-earn CTE=4.0 at equal speed."""
    near_env = MockEnv(speed=2.0, cte=0.5)
    far_env = MockEnv(speed=2.0, cte=4.0)
    near_wrapper = SpeedRewardWrapper(near_env, speed_scale=0.1, window_size=5)
    far_wrapper = SpeedRewardWrapper(far_env, speed_scale=0.1, window_size=5)
    near_wrapper.reset()
    far_wrapper.reset()
    # Both cars drive the same straight line; only the final step is compared.
    for step_idx in range(10):
        near_env.set_pos([step_idx * 0.3, 0., 0.])
        _obs, r_low, _terminated, _truncated, _info = near_wrapper.step(0)
        far_env.set_pos([step_idx * 0.3, 0., 0.])
        _obs, r_high, _terminated, _truncated, _info = far_wrapper.step(0)
    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"


def test_speed_bonus_increases_reward_when_on_track():
    """After fifteen straight steps, speed=3.0 must out-earn speed=0.5 at CTE=1."""
    slow_env = MockEnv(speed=0.5, cte=1.0)
    fast_env = MockEnv(speed=3.0, cte=1.0)
    slow_wrapper = SpeedRewardWrapper(slow_env, speed_scale=0.1, window_size=10)
    fast_wrapper = SpeedRewardWrapper(fast_env, speed_scale=0.1, window_size=10)
    slow_wrapper.reset()
    fast_wrapper.reset()
    for step_idx in range(15):
        slow_env.set_pos([step_idx * 0.1, 0., 0.])
        _obs, r_slow, _terminated, _truncated, _info = slow_wrapper.step(0)
        # The fast car covers three times the ground per step.
        fast_env.set_pos([step_idx * 0.3, 0., 0.])
        _obs, r_fast, _terminated, _truncated, _info = fast_wrapper.step(0)
    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"


def test_theoretical_max_per_step():
    """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10."""
    wrapper = SpeedRewardWrapper(MockEnv(), speed_scale=0.1)
    ceiling = wrapper.theoretical_max_per_step(max_speed=10.0)
    assert ceiling == pytest.approx(2.0, abs=1e-6)


def test_4tuple_step_compatibility():
    """A 4-tuple step() from the env (old gym API) must come back as a 4-tuple."""
    legacy_env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    legacy_env.set_pos([0., 0., 0.])
    wrapper = SpeedRewardWrapper(legacy_env, speed_scale=0.1)
    wrapper.reset()
    result = wrapper.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _obs, reward, _done, _info = result
    assert isinstance(reward, float)
    assert reward != 999.0, "Should not use sim reward"


def test_reward_resets_on_episode_reset():
    """Straight driving after reset() must earn positive reward, even after a circular episode."""
    mock = MockEnv(speed=2.0, cte=0.5)
    wrapper = SpeedRewardWrapper(mock, speed_scale=0.1, window_size=10)
    wrapper.reset()
    # First episode: 15 steps around a radius-0.5 circle in the x/z plane.
    for step_idx in range(15):
        theta = 2 * math.pi * step_idx / 12
        mock.set_pos([0.5 * math.cos(theta), 0., 0.5 * math.sin(theta)])
        wrapper.step(0)
    # Second episode: reset, then five steps along a straight line.
    wrapper.reset()
    straight_rewards = []
    for step_idx in range(5):
        mock.set_pos([step_idx * 0.3, 0., 0.])
        _obs, r, _terminated, _truncated, _info = wrapper.step(0)
        straight_rewards.append(r)
    # Only the final step of the fresh episode is checked.
    assert straight_rewards[-1] > 0, "Should get positive reward after reset and straight driving"


# ---------------------------------------------------------------------------
# Short-lap exploit patch tests
# ---------------------------------------------------------------------------

def _lap_wrapper():
    """Return a freshly reset wrapper with min_lap_time=5.0 around a MockEnv."""
    mock = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(mock, min_lap_time=5.0)
    wrapper.reset()
    return wrapper


def _lap_info(cte, pos, lap_count, last_lap_time):
    """Build the info dict passed to ``_compute_reward``; speed is fixed at 3.0."""
    return {'cte': cte, 'speed': 3.0, 'pos': pos,
            'lap_count': lap_count, 'last_lap_time': last_lap_time}


def test_short_lap_triggers_penalty():
    """
    A lap reported faster than min_lap_time must score -10 or lower.

    This guards against the start/finish circle exploit.
    """
    wrapper = _lap_wrapper()
    # A new lap that supposedly took one second (the exploit).
    info = _lap_info(0.0, (0.0, 0.0, 0.0), 1, 1.0)
    reward = wrapper._compute_reward(done=False, info=info)
    assert reward < 0, f'Short lap (1s) should penalise, got reward={reward}'
    assert reward <= -10.0, f'Short lap penalty should be large (<= -10), got {reward}'


def test_legitimate_lap_not_penalised():
    """
    A lap reported above min_lap_time must not produce a negative reward.
    """
    wrapper = _lap_wrapper()
    # First step: no lap completed yet.
    info_no_lap = _lap_info(0.0, (0.0, 0.0, 0.0), 0, 0.0)
    wrapper._compute_reward(done=False, info=info_no_lap)
    # Then a legitimate 12-second lap.
    info = _lap_info(0.2, (1.0, 0.0, 0.0), 1, 12.0)
    reward = wrapper._compute_reward(done=False, info=info)
    assert reward >= 0, f'Legitimate lap (12s) should not be penalised, got {reward}'


def test_lap_count_not_double_penalised():
    """
    The short-lap penalty must apply once, not on every later step.
    """
    wrapper = _lap_wrapper()
    # The step on which lap_count increments carries the penalty.
    info_lap = _lap_info(0.0, (0.0, 0.0, 0.0), 1, 1.5)
    r1 = wrapper._compute_reward(done=False, info=info_lap)
    assert r1 < 0
    # Same lap_count on the following step: reward must be non-negative again.
    info_next = _lap_info(0.0, (0.1, 0.0, 0.0), 1, 1.5)
    r2 = wrapper._compute_reward(done=False, info=info_next)
    assert r2 >= 0, f'Penalty should not repeat on same lap_count, got r2={r2}'


def test_lap_count_resets_on_episode_reset():
    """``_last_lap_count`` must return to 0 when the episode resets."""
    wrapper = _lap_wrapper()
    # Register one short lap so the tracker moves off zero.
    info_lap = _lap_info(0.0, (0.0, 0.0, 0.0), 1, 1.0)
    wrapper._compute_reward(done=False, info=info_lap)
    assert wrapper._last_lap_count == 1
    # A new episode must clear the tracker.
    wrapper.reset()
    assert wrapper._last_lap_count == 0