# donkeycar-rl-autoresearch/tests/test_reward_wrapper.py
# (exported file header: 139 lines, 5.1 KiB, Python)
"""
Tests for reward_wrapper.py — no simulator required.
"""
import sys
import os
import pytest
import numpy as np
import gymnasium as gym
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
class MockStepEnv(gym.Env):
    """Mock gymnasium.Env for testing SpeedRewardWrapper.

    Returns a fixed black observation and lets the test control the
    telemetry (``speed``/``cte`` in ``info``), the base reward, the
    done flag, and whether ``step()`` returns the 5-tuple (gymnasium
    API) or the legacy 4-tuple (old gym API) shape.
    """
    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.5, original_reward=1.0, done=False, use_5tuple=True):
        super().__init__()
        self._speed = speed
        self._cte = cte
        self._reward = original_reward
        self._done = done
        self._use_5tuple = use_5tuple
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

    def reset(self, seed=None, **kwargs):
        """Return a zero observation and an empty info dict."""
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        """Return the configured reward/done with telemetry in info.

        Emits the 5-tuple ``(obs, reward, terminated, truncated, info)``
        when ``use_5tuple`` is True, otherwise the legacy 4-tuple.
        """
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        info = {'speed': self._speed, 'cte': self._cte}
        if self._use_5tuple:
            return obs, self._reward, self._done, False, info
        return obs, self._reward, self._done, info

    # NOTE(review): the original defined close() twice; the duplicate
    # silently shadowed the first and has been removed.
    def close(self):
        pass
def test_speed_reward_higher_when_fast_and_centered():
    """Reward should be higher when car is fast and centered (low CTE)."""
    # Two wrapped envs differing only in telemetry: fast+centered vs slow+offset.
    wrapped_fast = SpeedRewardWrapper(MockStepEnv(speed=5.0, cte=0.1, original_reward=1.0))
    wrapped_slow = SpeedRewardWrapper(MockStepEnv(speed=1.0, cte=3.0, original_reward=1.0))
    _, reward_fast, _, _, _ = wrapped_fast.step(0)
    _, reward_slow, _, _, _ = wrapped_slow.step(0)
    assert reward_fast > reward_slow, \
        f"Fast+centered should reward more: {reward_fast:.3f} vs {reward_slow:.3f}"
def test_speed_reward_zero_at_max_cte():
    """Reward should be ~0 when CTE = max_cte (on the edge of the road)."""
    mock = MockStepEnv(speed=5.0, cte=8.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(mock, max_cte=8.0)
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == pytest.approx(0.0, abs=0.01), \
        f"Reward at max CTE should be ~0, got {reward}"
def test_speed_reward_positive_when_on_track():
    """Reward should be positive when car is on track at any speed > 0."""
    mock = MockStepEnv(speed=2.0, cte=1.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(mock, max_cte=8.0)
    _, reward, _, _, _ = wrapped.step(0)
    assert reward > 0, f"On-track reward should be positive, got {reward}"
def test_crash_penalty_applied_on_done():
    """Crash penalty should be added when episode ends with negative reward."""
    # Off-track (cte > max_cte), stopped, negative base reward, episode done.
    env = MockStepEnv(speed=0.0, cte=9.0, original_reward=-1.0, done=True)
    wrapped = SpeedRewardWrapper(env, max_cte=8.0, crash_penalty=-10.0)
    # terminated/truncated were unpacked but unused in the original;
    # discard them explicitly.
    _, reward, _, _, _ = wrapped.step(0)
    assert reward < -5.0, f"Crash penalty should make reward very negative, got {reward}"
def test_fallback_to_original_reward_when_info_missing():
    """If info doesn't have speed/cte, should fall back to original reward."""

    class NoInfoEnv(gym.Env):
        # Minimal env whose step() returns an empty info dict.
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.75, False, False, {}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoInfoEnv())
    _, reward, _, _, _ = wrapped.step(0)
    assert reward == pytest.approx(0.75, abs=1e-6), \
        f"Should fall back to original reward 0.75, got {reward}"
def test_wrapper_preserves_observation():
    """SpeedRewardWrapper should not modify observations."""
    obs_data = np.zeros((120, 160, 3), dtype=np.uint8)

    class FixedObsEnv(gym.Env):
        # Minimal env that always emits a copy of obs_data.
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return obs_data.copy(), {}

        def step(self, action):
            return obs_data.copy(), 1.0, False, False, {'speed': 2.0, 'cte': 0.5}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(FixedObsEnv())
    obs, _, _, _, _ = wrapped.step(0)
    np.testing.assert_array_almost_equal(obs, obs_data)
def test_4tuple_step_compatibility():
    """Wrapper should handle 4-tuple step() return (old gym API)."""
    mock = MockStepEnv(speed=2.0, cte=1.0, original_reward=1.0, use_5tuple=False)
    wrapped = SpeedRewardWrapper(mock)
    result = wrapped.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, done, info = result
    assert isinstance(reward, float)