"""
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).

Besides the core v4 reward properties, this module also exercises the
efficiency gate (labelled "v6" in the tests below), the short-lap exploit
patch, and the v6.1 exploit terminators (sustained high CTE, no track
progress, low-speed wedge).
"""
import math
import os
import sys
from collections import deque

import gymnasium as gym
import numpy as np
import pytest

# Make the sibling ``agent`` directory importable before importing the wrapper.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper


# ---- Mock Environments ----

class MockEnv(gym.Env):
    """Configurable mock gymnasium.Env.

    Every ``step()`` returns a zero image, a deliberately bogus reward of
    999.0, the configured ``done`` flag, and an ``info`` dict holding the
    currently configured ``speed``, ``cte`` and ``pos``.

    Args:
        speed: Value reported as ``info['speed']``.
        cte: Value reported as ``info['cte']``.
        pos: Initial position; stored as a list and reported as ``info['pos']``.
        done: Termination flag returned by every ``step()``.
        use_5tuple: If true, ``step()`` returns the 5-tuple
            ``(obs, reward, done, False, info)``; otherwise the 4-tuple
            ``(obs, reward, done, info)``.
    """

    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p):
        """Replace the reported position with a fresh list copy of ``p``."""
        self._pos = list(p)

    def set_cte(self, c):
        """Set the cross-track error reported by subsequent steps."""
        self._cte = c

    def reset(self, seed=None, **kwargs):
        """Return a zero observation and an empty info dict."""
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        """Return a zero observation, the bogus sim reward and the current info."""
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this
        sim_reward = 999.0  # Deliberately bogus — wrapper must not use this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if self._use_5tuple:
            return obs, sim_reward, self._done, False, info
        return obs, sim_reward, self._done, info

    def close(self):
        """Nothing to release."""
        pass


def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0):
    """Configure the mock's pos/cte/speed, then step the wrapper with action 0.

    Returns whatever ``wrapped_env.step(0)`` returns.
    """
    env.set_pos(pos)
    env.set_cte(cte)
    env._speed = speed
    return wrapped_env.step(0)


# ---- Core v4 Properties ----

def test_sim_reward_is_completely_ignored():
    """
    The wrapper must NOT use the sim's reward (999.0).
    v4 computes reward from scratch using CTE/pos/speed only.
    """
    env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward != 999.0, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"


def test_circling_at_zero_cte_gives_near_zero_reward():
    """
    v6: circling (low efficiency) should yield zero reward via the efficiency gate.
    After enough steps of circular motion, the efficiency drops below threshold
    and the gate zeros the reward.
    """
    env = MockEnv(speed=3.0, cte=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=30, min_efficiency=0.15)
    wrapped.reset()

    # Drive in a circle for enough steps to fill the position window
    rewards = []
    for i in range(40):
        angle = 2 * math.pi * i / 12  # completes circle every 12 steps
        env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)])
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # After 20+ steps of circular motion, efficiency gate should kick in
    # Last few rewards should be 0.0
    assert rewards[-1] == 0.0, (
        f"v6: circular driving should yield 0.0 reward via efficiency gate, got {rewards[-1]:.4f}")
    assert sum(1 for r in rewards[-5:] if r == 0.0) >= 3, (
        f"v6: most of last 5 rewards during circle should be 0.0, got {rewards[-5:]}")


def test_forward_driving_earns_positive_reward():
    """Straight-line driving at low CTE and reasonable speed earns positive reward."""
    env = MockEnv(speed=5.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    _, r, _, _, _ = wrapped.step(0)
    # Original author's hand-computed estimate (not re-derived here — verify
    # against the wrapper): reward = (5/10) * (1 - 0.5/8) = 0.5 * 0.9375 = 0.469
    assert r > 0.3, f"Forward driving should earn >0.3 reward, got {r:.4f}"


def test_forward_beats_circling_by_large_margin():
    """
    v6: forward driving earns positive reward; circular driving earns zero.
    The efficiency gate ensures this gap.
    """
    # Forward driving at CTE=1m, speed=5
    env_fwd = MockEnv(speed=5.0, cte=1.0)
    wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=30)
    wrapped_fwd.reset()
    for i in range(35):
        env_fwd.set_pos([i * 0.5, 0., 0.])  # straight line
        _, r_fwd, _, _, _ = wrapped_fwd.step(0)

    # Circular driving at CTE=0, speed=5
    env_circ = MockEnv(speed=5.0, cte=0.0)
    wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=30)
    wrapped_circ.reset()
    for i in range(35):
        angle = 2 * math.pi * i / 12
        env_circ.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)])
        _, r_circ, _, _, _ = wrapped_circ.step(0)

    # Only the reward of the final step of each run is compared.
    assert r_fwd > 0, f"Forward driving should earn positive reward, got {r_fwd}"
    assert r_circ == 0.0, f"Circular driving should earn 0 reward, got {r_circ}"
    assert r_fwd > r_circ, f"Forward ({r_fwd:.3f}) must beat circling ({r_circ:.3f})"


