# donkeycar-rl-autoresearch/tests/test_reward_wrapper.py
"""
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).
"""
import sys, os, math, pytest
import numpy as np
import gymnasium as gym
from collections import deque
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
# ---- Mock Environments ----
class MockEnv(gym.Env):
    """Scriptable stand-in for the Donkey sim environment.

    Telemetry (speed / cte / pos) is injected through setters so a test can
    drive any trajectory it likes, while ``step`` always reports a bogus
    999.0 sim reward — any pass-through of that value by the wrapper under
    test is therefore immediately detectable.
    """
    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p):
        # Copy so later mutations by the caller don't leak into old infos.
        self._pos = list(p)

    def set_cte(self, c):
        self._cte = c

    def reset(self, seed=None, **kwargs):
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this
        sim_reward = 999.0  # Deliberately bogus — wrapper must not use this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if not self._use_5tuple:
            # Legacy gym API: (obs, reward, done, info)
            return obs, sim_reward, self._done, info
        return obs, sim_reward, self._done, False, info

    def close(self):
        pass
def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0):
    """Inject fresh telemetry into the mock env, then advance the wrapper.

    Returns whatever ``wrapped_env.step(0)`` returns.
    """
    env.set_cte(cte)
    env.set_pos(pos)
    env._speed = speed
    return wrapped_env.step(0)
# ---- Core v4 Properties ----
def test_sim_reward_is_completely_ignored():
    """The sim's bogus 999.0 reward must never leak through the wrapper.

    v4 rebuilds the reward from scratch out of CTE / position / speed only.
    """
    env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    reward = wrapped.step(0)[1]
    assert reward != 999.0, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"
def test_circling_at_zero_cte_gives_near_zero_reward():
    """Core v4 guarantee: a car spinning in place at CTE=0 earns ~nothing.

    v3 paid 1.0/step for exactly this exploit; v4 multiplies the base
    reward by a path-efficiency factor, so returning to the start position
    every lap should drive the reward toward zero.
    """
    env = MockEnv(speed=3.0, cte=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()
    # Trace repeated 20-step circles (position returns to its start).
    radius = 0.5
    rewards = []
    for step_idx in range(30):
        theta = 2 * math.pi * (step_idx % 20) / 20
        env.set_pos([radius * math.cos(theta), 0., radius * math.sin(theta)])
        rewards.append(wrapped.step(0)[1])
    # Once the window has filled, circling should be detected.
    late_rewards = rewards[20:]
    avg = sum(late_rewards) / len(late_rewards)
    assert avg < 0.15, f"Circling at CTE=0 should earn near-zero reward, got avg={avg:.4f}"
def test_forward_driving_earns_positive_reward():
    """Straight-line driving at low CTE must yield a clearly positive reward."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    rewards = []
    # Advance along x so displacement keeps growing.
    for step_idx in range(20):
        env.set_pos([step_idx * 0.3, 0., 0.])
        rewards.append(wrapped.step(0)[1])
    late = rewards[10:]
    avg = sum(late) / len(late)
    assert avg > 0.5, f"Forward driving should earn >0.5 reward, got {avg:.4f}"
def test_forward_beats_circling_by_large_margin():
    """Summed reward over identical step counts: straight driving must
    dominate circling, even when the circling car enjoys CTE=0 throughout.
    """
    env_fwd = MockEnv(speed=2.0, cte=0.5)
    env_circ = MockEnv(speed=2.0, cte=0.0)  # CTE=0 is best case for circling
    wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=20)
    wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=20)
    wrapped_fwd.reset()
    wrapped_circ.reset()
    radius = 0.5
    total_fwd = 0.0
    total_circ = 0.0
    for step_idx in range(40):
        # Straight-line car advances along x every step.
        env_fwd.set_pos([step_idx * 0.3, 0., 0.])
        total_fwd += wrapped_fwd.step(0)[1]
        # Circling car retraces a perfect 20-step loop.
        theta = 2 * math.pi * (step_idx % 20) / 20
        env_circ.set_pos([radius * math.cos(theta), 0., radius * math.sin(theta)])
        total_circ += wrapped_circ.step(0)[1]
    assert total_fwd > total_circ * 3, (
        f"Forward ({total_fwd:.1f}) should beat circling ({total_circ:.1f}) by 3x"
    )
def test_crash_gives_negative_reward():
    """A terminated episode (done=True) must always be rewarded exactly -1.0."""
    wrapped = SpeedRewardWrapper(MockEnv(speed=5.0, cte=0.0, done=True), speed_scale=0.2)
    wrapped.reset()
    reward = wrapped.step(0)[1]
    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"
def test_high_cte_reduces_reward():
    """The base reward should shrink as CTE grows (nearer the track edge)."""
    env_low = MockEnv(speed=2.0, cte=0.5)
    env_high = MockEnv(speed=2.0, cte=4.0)
    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
    wrapped_low.reset()
    wrapped_high.reset()
    # Identical straight-line motion so both efficiency windows saturate;
    # only the CTE differs between the two cars.
    for step_idx in range(10):
        x = step_idx * 0.3
        env_low.set_pos([x, 0., 0.])
        env_high.set_pos([x, 0., 0.])
        r_low = wrapped_low.step(0)[1]
        r_high = wrapped_high.step(0)[1]
    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"
def test_speed_bonus_increases_reward_when_on_track():
    """At equal CTE, the faster forward-driving car must out-earn the slower one."""
    env_slow = MockEnv(speed=0.5, cte=1.0)
    env_fast = MockEnv(speed=3.0, cte=1.0)
    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
    wrapped_slow.reset()
    wrapped_fast.reset()
    for step_idx in range(15):
        env_slow.set_pos([step_idx * 0.1, 0., 0.])
        # The fast car covers proportionally more ground per step.
        env_fast.set_pos([step_idx * 0.3, 0., 0.])
        r_slow = wrapped_slow.step(0)[1]
        r_fast = wrapped_fast.step(0)[1]
    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"
def test_theoretical_max_per_step():
    """At scale=0.1 and max_speed=10: 1.0 × 1.0 × (1 + 0.1 × 10) = 2.0."""
    wrapped = SpeedRewardWrapper(MockEnv(), speed_scale=0.1)
    ceiling = wrapped.theoretical_max_per_step(max_speed=10.0)
    assert ceiling == pytest.approx(2.0, abs=1e-6)
def test_4tuple_step_compatibility():
    """The wrapper must also cope with the legacy 4-tuple gym step() API."""
    env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    env.set_pos([0., 0., 0.])
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
    wrapped.reset()
    result = wrapped.step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    reward = result[1]
    assert isinstance(reward, float)
    assert reward != 999.0, "Should not use sim reward"
def test_reward_resets_on_episode_reset():
    """reset() must clear the position history so efficiency restarts cleanly."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    # Pollute the efficiency window with circular motion.
    for step_idx in range(15):
        theta = 2 * math.pi * step_idx / 12
        env.set_pos([0.5 * math.cos(theta), 0., 0.5 * math.sin(theta)])
        wrapped.step(0)
    # New episode: drive straight from a clean slate.
    wrapped.reset()
    rewards = []
    for step_idx in range(5):
        env.set_pos([step_idx * 0.3, 0., 0.])
        rewards.append(wrapped.step(0)[1])
    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"