donkeycar-rl-autoresearch/tests/test_reward_wrapper.py

260 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for reward_wrapper.py v7 (clean: speed×CTE + efficiency gate)."""
import sys, os, math, pytest
import numpy as np
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
import gymnasium as gym
class MockEnv(gym.Env):
    """Minimal stand-in environment that feeds fixed telemetry to a wrapper under test.

    Every ``step`` returns an all-zero 120x160x3 ``uint8`` frame, a deliberately
    bogus simulator reward of ``999.0`` and an ``info`` dict holding the
    configured ``speed``, ``cte`` and ``pos`` values.

    Args:
        speed: Value reported under ``info['speed']``.
        cte: Value reported under ``info['cte']``.
        pos: Initial position; copied into a list and reported under ``info['pos']``.
        done: Flag returned in the third slot of every ``step`` result.
        use_5tuple: When true, ``step`` returns five elements (with a constant
            ``False`` in the fourth slot); otherwise it returns four elements.
    """

    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p):
        """Replace the reported position with a list copy of ``p``."""
        self._pos = list(p)

    def set_cte(self, c):
        """Replace the reported ``cte`` value with ``c``."""
        self._cte = c

    def reset(self, seed=None, **kwargs):
        """Return a blank frame and an empty info dict.

        Extra keyword arguments are accepted and ignored.
        """
        # Forward the seed to the base class so the environment's RNG is
        # seeded as the Gymnasium ``Env.reset`` contract expects.
        super().reset(seed=seed)
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        """Return a blank frame, the bogus reward and the configured telemetry.

        ``action`` is ignored. The result has five or four elements depending
        on the ``use_5tuple`` flag given at construction.
        """
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        sim_reward = 999.0  # deliberately bogus — wrapper must ignore this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if self._use_5tuple:
            return obs, sim_reward, self._done, False, info
        return obs, sim_reward, self._done, info
# ── Helpers ──────────────────────────────────────────────────────────────────
def make_info(cte=0.5, speed=2.0, pos=None, active_node=1, lap_count=0, lap_time=0.0):
    """Build an ``info`` dict shaped like the ones the tests feed to the wrapper.

    Args:
        cte: Stored under ``'cte'``.
        speed: Stored under ``'speed'``.
        pos: Stored under ``'pos'``; ``None`` selects the origin ``(0., 0., 0.)``.
        active_node: Stored under ``'active_node'``.
        lap_count: Stored under ``'lap_count'``.
        lap_time: Stored under ``'last_lap_time'``.

    Returns:
        A dict with the keys above plus ``'total_nodes'`` fixed at 100.
    """
    # Test against None explicitly: ``pos or default`` raises ValueError for a
    # multi-element NumPy array and silently discards an empty sequence.
    if pos is None:
        pos = (0., 0., 0.)
    return {
        'cte': cte,
        'speed': speed,
        'pos': pos,
        'active_node': active_node,
        'total_nodes': 100,
        'lap_count': lap_count,
        'last_lap_time': lap_time,
    }
# ── Core reward properties ────────────────────────────────────────────────────
def test_sim_reward_is_completely_ignored():
    """The wrapped reward must not echo the mock simulator's bogus 999.0."""
    mock = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapper = SpeedRewardWrapper(mock)
    wrapper.reset()

    _obs, reward, _terminated, _truncated, _info = wrapper.step(0)

    assert reward != 999.0
    assert reward < 10.0
def test_crash_gives_negative_one():
    """A step whose underlying env reports done=True must be rewarded exactly -1.0."""
    mock = MockEnv(speed=5.0, cte=0.0, done=True)
    wrapper = SpeedRewardWrapper(mock)
    wrapper.reset()

    _obs, reward, _terminated, _truncated, _info = wrapper.step(0)

    assert reward == -1.0
def test_forward_driving_earns_positive_reward():
    """A single step at speed 5.0 and cte 0.5 must earn more than 0.3."""
    mock = MockEnv(speed=5.0, cte=0.5)
    wrapper = SpeedRewardWrapper(mock, window_size=10)
    wrapper.reset()

    step_result = wrapper.step(0)
    _obs, r, _terminated, _truncated, _info = step_result

    # Author's expected value: (5/10) * (1 - 0.5/8.0) = 0.5 * 0.9375 = 0.469
    assert r > 0.3, f"Forward driving should earn >0.3, got {r:.4f}"
