"""
Tests for reward_wrapper.py v3 (path efficiency / anti-circular) — no simulator required.

Every test drives ``SpeedRewardWrapper`` with a tiny in-process mock
environment whose speed, base reward and reported position are scripted by
the test itself.
"""
import sys
import os
import math
import pytest
import numpy as np
import gymnasium as gym
from collections import deque

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper


# Shape of the dummy camera frame returned by the mock environments.
_FRAME_SHAPE = (120, 160, 3)


def _blank_frame():
    """Return an all-zero uint8 observation matching the mock observation space."""
    return np.zeros(_FRAME_SHAPE, dtype=np.uint8)


def _circle_pos(radius, angle):
    """Return the ``[x, 0.0, z]`` point at ``angle`` (radians) on a circle of ``radius``."""
    return [radius * math.cos(angle), 0.0, radius * math.sin(angle)]


def make_env_with_pos(speed=2.0, original_reward=1.0, done=False, pos=(0.0, 0.0, 0.0)):
    """Create a mock env that returns a specific position in info dict.

    Args:
        speed: Value reported under ``info['speed']`` on every step.
        original_reward: Base reward returned by every ``step`` call.
        done: ``terminated`` flag returned by every ``step`` call.
        pos: Initial position reported under ``info['pos']``; change it
            between steps with ``env.set_pos(...)``.

    Returns:
        A fresh ``gym.Env`` instance whose outputs are fully scripted.
    """

    class PosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=_FRAME_SHAPE, dtype=np.uint8
            )
            self._pos = list(pos)
            self._speed = speed
            self._reward = original_reward
            self._done = done

        def set_pos(self, p):
            """Replace the position reported by subsequent ``step`` calls."""
            self._pos = list(p)

        def reset(self, seed=None, **kwargs):
            # Forward the seed so the base class seeds ``np_random`` as the
            # Gymnasium API expects; the previous version silently dropped it.
            super().reset(seed=seed)
            return _blank_frame(), {}

        def step(self, action):
            # Hand out a copy of the position so nothing that stores the
            # info dict can alias (and later observe changes to) env state.
            info = {'speed': self._speed, 'pos': list(self._pos)}
            return _blank_frame(), self._reward, self._done, False, info

        def close(self):
            pass

    return PosEnv()


# ---- Core Anti-Hacking Tests (inherited from v2) ----

