"""
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).

Every test drives a ``SpeedRewardWrapper`` around ``MockEnv``, a stub
environment whose ``info`` dict (``speed``, ``cte``, ``pos``) is fully
controlled by the test and whose own reward is a deliberately bogus sentinel.
"""
import math
import os
import sys
from collections import deque  # noqa: F401  (unused here; kept so the import set is unchanged)

import gymnasium as gym
import numpy as np
import pytest

# Make ../agent importable so the wrapper under test can be found.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))

from reward_wrapper import SpeedRewardWrapper  # noqa: E402

# Reward emitted by MockEnv itself. It is deliberately absurd so that any
# leak of the sim reward through the wrapper is obvious in assertions.
BOGUS_SIM_REWARD = 999.0

# Shape of the dummy camera frame returned by MockEnv.
OBS_SHAPE = (120, 160, 3)


def _blank_frame():
    """Return an all-zero uint8 observation of shape ``OBS_SHAPE``."""
    return np.zeros(OBS_SHAPE, dtype=np.uint8)


def _circle_pos(step, period, radius=0.5):
    """Return ``[x, 0, z]`` on a circle of ``radius`` in the x/z plane.

    ``step / period`` is the fraction of a full turn. Callers that want the
    path to wrap exactly onto itself pass ``i % period`` as ``step``.
    """
    angle = 2 * math.pi * step / period
    return [radius * math.cos(angle), 0., radius * math.sin(angle)]


def _line_pos(step, spacing=0.3):
    """Return ``[step * spacing, 0, 0]``: a point on a straight line along x."""
    return [step * spacing, 0., 0.]


# ---- Mock Environments ----

class MockEnv(gym.Env):
    """Configurable mock gymnasium.Env.

    Args:
        speed: value reported as ``info['speed']`` on every step.
        cte: value reported as ``info['cte']`` on every step.
        pos: initial ``info['pos']``; stored internally as a list.
        done: value returned in the ``done``/terminated slot of ``step``.
        use_5tuple: when True ``step`` returns
            ``(obs, reward, done, False, info)``; otherwise the 4-tuple
            ``(obs, reward, done, info)``.
    """
    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False,
                 use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, OBS_SHAPE, dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p):
        """Replace the reported position with a fresh list copy of ``p``."""
        self._pos = list(p)

    def set_cte(self, c):
        """Replace the reported cross-track error."""
        self._cte = c

    def reset(self, seed=None, **kwargs):
        """Return a blank frame and an empty info dict; arguments are ignored."""
        return _blank_frame(), {}

    def step(self, action):
        """Return a blank frame, the bogus reward and the configured info.

        The action is ignored. The reward is always ``BOGUS_SIM_REWARD``, so
        a wrapper that forwards the sim reward is immediately detectable.
        """
        obs = _blank_frame()
        # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this
        sim_reward = BOGUS_SIM_REWARD  # Deliberately bogus — wrapper must not use this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if self._use_5tuple:
            return obs, sim_reward, self._done, False, info
        return obs, sim_reward, self._done, info

    def close(self):
        """Nothing to release."""
        pass


def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0):
    """Point ``env`` at the given state, then step ``wrapped_env`` with action 0.

    Note that ``cte`` and ``speed`` are always written to ``env``, so their
    defaults override whatever the env was constructed with.

    Returns:
        Whatever ``wrapped_env.step(0)`` returns.
    """
    env.set_pos(pos)
    env.set_cte(cte)
    env._speed = speed
    return wrapped_env.step(0)


# ---- Core v4 Properties ----

def test_sim_reward_is_completely_ignored():
    """
    The wrapper must NOT use the sim's reward (999.0).
    v4 computes reward from scratch using CTE/pos/speed only.
    """
    env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()

    _, reward, _, _, _ = wrapped.step(0)

    assert reward != BOGUS_SIM_REWARD, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"


