"""
|
|
Tests for reward_wrapper.py — no simulator required.
|
|
"""
|
|
|
|

import os
import sys

import gymnasium as gym
import numpy as np
import pytest

# Make the agent package importable without installing it.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))

from reward_wrapper import SpeedRewardWrapper
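
# NOTE (assumption): reward_wrapper.py is not inspected here. The tests below
# only pin down observable behaviour, which is consistent with a shaping of
# roughly
#     shaped_reward ~ speed * (1 - |cte| / max_cte)
# plus a crash_penalty when an episode ends with a negative reward, and a
# fallback to the env's own reward when info lacks 'speed'/'cte'. The exact
# formula lives in reward_wrapper.py; only behaviour is asserted here.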


class MockStepEnv(gym.Env):
    """Mock gymnasium.Env for testing SpeedRewardWrapper."""

    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.5, original_reward=1.0, done=False, use_5tuple=True):
        super().__init__()
        self._speed = speed
        self._cte = cte
        self._reward = original_reward
        self._done = done
        self._use_5tuple = use_5tuple
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

    def reset(self, seed=None, **kwargs):
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        info = {'speed': self._speed, 'cte': self._cte}
        if self._use_5tuple:
            # Gymnasium-style 5-tuple: (obs, reward, terminated, truncated, info).
            return obs, self._reward, self._done, False, info
        else:
            # Legacy gym 4-tuple: (obs, reward, done, info).
            return obs, self._reward, self._done, info

    def close(self):
        pass
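
# Each test follows the same pattern: wrap a mock env, take a single step,
# and assert on the shaped reward, e.g.:
#
#     wrapped = SpeedRewardWrapper(MockStepEnv(speed=3.0, cte=0.2))
#     _, reward, _, _, _ = wrapped.step(0)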


def test_speed_reward_higher_when_fast_and_centered():
    """Reward should be higher when the car is fast and centered (low CTE)."""
    env_fast_centered = MockStepEnv(speed=5.0, cte=0.1, original_reward=1.0)
    env_slow_offset = MockStepEnv(speed=1.0, cte=3.0, original_reward=1.0)

    wrapped_fast = SpeedRewardWrapper(env_fast_centered)
    wrapped_slow = SpeedRewardWrapper(env_slow_offset)

    _, reward_fast, _, _, _ = wrapped_fast.step(0)
    _, reward_slow, _, _, _ = wrapped_slow.step(0)

    assert reward_fast > reward_slow, \
        f"Fast+centered should reward more: {reward_fast:.3f} vs {reward_slow:.3f}"


def test_speed_reward_zero_at_max_cte():
    """Reward should be ~0 when CTE = max_cte (on the edge of the road)."""
    env = MockStepEnv(speed=5.0, cte=8.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, max_cte=8.0)
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == pytest.approx(0.0, abs=0.01), \
        f"Reward at max CTE should be ~0, got {reward}"


def test_speed_reward_positive_when_on_track():
    """Reward should be positive when the car is on track at any speed > 0."""
    env = MockStepEnv(speed=2.0, cte=1.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, max_cte=8.0)
    _, reward, _, _, _ = wrapped.step(0)
    assert reward > 0, f"On-track reward should be positive, got {reward}"


def test_crash_penalty_applied_on_done():
    """Crash penalty should be added when the episode ends with a negative reward."""
    env = MockStepEnv(speed=0.0, cte=9.0, original_reward=-1.0, done=True)
    wrapped = SpeedRewardWrapper(env, max_cte=8.0, crash_penalty=-10.0)
    _, reward, _, _, _ = wrapped.step(0)
    assert reward < -5.0, f"Crash penalty should make reward very negative, got {reward}"


def test_fallback_to_original_reward_when_info_missing():
    """If info lacks 'speed'/'cte', the wrapper should fall back to the original reward."""

    class NoInfoEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            # Empty info dict: no 'speed' or 'cte' for the wrapper to use.
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.75, False, False, {}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoInfoEnv())
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == pytest.approx(0.75, abs=1e-6), \
        f"Should fall back to original reward 0.75, got {reward}"


def test_wrapper_preserves_observation():
    """SpeedRewardWrapper should not modify observations."""
    obs_data = np.zeros((120, 160, 3), dtype=np.uint8)

    class FixedObsEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return obs_data.copy(), {}

        def step(self, action):
            return obs_data.copy(), 1.0, False, False, {'speed': 2.0, 'cte': 0.5}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(FixedObsEnv())
    obs, _, _, _, _ = wrapped.step(0)
    # Observations are integer pixels, so demand exact equality rather than
    # "almost equal".
    np.testing.assert_array_equal(obs, obs_data)


def test_4tuple_step_compatibility():
    """Wrapper should handle a 4-tuple step() return (old gym API)."""
    env = MockStepEnv(speed=2.0, cte=1.0, original_reward=1.0, use_5tuple=False)
    wrapped = SpeedRewardWrapper(env)
    result = wrapped.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, done, info = result
    assert isinstance(reward, float)
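

# A possible extra check, not in the original suite: under the assumed shaping
# noted at the top of this file, reward should not decrease as speed rises at
# a fixed CTE. This is a hedged sketch; drop it if the real formula in
# reward_wrapper.py is not monotonic in speed.
def test_reward_increases_with_speed_sketch():
    rewards = []
    for speed in (1.0, 2.0, 4.0):
        wrapped = SpeedRewardWrapper(MockStepEnv(speed=speed, cte=0.5))
        _, reward, _, _, _ = wrapped.step(0)
        rewards.append(reward)
    assert rewards == sorted(rewards), f"Reward should not decrease with speed: {rewards}"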