def test_higher_cte_reduces_reward():
    """With equal speed and path, cte 0.5 must end with a larger reward than cte 4.0."""
    near_env = MockEnv(speed=2.0, cte=0.5)
    far_env = MockEnv(speed=2.0, cte=4.0)
    near_wrapper = SpeedRewardWrapper(near_env, window_size=5)
    far_wrapper = SpeedRewardWrapper(far_env, window_size=5)
    near_wrapper.reset()
    far_wrapper.reset()

    # Drive both cars along the same straight line, 0.3 units per step.
    for step_idx in range(10):
        near_env.set_pos([step_idx * 0.3, 0., 0.])
        far_env.set_pos([step_idx * 0.3, 0., 0.])
        _, near_reward, _, _, _ = near_wrapper.step(0)
        _, far_reward, _, _, _ = far_wrapper.step(0)

    # Only the rewards from the final step are compared.
    assert near_reward > far_reward
def test_higher_speed_increases_reward():
    """With equal cte, speed 3.0 must end with a larger reward than speed 0.5."""
    slow_env = MockEnv(speed=0.5, cte=1.0)
    fast_env = MockEnv(speed=3.0, cte=1.0)
    slow_wrapper = SpeedRewardWrapper(slow_env, window_size=10)
    fast_wrapper = SpeedRewardWrapper(fast_env, window_size=10)
    slow_wrapper.reset()
    fast_wrapper.reset()

    # The slow car advances 0.1 per step, the fast car 0.3 per step.
    for step_idx in range(15):
        slow_env.set_pos([step_idx * 0.1, 0., 0.])
        fast_env.set_pos([step_idx * 0.3, 0., 0.])
        _, slow_reward, _, _, _ = slow_wrapper.step(0)
        _, fast_reward, _, _, _ = fast_wrapper.step(0)

    # Only the rewards from the final step are compared.
    assert fast_reward > slow_reward
def test_4tuple_compatibility():
    """A 4-tuple env must yield a 4-tuple from the wrapper with a float, non-bogus reward."""
    mock = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    mock.set_pos([0., 0., 0.])
    wrapper = SpeedRewardWrapper(mock)
    wrapper.reset()

    outcome = wrapper.step(0)

    assert len(outcome) == 4
    _obs, reward, done, info = outcome
    assert isinstance(reward, float)
    assert reward != 999.0
# ── Efficiency gate ───────────────────────────────────────────────────────────
def test_circling_earns_zero_reward():
    """Driving a radius-0.5 circle (12 steps per revolution) must end on zero reward."""
    mock = MockEnv(speed=3.0, cte=0.0)
    wrapper = SpeedRewardWrapper(mock, window_size=30, min_efficiency=0.15)
    wrapper.reset()

    history = []
    for step_idx in range(40):
        angle = 2 * math.pi * step_idx / 12
        x = 0.5 * math.cos(angle)
        z = 0.5 * math.sin(angle)
        mock.set_pos([x, 0., z])
        _, reward, _, _, _ = wrapper.step(0)
        history.append(reward)

    assert history[-1] == 0.0
    # At least three of the last five rewards must be exactly zero.
    assert history[-5:].count(0.0) >= 3
def test_forward_beats_circling():
    """Straight-line driving must end positive while circling ends at exactly zero."""
    # Phase 1: straight line, 0.5 units per step.
    straight_env = MockEnv(speed=5.0, cte=1.0)
    straight_wrapper = SpeedRewardWrapper(straight_env, window_size=30)
    straight_wrapper.reset()
    for step_idx in range(35):
        straight_env.set_pos([step_idx * 0.5, 0., 0.])
        _, straight_reward, _, _, _ = straight_wrapper.step(0)

    # Phase 2: radius-0.5 circle, 12 steps per revolution.
    circle_env = MockEnv(speed=5.0, cte=0.0)
    circle_wrapper = SpeedRewardWrapper(circle_env, window_size=30)
    circle_wrapper.reset()
    for step_idx in range(35):
        angle = 2 * math.pi * step_idx / 12
        x = 0.5 * math.cos(angle)
        z = 0.5 * math.sin(angle)
        circle_env.set_pos([x, 0., z])
        _, circle_reward, _, _, _ = circle_wrapper.step(0)

    assert straight_reward > 0
    assert circle_reward == 0.0
def test_history_clears_on_reset():
    """After reset(), straight driving must earn a positive reward despite prior circling."""
    mock = MockEnv(speed=2.0, cte=0.5)
    wrapper = SpeedRewardWrapper(mock, window_size=10)
    wrapper.reset()

    # Circle for 15 steps before resetting.
    for step_idx in range(15):
        angle = 2 * math.pi * step_idx / 12
        x = 0.5 * math.cos(angle)
        z = 0.5 * math.sin(angle)
        mock.set_pos([x, 0., z])
        wrapper.step(0)

    wrapper.reset()

    # Then drive straight for five steps and inspect the last reward.
    post_reset = []
    for step_idx in range(5):
        mock.set_pos([step_idx * 0.3, 0., 0.])
        _, reward, _, _, _ = wrapper.step(0)
        post_reset.append(reward)

    assert post_reset[-1] > 0