def test_circling_at_zero_cte_gives_near_zero_reward():
    """
    CORE v4 GUARANTEE: A spinning car at CTE=0 must earn near-zero reward.
    v3 failed this: spinning at CTE=0 gave 1.0/step regardless of efficiency.
    v4 multiplies base reward by efficiency → circling yields ≈ 0.
    """
    env = MockEnv(speed=3.0, cte=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()

    # Simulate full circles (returns to start position)
    rewards = []
    for i in range(30):
        env.set_pos(_circle_pos(i % 20, period=20))
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # After window fills, rewards should be near zero (circling detected)
    late_rewards = rewards[20:]
    avg = sum(late_rewards) / len(late_rewards)
    assert avg < 0.15, f"Circling at CTE=0 should earn near-zero reward, got avg={avg:.4f}"


def test_forward_driving_earns_positive_reward():
    """Straight-line driving at low CTE earns a clear positive reward."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    rewards = []
    for i in range(20):
        env.set_pos(_line_pos(i))
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Only judge the steps taken after the 10-step window has filled.
    late = rewards[10:]
    avg = sum(late) / len(late)
    assert avg > 0.5, f"Forward driving should earn >0.5 reward, got {avg:.4f}"


def test_forward_beats_circling_by_large_margin():
    """
    Total reward over same number of steps: forward driving >> circling,
    even at CTE=0 for the circular car.
    """
    env_fwd = MockEnv(speed=2.0, cte=0.5)
    env_circ = MockEnv(speed=2.0, cte=0.0)  # CTE=0 is best case for circling
    wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=20)
    wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=20)
    wrapped_fwd.reset()
    wrapped_circ.reset()

    total_fwd, total_circ = 0.0, 0.0
    for i in range(40):
        # Forward: moves in straight line
        env_fwd.set_pos(_line_pos(i))
        _, r, _, _, _ = wrapped_fwd.step(0)
        total_fwd += r

        # Circular: perfect circles at CTE=0
        env_circ.set_pos(_circle_pos(i % 20, period=20))
        _, r, _, _, _ = wrapped_circ.step(0)
        total_circ += r

    assert total_fwd > total_circ * 3, (
        f"Forward ({total_fwd:.1f}) should beat circling ({total_circ:.1f}) by 3x"
    )


def test_crash_gives_negative_reward():
    """Episode termination (done=True) must always give -1.0."""
    env = MockEnv(speed=5.0, cte=0.0, done=True)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
    wrapped.reset()

    _, reward, _, _, _ = wrapped.step(0)

    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"


def test_high_cte_reduces_reward():
    """Higher CTE should reduce reward (closer to track edge = lower base)."""
    env_low = MockEnv(speed=2.0, cte=0.5)
    env_high = MockEnv(speed=2.0, cte=4.0)
    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
    wrapped_low.reset()
    wrapped_high.reset()

    # Drive straight so efficiency fills up
    for i in range(10):
        env_low.set_pos(_line_pos(i))
        env_high.set_pos(_line_pos(i))
        _, r_low, _, _, _ = wrapped_low.step(0)
        _, r_high, _, _, _ = wrapped_high.step(0)

    # Compare the final-step rewards, by which point both windows are full.
    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"


def test_speed_bonus_increases_reward_when_on_track():
    """Faster forward driving earns more reward than slower forward driving."""
    env_slow = MockEnv(speed=0.5, cte=1.0)
    env_fast = MockEnv(speed=3.0, cte=1.0)
    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
    wrapped_slow.reset()
    wrapped_fast.reset()

    for i in range(15):
        env_slow.set_pos(_line_pos(i, spacing=0.1))
        env_fast.set_pos(_line_pos(i, spacing=0.3))  # Fast car covers more ground
        _, r_slow, _, _, _ = wrapped_slow.step(0)
        _, r_fast, _, _, _ = wrapped_fast.step(0)

    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"


def test_theoretical_max_per_step():
    """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10."""
    env = MockEnv()
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)


def test_4tuple_step_compatibility():
    """Wrapper must handle 4-tuple step() return (old gym API)."""
    env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    env.set_pos([0., 0., 0.])
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()

    result = wrapped.step(0)

    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, _, _ = result
    assert isinstance(reward, float)
    assert reward != BOGUS_SIM_REWARD, "Should not use sim reward"


def test_reward_resets_on_episode_reset():
    """After reset, position history clears so efficiency recalculates cleanly."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Fill with circular data
    for i in range(15):
        env.set_pos(_circle_pos(i, period=12))
        wrapped.step(0)

    # After reset, start fresh straight
    wrapped.reset()
    rewards = []
    for i in range(5):
        env.set_pos(_line_pos(i))
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Should get reasonable reward after fresh start
    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"