def test_crash_gives_negative_reward():
    """Episode termination (done=True) must always give -1.0."""
    env = MockEnv(speed=5.0, cte=0.0, done=True)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"


def test_high_cte_reduces_reward():
    """Higher CTE should reduce reward (closer to track edge = lower base)."""
    env_low = MockEnv(speed=2.0, cte=0.5)
    env_high = MockEnv(speed=2.0, cte=4.0)
    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
    wrapped_low.reset()
    wrapped_high.reset()

    # Drive straight so efficiency fills up
    for i in range(10):
        env_low.set_pos([i * 0.3, 0., 0.])
        env_high.set_pos([i * 0.3, 0., 0.])
        _, r_low, _, _, _ = wrapped_low.step(0)
        _, r_high, _, _, _ = wrapped_high.step(0)

    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"


def test_speed_bonus_increases_reward_when_on_track():
    """Faster forward driving earns more reward than slower forward driving."""
    env_slow = MockEnv(speed=0.5, cte=1.0)
    env_fast = MockEnv(speed=3.0, cte=1.0)
    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
    wrapped_slow.reset()
    wrapped_fast.reset()

    for i in range(15):
        env_slow.set_pos([i * 0.1, 0., 0.])
        env_fast.set_pos([i * 0.3, 0., 0.])  # Fast car covers more ground
        _, r_slow, _, _, _ = wrapped_slow.step(0)
        _, r_fast, _, _, _ = wrapped_fast.step(0)

    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"


def test_theoretical_max_per_step():
    """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10."""
    env = MockEnv()
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)


def test_4tuple_step_compatibility():
    """Wrapper must handle 4-tuple step() return (old gym API)."""
    env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    env.set_pos([0., 0., 0.])
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    result = wrapped.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, _, _ = result
    assert isinstance(reward, float)
    assert reward != 999.0, "Should not use sim reward"


def test_reward_resets_on_episode_reset():
    """After reset, position history clears so efficiency recalculates cleanly."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Fill with circular data
    for i in range(15):
        angle = 2 * math.pi * i / 12
        env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)])
        wrapped.step(0)

    # After reset, start fresh straight
    wrapped.reset()
    rewards = []
    for i in range(5):
        env.set_pos([i * 0.3, 0., 0.])
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Should get reasonable reward after fresh start
    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"


# ---------------------------------------------------------------------------
# Short-lap exploit patch tests
# ---------------------------------------------------------------------------

def test_short_lap_triggers_penalty():
    """
    A lap completed faster than min_lap_time must return a large penalty,
    not a positive reward. This closes the start/finish circle exploit.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Simulate step where a new lap completes in 1 second (exploit)
    info = {
        'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
        'lap_count': 1, 'last_lap_time': 1.0,
    }
    reward, _ = wrapper._compute_reward_and_done(done=False, info=info)
    assert reward < 0, f'Short lap (1s) should penalise, got reward={reward}'
    assert reward <= -10.0, f'Short lap penalty should be large (<= -10), got {reward}'


def test_legitimate_lap_not_penalised():
    """
    A lap completed above min_lap_time must NOT trigger the penalty.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # First step — no lap yet
    info_no_lap = {
        'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
        'lap_count': 0, 'last_lap_time': 0.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info_no_lap)

    # Legitimate lap at 12 seconds
    info = {
        'cte': 0.2, 'speed': 3.0, 'pos': (1.0, 0.0, 0.0),
        'lap_count': 1, 'last_lap_time': 12.0,
    }
    reward, _ = wrapper._compute_reward_and_done(done=False, info=info)
    assert reward >= 0, f'Legitimate lap (12s) should not be penalised, got {reward}'


def test_lap_count_not_double_penalised():
    """
    Penalty fires exactly once per short lap, not on every subsequent step.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Short lap fires on step where lap_count increments
    info_lap = {
        'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
        'lap_count': 1, 'last_lap_time': 1.5,
    }
    r1, _ = wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert r1 < 0

    # Next step same lap_count — should get normal reward, not another penalty
    info_next = {
        'cte': 0.0, 'speed': 3.0, 'pos': (0.1, 0.0, 0.0),
        'lap_count': 1, 'last_lap_time': 1.5,
    }
    r2, _ = wrapper._compute_reward_and_done(done=False, info=info_next)
    assert r2 >= 0, f'Penalty should not repeat on same lap_count, got r2={r2}'