def test_no_speed_bonus_when_off_track():
    """Off-track reward (≤ 0) must NOT get a speed bonus regardless of efficiency."""
    env = make_env_with_pos(speed=10.0, original_reward=-1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Off-track reward must not get bonus, got {reward}"


def test_no_speed_bonus_when_reward_zero():
    """Reward exactly 0 should not get speed bonus."""
    env = make_env_with_pos(speed=5.0, original_reward=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == 0.0, f"Zero reward should stay zero, got {reward}"


# ---- Path Efficiency Tests ----

def _simulate_straight_driving(wrapped_env, env, steps=40, speed=3.0, step_size=0.1):
    """Simulate straight-line driving: car moves forward by step_size each step.

    Args:
        wrapped_env: The wrapper under test; it is reset, then stepped.
        env: The underlying mock env whose position/speed are scripted.
        steps: Number of steps to take.
        speed: Speed reported by the mock env during the run.
        step_size: Distance advanced along x per step.

    Returns:
        List of the wrapped rewards, one per step.
    """
    wrapped_env.reset()
    env._speed = speed  # constant for the whole run, so set it once
    rewards = []
    for i in range(steps):
        env.set_pos([i * step_size, 0.0, 0.0])
        _, r, _, _, _ = wrapped_env.step(0)
        rewards.append(r)
    return rewards


def _simulate_circular_driving(wrapped_env, env, steps=40, speed=3.0, radius=0.5):
    """Simulate circular driving: car moves in a circle, returns to start.

    One full revolution is spread evenly over ``steps`` steps.

    Args:
        wrapped_env: The wrapper under test; it is reset, then stepped.
        env: The underlying mock env whose position/speed are scripted.
        steps: Number of steps to take (and per revolution).
        speed: Speed reported by the mock env during the run.
        radius: Radius of the circle in the x/z plane.

    Returns:
        List of the wrapped rewards, one per step.
    """
    wrapped_env.reset()
    env._speed = speed  # constant for the whole run, so set it once
    rewards = []
    for i in range(steps):
        angle = 2 * math.pi * i / steps
        env.set_pos(_circle_pos(radius, angle))
        _, r, _, _, _ = wrapped_env.step(0)
        rewards.append(r)
    return rewards


def test_straight_driving_gets_higher_reward_than_circular():
    """
    CRITICAL: Straight driving must produce more total reward than circular driving
    at the same speed and base reward. This is the core anti-circular guarantee.
    """
    env_straight = make_env_with_pos(speed=3.0, original_reward=0.8)
    env_circular = make_env_with_pos(speed=3.0, original_reward=0.8)

    wrapped_straight = SpeedRewardWrapper(env_straight, speed_scale=0.1, window_size=20)
    wrapped_circular = SpeedRewardWrapper(env_circular, speed_scale=0.1, window_size=20)

    straight_rewards = _simulate_straight_driving(wrapped_straight, env_straight, steps=40)
    circular_rewards = _simulate_circular_driving(wrapped_circular, env_circular, steps=40)

    # After warmup (window fills), straight should consistently beat circular
    straight_tail = sum(straight_rewards[20:])
    circular_tail = sum(circular_rewards[20:])

    assert straight_tail > circular_tail, (
        f"Straight driving ({straight_tail:.2f}) should beat circular ({circular_tail:.2f})"
    )


def test_efficiency_near_one_for_straight_driving():
    """Path efficiency should be near 1.0 for straight-line motion."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Drive in a straight line
    for i in range(15):
        env.set_pos([i * 0.2, 0.0, 0.0])
        wrapped.step(0)

    efficiency = wrapped._compute_efficiency()
    assert efficiency > 0.90, f"Straight driving efficiency should be >0.90, got {efficiency:.4f}"


def test_efficiency_near_zero_for_circular_driving():
    """Path efficiency should be near 0.0 for full circular motion."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()

    # Drive a full circle (returns to start position)
    radius = 1.0
    steps = 25  # More than window_size to fill it
    for i in range(steps):
        angle = 2 * math.pi * i / 24  # 24 steps = full circle
        env.set_pos(_circle_pos(radius, angle))
        wrapped.step(0)

    efficiency = wrapped._compute_efficiency()
    assert efficiency < 0.2, f"Circular driving efficiency should be <0.2, got {efficiency:.4f}"


def test_efficiency_one_with_no_pos_history():
    """When position not available, efficiency should default to 1.0 (no penalty)."""

    class NoPosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=_FRAME_SHAPE, dtype=np.uint8
            )

        def reset(self, seed=None, **kwargs):
            # Forward the seed to the base class, as the Gymnasium API expects.
            super().reset(seed=seed)
            return _blank_frame(), {}

        def step(self, action):
            return _blank_frame(), 0.8, False, False, {'speed': 2.0}  # No pos

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoPosEnv(), speed_scale=0.1)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    # Without pos, efficiency=1.0, so reward = 0.8 * (1 + 0.1*2*1.0) = 0.96
    assert reward > 0.8, f"Without pos, should get speed bonus (efficiency=1.0), got {reward}"


def test_efficiency_resets_on_episode_reset():
    """Position history should clear on reset, so each episode starts fresh."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Fill with circular data
    radius = 0.5
    for i in range(15):
        angle = 2 * math.pi * i / 12
        env.set_pos(_circle_pos(radius, angle))
        wrapped.step(0)

    eff_before_reset = wrapped._compute_efficiency()

    # Reset and drive straight for a few steps
    wrapped.reset()
    for i in range(3):
        env.set_pos([i * 0.3, 0.0, 0.0])
        wrapped.step(0)

    eff_after_reset = wrapped._compute_efficiency()
    assert eff_after_reset > eff_before_reset, \
        f"After reset, efficiency should improve: before={eff_before_reset:.3f}, after={eff_after_reset:.3f}"


def test_speed_bonus_disappears_when_circling():
    """After circling for window_size steps, the speed bonus should be suppressed.

    The check is on the average reward once the window has filled: it must
    stay below 1.3 for a base reward of 1.0.
    """
    env = make_env_with_pos(speed=5.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5, window_size=20, min_efficiency=0.05)
    wrapped.reset()

    # Warm up with circular motion
    radius = 0.5
    rewards = []
    for i in range(30):
        angle = 2 * math.pi * (i % 20) / 20  # Full circle every 20 steps
        env.set_pos(_circle_pos(radius, angle))
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Later rewards (after window fills) should be close to original_reward
    later_rewards = rewards[20:]
    avg_later = sum(later_rewards) / len(later_rewards)
    assert avg_later < 1.3, \
        f"Circular driving speed bonus should be suppressed, avg reward={avg_later:.3f} (original=1.0)"


# ---- Inherited guarantees ----

def test_crash_still_penalized():
    """Crash (original_reward=-1) should remain -1 regardless of speed or efficiency."""
    env = make_env_with_pos(speed=8.0, original_reward=-1.0, done=True)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}"


def test_theoretical_max_per_step():
    """Max reward/step bounded: original(1.0) × (1 + speed_scale × max_speed)."""
    env = make_env_with_pos()
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)