# ── No-progress termination ───────────────────────────────────────────────────
def test_no_progress_terminates():
    """Staying on active_node 5 with progress_patience=10 must stop within 12 calls at -1.0."""
    wrapper = SpeedRewardWrapper(MockEnv(speed=3.0, cte=0.5), progress_patience=10)
    wrapper.reset()

    for step_idx in range(12):
        stuck_info = make_info(active_node=5, pos=(step_idx*0.1, 0., 0.))
        reward, stop = wrapper._compute_reward(False, stuck_info)
        if stop:
            break

    assert stop is True
    assert reward == -1.0
def test_progress_resets_counter():
    """Advancing active_node each call must never stop and must leave the counter at 0."""
    wrapper = SpeedRewardWrapper(MockEnv(), progress_patience=5)
    wrapper.reset()

    for node in range(4):
        advancing_info = make_info(active_node=node, pos=(node*0.5, 0., 0.))
        _reward, stop = wrapper._compute_reward(False, advancing_info)
        # NOTE(review): checked on every iteration here; the source's original
        # indentation was lost, so confirm this was not a post-loop assertion.
        assert stop is False

    assert wrapper._no_progress_steps == 0
def test_circling_active_node_terminates():
    """Cycling through nodes 8, 9, 10 after node 10 was seen must stop within 20 calls."""
    wrapper = SpeedRewardWrapper(MockEnv(), progress_patience=10)
    wrapper.reset()
    wrapper._compute_reward(False, make_info(active_node=10))

    def _stops(step_idx):
        # Feed the next node of the 8 -> 9 -> 10 cycle and report the stop flag.
        _reward, stop = wrapper._compute_reward(False, make_info(active_node=8 + (step_idx % 3)))
        return stop

    # any() short-circuits, so no further calls are made once a stop is signalled.
    terminated = any(_stops(step_idx) for step_idx in range(20))

    assert terminated
def test_lap_completion_resets_progress_tracker():
    """A 12.0 s lap after node 99 must reset the node/stall trackers without stopping."""
    wrapper = SpeedRewardWrapper(MockEnv(), progress_patience=5, min_lap_time=5.0)
    wrapper.reset()

    before_lap = make_info(active_node=99)
    wrapper._compute_reward(False, before_lap)
    assert wrapper._max_node_seen == 99

    after_lap = make_info(active_node=0, lap_count=1, lap_time=12.0)
    _reward, stop = wrapper._compute_reward(False, after_lap)

    assert wrapper._max_node_seen == -1
    assert wrapper._no_progress_steps == 0
    assert stop is False
# ── Lap exploit guard ─────────────────────────────────────────────────────────
def test_short_lap_penalised():
    """A 1.0 s lap under min_lap_time=5.0 must yield a reward of at most -10.0."""
    wrapper = SpeedRewardWrapper(MockEnv(), min_lap_time=5.0)
    wrapper.reset()

    short_lap = make_info(lap_count=1, lap_time=1.0)
    reward, _stop = wrapper._compute_reward(False, short_lap)

    assert reward < 0
    assert reward <= -10.0
def test_legitimate_lap_not_penalised():
    """A 12.0 s lap under min_lap_time=5.0 must yield a non-negative reward."""
    wrapper = SpeedRewardWrapper(MockEnv(), min_lap_time=5.0)
    wrapper.reset()

    # One call with no lap completed, then one reporting the finished lap.
    wrapper._compute_reward(False, make_info(lap_count=0))
    finished_lap = make_info(lap_count=1, lap_time=12.0, pos=(1., 0., 0.))
    reward, _stop = wrapper._compute_reward(False, finished_lap)

    assert reward >= 0
def test_lap_penalty_fires_once():
    """The short-lap penalty must apply on the first report only, not on a repeat."""
    wrapper = SpeedRewardWrapper(MockEnv(), min_lap_time=5.0)
    wrapper.reset()

    first_report = make_info(lap_count=1, lap_time=1.5)
    first_reward, _stop = wrapper._compute_reward(False, first_report)
    assert first_reward < 0

    # Same lap_count again, with the position moved slightly.
    repeat_report = make_info(lap_count=1, lap_time=1.5, pos=(0.1, 0., 0.))
    second_reward, _stop = wrapper._compute_reward(False, repeat_report)
    assert second_reward >= 0
def test_lap_count_resets_on_episode_reset():
    """reset() must return the wrapper's ``_last_lap_count`` to 0."""
    wrapper = SpeedRewardWrapper(MockEnv(), min_lap_time=5.0)
    wrapper.reset()

    lap_info = make_info(lap_count=1, lap_time=1.0)
    wrapper._compute_reward(False, lap_info)
    assert wrapper._last_lap_count == 1

    wrapper.reset()
    assert wrapper._last_lap_count == 0