"""Tests for reward_wrapper.py v7 (clean: speed×CTE + efficiency gate).""" import sys, os, math, pytest import numpy as np sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent')) from reward_wrapper import SpeedRewardWrapper import gymnasium as gym class MockEnv(gym.Env): metadata = {'render_modes': []} def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True): super().__init__() self.action_space = gym.spaces.Discrete(5) self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8) self._speed = speed self._cte = cte self._pos = list(pos) self._done = done self._use_5tuple = use_5tuple def set_pos(self, p): self._pos = list(p) def set_cte(self, c): self._cte = c def reset(self, seed=None, **kwargs): return np.zeros((120, 160, 3), dtype=np.uint8), {} def step(self, action): obs = np.zeros((120, 160, 3), dtype=np.uint8) sim_reward = 999.0 # deliberately bogus — wrapper must ignore this info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos} if self._use_5tuple: return obs, sim_reward, self._done, False, info return obs, sim_reward, self._done, info # ── Helpers ────────────────────────────────────────────────────────────────── def make_info(cte=0.5, speed=2.0, pos=None, active_node=1, lap_count=0, lap_time=0.0): return { 'cte': cte, 'speed': speed, 'pos': pos or (0., 0., 0.), 'active_node': active_node, 'total_nodes': 100, 'lap_count': lap_count, 'last_lap_time': lap_time, } # ── Core reward properties ──────────────────────────────────────────────────── def test_sim_reward_is_completely_ignored(): env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.)) wrapped = SpeedRewardWrapper(env) wrapped.reset() _, reward, _, _, _ = wrapped.step(0) assert reward != 999.0 assert reward < 10.0 def test_crash_gives_negative_one(): env = MockEnv(speed=5.0, cte=0.0, done=True) wrapped = SpeedRewardWrapper(env) wrapped.reset() _, reward, _, _, _ = wrapped.step(0) assert reward == -1.0 def test_forward_driving_earns_positive_reward(): env = MockEnv(speed=5.0, cte=0.5) wrapped = SpeedRewardWrapper(env, window_size=10) wrapped.reset() _, r, _, _, _ = wrapped.step(0) # reward = (5/10) * (1 - 0.5/8.0) = 0.5 * 0.9375 = 0.469 assert r > 0.3, f"Forward driving should earn >0.3, got {r:.4f}" def test_higher_cte_reduces_reward(): env_low = MockEnv(speed=2.0, cte=0.5) env_high = MockEnv(speed=2.0, cte=4.0) w_low = SpeedRewardWrapper(env_low, window_size=5) w_high = SpeedRewardWrapper(env_high, window_size=5) w_low.reset(); w_high.reset() for i in range(10): env_low.set_pos( [i * 0.3, 0., 0.]) env_high.set_pos([i * 0.3, 0., 0.]) _, r_low, _, _, _ = w_low.step(0) _, r_high, _, _, _ = w_high.step(0) assert r_low > r_high def test_higher_speed_increases_reward(): env_slow = MockEnv(speed=0.5, cte=1.0) env_fast = MockEnv(speed=3.0, cte=1.0) w_slow = SpeedRewardWrapper(env_slow, window_size=10) w_fast = SpeedRewardWrapper(env_fast, window_size=10) w_slow.reset(); w_fast.reset() for i in range(15): env_slow.set_pos([i * 0.1, 0., 0.]) env_fast.set_pos([i * 0.3, 0., 0.]) _, r_slow, _, _, _ = w_slow.step(0) _, r_fast, _, _, _ = w_fast.step(0) assert r_fast > r_slow def test_4tuple_compatibility(): env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False) env.set_pos([0., 0., 0.]) wrapped = SpeedRewardWrapper(env) wrapped.reset() result = wrapped.step(0) assert len(result) == 4 _, reward, done, info = result assert isinstance(reward, float) assert reward != 999.0 # ── Efficiency gate ─────────────────────────────────────────────────────────── def test_circling_earns_zero_reward(): env = MockEnv(speed=3.0, cte=0.0) wrapped = SpeedRewardWrapper(env, window_size=30, min_efficiency=0.15) wrapped.reset() rewards = [] for i in range(40): angle = 2 * math.pi * i / 12 env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)]) _, r, _, _, _ = wrapped.step(0) rewards.append(r) assert rewards[-1] == 0.0 assert sum(1 for r in rewards[-5:] if r == 0.0) >= 3 def test_forward_beats_circling(): env_fwd = MockEnv(speed=5.0, cte=1.0) w_fwd = SpeedRewardWrapper(env_fwd, window_size=30) w_fwd.reset() for i in range(35): env_fwd.set_pos([i * 0.5, 0., 0.]) _, r_fwd, _, _, _ = w_fwd.step(0) env_circ = MockEnv(speed=5.0, cte=0.0) w_circ = SpeedRewardWrapper(env_circ, window_size=30) w_circ.reset() for i in range(35): angle = 2 * math.pi * i / 12 env_circ.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)]) _, r_circ, _, _, _ = w_circ.step(0) assert r_fwd > 0 assert r_circ == 0.0 def test_history_clears_on_reset(): env = MockEnv(speed=2.0, cte=0.5) wrapped = SpeedRewardWrapper(env, window_size=10) wrapped.reset() for i in range(15): angle = 2 * math.pi * i / 12 env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)]) wrapped.step(0) wrapped.reset() rewards = [] for i in range(5): env.set_pos([i * 0.3, 0., 0.]) _, r, _, _, _ = wrapped.step(0) rewards.append(r) assert rewards[-1] > 0 # ── No-progress termination ─────────────────────────────────────────────────── def test_no_progress_terminates(): env = MockEnv(speed=3.0, cte=0.5) wrapper = SpeedRewardWrapper(env, progress_patience=10) wrapper.reset() for i in range(12): r, ft = wrapper._compute_reward(False, make_info(active_node=5, pos=(i*0.1, 0., 0.))) if ft: break assert ft is True assert r == -1.0 def test_progress_resets_counter(): env = MockEnv() wrapper = SpeedRewardWrapper(env, progress_patience=5) wrapper.reset() for node in range(4): r, ft = wrapper._compute_reward(False, make_info(active_node=node, pos=(node*0.5, 0., 0.))) assert ft is False assert wrapper._no_progress_steps == 0 def test_circling_active_node_terminates(): env = MockEnv() wrapper = SpeedRewardWrapper(env, progress_patience=10) wrapper.reset() wrapper._compute_reward(False, make_info(active_node=10)) terminated = False for i in range(20): r, ft = wrapper._compute_reward(False, make_info(active_node=8 + (i % 3))) if ft: terminated = True break assert terminated def test_lap_completion_resets_progress_tracker(): env = MockEnv() wrapper = SpeedRewardWrapper(env, progress_patience=5, min_lap_time=5.0) wrapper.reset() wrapper._compute_reward(False, make_info(active_node=99)) assert wrapper._max_node_seen == 99 r, ft = wrapper._compute_reward(False, make_info(active_node=0, lap_count=1, lap_time=12.0)) assert wrapper._max_node_seen == -1 assert wrapper._no_progress_steps == 0 assert ft is False # ── Lap exploit guard ───────────────────────────────────────────────────────── def test_short_lap_penalised(): env = MockEnv() wrapper = SpeedRewardWrapper(env, min_lap_time=5.0) wrapper.reset() r, _ = wrapper._compute_reward(False, make_info(lap_count=1, lap_time=1.0)) assert r < 0 assert r <= -10.0 def test_legitimate_lap_not_penalised(): env = MockEnv() wrapper = SpeedRewardWrapper(env, min_lap_time=5.0) wrapper.reset() wrapper._compute_reward(False, make_info(lap_count=0)) r, _ = wrapper._compute_reward(False, make_info(lap_count=1, lap_time=12.0, pos=(1., 0., 0.))) assert r >= 0 def test_lap_penalty_fires_once(): env = MockEnv() wrapper = SpeedRewardWrapper(env, min_lap_time=5.0) wrapper.reset() r1, _ = wrapper._compute_reward(False, make_info(lap_count=1, lap_time=1.5)) assert r1 < 0 r2, _ = wrapper._compute_reward(False, make_info(lap_count=1, lap_time=1.5, pos=(0.1, 0., 0.))) assert r2 >= 0 def test_lap_count_resets_on_episode_reset(): env = MockEnv() wrapper = SpeedRewardWrapper(env, min_lap_time=5.0) wrapper.reset() wrapper._compute_reward(False, make_info(lap_count=1, lap_time=1.0)) assert wrapper._last_lap_count == 1 wrapper.reset() assert wrapper._last_lap_count == 0