def test_lap_count_resets_on_episode_reset():
    """lap_count tracker must reset when the episode resets."""
    env = MockEnv(speed=3.0, cte=0.0, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Complete a short lap
    info_lap = {
        'cte': 0.0, 'speed': 3.0, 'pos': (0.0, 0.0, 0.0),
        'lap_count': 1, 'last_lap_time': 1.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert wrapper._last_lap_count == 1

    # Reset episode — counter must go back to 0
    wrapper.reset()
    assert wrapper._last_lap_count == 0


# ---------------------------------------------------------------------------
# v6.1 exploit terminator tests
# ---------------------------------------------------------------------------

def test_sustained_high_cte_terminates_episode():
    """
    Grass exploit fix: if CTE exceeds max_cte_terminate for cte_patience
    consecutive steps, the episode must be force-terminated with -1.0 reward.
    This catches the generated_track gap where car drives indefinitely on grass.
    """
    env = MockEnv(speed=3.0, cte=5.0)  # CTE=5.0 > max_cte_terminate=4.0
    wrapper = SpeedRewardWrapper(env, max_cte_terminate=4.0, cte_patience=5)
    wrapper.reset()

    rewards = []
    terminated = []
    for _ in range(10):
        info = {
            'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
            'active_node': 0, 'lap_count': 0, 'last_lap_time': 0.0,
        }
        r, force_term = wrapper._compute_reward_and_done(done=False, info=info)
        rewards.append(r)
        terminated.append(force_term)

    # High CTE should be punished immediately, then terminate at step 5
    assert rewards[0] < 0, f'High CTE should be negative immediately, got {rewards[0]}'
    assert terminated[4], f'Should force-terminate at step 5, got {terminated}'
    assert rewards[4] == -1.0, f'Termination reward should be -1.0, got {rewards[4]}'
    assert not terminated[0], 'Should not terminate at step 1'


def test_high_cte_never_gets_positive_speed_reward_before_termination():
    """
    Regression for generated_road outside-circle exploit: while CTE is outside
    the allowed corridor, the wrapper must not pay positive speed reward during
    the patience window. The policy should receive negative feedback
    immediately, then termination.
    """
    env = MockEnv(speed=5.0, cte=3.0)
    wrapper = SpeedRewardWrapper(env, max_cte_terminate=2.5, cte_patience=3)
    wrapper.reset()

    rewards = []
    terminated = []
    for i in range(3):
        info = {
            'cte': 3.0,
            'speed': 5.0,
            'pos': (float(i), 0.0, 0.0),
            'active_node': i,
            'total_nodes': 100,
            'lap_count': 0,
            'last_lap_time': 0.0,
        }
        r, ft = wrapper._compute_reward_and_done(done=False, info=info)
        rewards.append(r)
        terminated.append(ft)

    assert rewards[:2] == [-0.25, -0.25]
    assert rewards[2] == -1.0
    assert terminated == [False, False, True]


def test_high_cte_resets_when_back_on_track():
    """
    High CTE counter must reset when car returns to track.
    Prevents false termination after a brief excursion.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, max_cte_terminate=4.0, cte_patience=5)
    wrapper.reset()

    # 3 steps high CTE
    for _ in range(3):
        info = {
            'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
            'active_node': 0, 'lap_count': 0, 'last_lap_time': 0.0,
        }
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
    assert not ft, 'Should not terminate after only 3 steps'

    # 1 step back on track resets counter
    info = {
        'cte': 1.0, 'speed': 3.0, 'pos': (0., 0., 0.),
        'active_node': 1, 'lap_count': 0, 'last_lap_time': 0.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info)
    assert wrapper._high_cte_steps == 0, 'CTE counter should reset when back on track'

    # 5 more steps high CTE — should now terminate (counter starts fresh)
    for _ in range(5):
        info = {
            'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
            'active_node': 1, 'lap_count': 0, 'last_lap_time': 0.0,
        }
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
    assert ft, 'Should terminate after 5 new consecutive high-CTE steps'


def test_no_track_progress_terminates_episode():
    """
    Circle/stuck exploit fix: if max active_node doesn't advance for
    progress_patience steps, the episode must be force-terminated.
    A circling car stays near the same waypoints — max_node never increases.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=10)
    wrapper.reset()

    # First step initialises max_node to 5, then 10 more steps stuck at 5 → terminate
    for i in range(12):
        info = {
            'cte': 0.5, 'speed': 3.0, 'pos': (float(i)*0.1, 0., 0.),
            'active_node': 5, 'total_nodes': 100,
            'lap_count': 0, 'last_lap_time': 0.0,
        }
        r, ft = wrapper._compute_reward_and_done(done=False, info=info)
        if ft:
            break

    assert ft, 'Should terminate when max active_node not advancing'
    assert r == -1.0


def test_low_speed_no_displacement_terminates_barrier_wedge():
    """
    Regression for invisible-barrier wedge: wheels can be commanded but the car
    remains nearly motionless with acceptable CTE.
    This must terminate quickly instead of returning zero/positive reward
    indefinitely.
    """
    env = MockEnv(speed=0.05, cte=0.5)
    wrapper = SpeedRewardWrapper(
        env,
        low_speed_grace_steps=2,
        low_speed_patience=3,
        low_speed_threshold=0.2,
        low_speed_min_displacement=0.25,
        progress_patience=100,
    )
    wrapper.reset()

    terminated = False
    reward = None
    for _ in range(8):
        info = {
            'cte': 0.5,
            'speed': 0.05,
            'pos': (1.0, 0.0, 1.0),
            'active_node': 5,
            'total_nodes': 100,
            'lap_count': 0,
            'last_lap_time': 0.0,
        }
        reward, terminated = wrapper._compute_reward_and_done(done=False, info=info)
        if terminated:
            break

    assert terminated is True
    assert reward == -1.0


def test_low_speed_counter_resets_after_meaningful_displacement():
    """Slow starts should not terminate if the car is still changing position."""
    env = MockEnv(speed=0.05, cte=0.5)
    wrapper = SpeedRewardWrapper(
        env,
        low_speed_grace_steps=0,
        low_speed_patience=3,
        low_speed_threshold=0.2,
        low_speed_min_displacement=0.25,
        progress_patience=100,
    )
    wrapper.reset()

    for i in range(6):
        info = {
            'cte': 0.5,
            'speed': 0.05,
            'pos': (float(i) * 0.3, 0.0, 0.0),
            'active_node': i,
            'total_nodes': 100,
            'lap_count': 0,
            'last_lap_time': 0.0,
        }
        _, terminated = wrapper._compute_reward_and_done(done=False, info=info)
        assert terminated is False


def test_track_progress_resets_counter():
    """
    Advancing to a new max active_node must reset the no-progress counter.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=5)
    wrapper.reset()

    # Step forward: nodes 0, 1, 2, 3 — each new node resets counter
    for node in range(4):
        info = {
            'cte': 0.5, 'speed': 3.0, 'pos': (float(node)*0.5, 0., 0.),
            'active_node': node, 'total_nodes': 100,
            'lap_count': 0, 'last_lap_time': 0.0,
        }
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
        assert not ft, f'Should not terminate when advancing (node {node})'
        assert wrapper._no_progress_steps == 0, 'Counter should reset on new max node'


def test_circle_exploit_terminates():
    """
    A car circling near the same spot should be terminated.
    active_node oscillates but never exceeds the initial max.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=10)
    wrapper.reset()

    # Set max_node to 10
    info = {
        'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
        'active_node': 10, 'total_nodes': 100,
        'lap_count': 0, 'last_lap_time': 0.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info)

    # Now oscillate between nodes 8-10 (circling near node 10)
    terminated = False
    for i in range(20):
        node = 8 + (i % 3)  # oscillates 8, 9, 10, 8, 9, 10...
        info = {
            'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
            'active_node': node, 'total_nodes': 100,
            'lap_count': 0, 'last_lap_time': 0.0,
        }
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
        if ft:
            terminated = True
            break

    assert terminated, 'Circling (oscillating active_node, no new max) should terminate'


def test_lap_completion_resets_progress_tracker():
    """
    On lap completion, active_node resets to 0. Progress tracker must also
    reset so the car isn't immediately terminated for 'no progress'.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=5, min_lap_time=5.0)
    wrapper.reset()

    # Drive to near end of track
    info = {
        'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
        'active_node': 99, 'total_nodes': 100,
        'lap_count': 0, 'last_lap_time': 0.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info)
    assert wrapper._max_node_seen == 99

    # Complete a valid lap
    info = {
        'cte': 0.5, 'speed': 3.0, 'pos': (0., 0., 0.),
        'active_node': 0, 'total_nodes': 100,
        'lap_count': 1, 'last_lap_time': 12.0,  # 12s lap = valid
    }
    _, ft = wrapper._compute_reward_and_done(done=False, info=info)

    # Progress tracker should be reset
    assert wrapper._max_node_seen == -1, 'max_node_seen should reset on lap completion'
    assert wrapper._no_progress_steps == 0
    assert not ft, 'Valid lap should not terminate'