"""
|
||
Tests for reward_wrapper.py v3 (path efficiency / anti-circular) — no simulator required.
|
||
"""

import sys
import os
import math

import pytest
import numpy as np
import gymnasium as gym

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))

from reward_wrapper import SpeedRewardWrapper
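
# These tests assume (a sketch; the authoritative math lives in reward_wrapper.py)
# that SpeedRewardWrapper shapes rewards roughly as:
#
#     shaped = base * (1 + speed_scale * speed * efficiency)   if base > 0
#     shaped = base                                            otherwise
#
# where efficiency is net displacement divided by path length over the last
# window_size positions (defaulting to 1.0 when no position history is
# available), floored at min_efficiency.
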
def make_env_with_pos(speed=2.0, original_reward=1.0, done=False, pos=(0.0, 0.0, 0.0)):
    """Create a mock env that returns a specific position in info dict."""
    class PosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)
            self._pos = list(pos)
            self._speed = speed
            self._reward = original_reward
            self._done = done

        def set_pos(self, p):
            self._pos = list(p)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            obs = np.zeros((120, 160, 3), dtype=np.uint8)
            info = {'speed': self._speed, 'pos': self._pos}
            return obs, self._reward, self._done, False, info

        def close(self):
            pass

    return PosEnv()


# ---- Core Anti-Hacking Tests (inherited from v2) ----

def test_no_speed_bonus_when_off_track():
    """Off-track reward (≤ 0) must NOT get a speed bonus regardless of efficiency."""
    env = make_env_with_pos(speed=10.0, original_reward=-1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Off-track reward must not get bonus, got {reward}"


def test_no_speed_bonus_when_reward_zero():
    """A reward of exactly 0 should not get a speed bonus."""
    env = make_env_with_pos(speed=5.0, original_reward=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == 0.0, f"Zero reward should stay zero, got {reward}"


# ---- Path Efficiency Tests ----

def _simulate_straight_driving(wrapped_env, env, steps=40, speed=3.0, step_size=0.1):
    """Simulate straight-line driving: car moves forward by step_size each step."""
    wrapped_env.reset()
    rewards = []
    for i in range(steps):
        env.set_pos([i * step_size, 0.0, 0.0])
        env._speed = speed
        _, r, _, _, _ = wrapped_env.step(0)
        rewards.append(r)
    return rewards


def _simulate_circular_driving(wrapped_env, env, steps=40, speed=3.0, radius=0.5):
    """Simulate circular driving: car moves in a circle, returns to start."""
    wrapped_env.reset()
    rewards = []
    for i in range(steps):
        angle = 2 * math.pi * i / steps
        x = radius * math.cos(angle)
        z = radius * math.sin(angle)
        env.set_pos([x, 0.0, z])
        env._speed = speed
        _, r, _, _, _ = wrapped_env.step(0)
        rewards.append(r)
    return rewards
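

# A small reference implementation of the efficiency metric these tests assume
# (a sketch for reasoning about thresholds, not imported from reward_wrapper;
# the wrapper's own version is SpeedRewardWrapper._compute_efficiency).
# For example, _reference_efficiency([[0, 0, 0], [1, 0, 0], [2, 0, 0]]) returns
# 1.0 (collinear motion), while a closed loop returns ~0.0.
def _reference_efficiency(positions):
    """Net displacement / path length; defaults to 1.0 with too little history."""
    pts = [np.asarray(p, dtype=float) for p in positions]
    if len(pts) < 2:
        return 1.0
    path = sum(float(np.linalg.norm(b - a)) for a, b in zip(pts, pts[1:]))
    if path == 0.0:
        return 1.0
    return float(np.linalg.norm(pts[-1] - pts[0])) / path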


def test_straight_driving_gets_higher_reward_than_circular():
    """
    CRITICAL: Straight driving must produce more total reward than circular driving
    at the same speed and base reward. This is the core anti-circular guarantee.
    """
    env_straight = make_env_with_pos(speed=3.0, original_reward=0.8)
    env_circular = make_env_with_pos(speed=3.0, original_reward=0.8)

    wrapped_straight = SpeedRewardWrapper(env_straight, speed_scale=0.1, window_size=20)
    wrapped_circular = SpeedRewardWrapper(env_circular, speed_scale=0.1, window_size=20)

    straight_rewards = _simulate_straight_driving(wrapped_straight, env_straight, steps=40)
    circular_rewards = _simulate_circular_driving(wrapped_circular, env_circular, steps=40)

    # After warmup (window fills), straight should consistently beat circular
    straight_tail = sum(straight_rewards[20:])
    circular_tail = sum(circular_rewards[20:])

    assert straight_tail > circular_tail, (
        f"Straight driving ({straight_tail:.2f}) should beat circular ({circular_tail:.2f})"
    )


def test_efficiency_near_one_for_straight_driving():
    """Path efficiency should be near 1.0 for straight-line motion."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Drive in a straight line
    for i in range(15):
        env.set_pos([i * 0.2, 0.0, 0.0])
        wrapped.step(0)
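    # Collinear positions make net displacement equal path length, so under the
    # assumed definition efficiency is exactly 1.0; >0.90 leaves headroom for
    # implementation details such as an extra origin point in the history.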
    efficiency = wrapped._compute_efficiency()
    assert efficiency > 0.90, f"Straight driving efficiency should be >0.90, got {efficiency:.4f}"


def test_efficiency_near_zero_for_circular_driving():
    """Path efficiency should be near 0.0 for full circular motion."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()

    # Drive full circles. The circle period must fit inside window_size:
    # otherwise the window never sees a closed loop and efficiency stays
    # well above zero.
    radius = 1.0
    steps = 25  # more than window_size, so the window is full
    for i in range(steps):
        angle = 2 * math.pi * i / 20  # full circle every 20 steps (= window_size)
        x = radius * math.cos(angle)
        z = radius * math.sin(angle)
        env.set_pos([x, 0.0, z])
        wrapped.step(0)
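    # Geometry check (assuming the displacement/path-length definition sketched
    # above): once the 20-step window spans a full loop, its first and last
    # positions nearly coincide, so net displacement is ~0 while path length
    # stays ~2*pi*radius, pushing efficiency toward 0.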
    efficiency = wrapped._compute_efficiency()
    assert efficiency < 0.2, f"Circular driving efficiency should be <0.2, got {efficiency:.4f}"


def test_efficiency_one_with_no_pos_history():
    """When position is not available, efficiency should default to 1.0 (no penalty)."""
    class NoPosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.8, False, False, {'speed': 2.0}  # no 'pos' key

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoPosEnv(), speed_scale=0.1)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    # Without pos, efficiency=1.0, so reward = 0.8 * (1 + 0.1*2*1.0) = 0.96
    assert reward > 0.8, f"Without pos, should get speed bonus (efficiency=1.0), got {reward}"


def test_efficiency_resets_on_episode_reset():
    """Position history should clear on reset, so each episode starts fresh."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Fill with circular data
    radius = 0.5
    for i in range(15):
        angle = 2 * math.pi * i / 12
        env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)])
        wrapped.step(0)

    eff_before_reset = wrapped._compute_efficiency()

    # Reset and drive straight for a few steps
    wrapped.reset()
    for i in range(3):
        env.set_pos([i * 0.3, 0.0, 0.0])
        wrapped.step(0)

    eff_after_reset = wrapped._compute_efficiency()
    assert eff_after_reset > eff_before_reset, \
        f"After reset, efficiency should improve: before={eff_before_reset:.3f}, after={eff_after_reset:.3f}"


def test_speed_bonus_disappears_when_circling():
    """After circling for window_size steps, the speed bonus should be nearly zero."""
    env = make_env_with_pos(speed=5.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5, window_size=20, min_efficiency=0.05)
    wrapped.reset()

    # Warm up with circular motion
    radius = 0.5
    rewards = []
    for i in range(30):
        angle = 2 * math.pi * (i % 20) / 20  # full circle every 20 steps
        env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)])
        _, r, _, _, _ = wrapped.step(0)
        rewards.append(r)

    # Later rewards (after the window fills) should be close to original_reward
    later_rewards = rewards[20:]
    avg_later = sum(later_rewards) / len(later_rewards)
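    # Assuming the shaping sketched at the top of this file with efficiency
    # floored at min_efficiency=0.05, the residual bonus is about
    # 0.5 * 5.0 * 0.05 = 0.125, i.e. rewards near 1.125, hence the 1.3 ceiling.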
    assert avg_later < 1.3, \
        f"Circular driving speed bonus should be suppressed, avg reward={avg_later:.3f} (original=1.0)"


# ---- Inherited guarantees ----

def test_crash_still_penalized():
    """Crash (original_reward=-1) should remain -1 regardless of speed or efficiency."""
    env = make_env_with_pos(speed=8.0, original_reward=-1.0, done=True)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
    wrapped.reset()
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}"


def test_theoretical_max_per_step():
    """Max reward/step is bounded: original(1.0) × (1 + speed_scale × max_speed)."""
    env = make_env_with_pos()
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
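    # Worked bound: 1.0 * (1 + 0.1 * 10.0) = 2.0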
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)