donkeycar-rl-autoresearch/tests/test_reward_wrapper.py

460 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed).
"""
import sys, os, math, pytest
import numpy as np
import gymnasium as gym
from collections import deque
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
# ---- Mock Environments ----
class MockEnv(gym.Env):
    """Configurable mock gymnasium.Env.

    Every ``step`` reports the currently configured speed, cross-track error
    and position through ``info`` and returns a sim reward of 999.0 — a
    deliberately bogus value that the wrapper under test must not use.

    Args:
        speed: Value reported as ``info['speed']``.
        cte: Value reported as ``info['cte']``.
        pos: Initial position; copied into a list and reported as
            ``info['pos']``.
        done: Termination flag returned by every ``step``.
        use_5tuple: When true ``step`` returns
            ``(obs, reward, done, False, info)``; otherwise the 4-tuple
            ``(obs, reward, done, info)``.
    """

    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True):
        super().__init__()
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8)
        self._speed = speed
        self._cte = cte
        self._pos = list(pos)
        self._done = done
        self._use_5tuple = use_5tuple

    def set_pos(self, p):
        """Replace the position reported by subsequent steps."""
        self._pos = list(p)

    def set_cte(self, c):
        """Replace the cross-track error reported by subsequent steps."""
        self._cte = c

    def reset(self, seed=None, **kwargs):
        """Return a blank observation and an empty info dict.

        The seed is forwarded to ``gym.Env.reset`` so the base class can seed
        its RNG; previously it was accepted but silently dropped.
        """
        super().reset(seed=seed)
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        """Return a blank observation, the bogus sim reward and the info dict.

        ``action`` is ignored.
        """
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this
        sim_reward = 999.0  # Deliberately bogus — wrapper must not use this
        info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos}
        if self._use_5tuple:
            return obs, sim_reward, self._done, False, info
        return obs, sim_reward, self._done, info

    def close(self):
        """Nothing to release."""
        pass
def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0, action=0):
    """Configure the mock env's telemetry, then step the wrapped env once.

    Args:
        wrapped_env: Object whose ``step(action)`` is called and whose result
            is returned unchanged.
        env: Mock env exposing ``set_pos`` / ``set_cte`` and a ``_speed``
            attribute.
        pos: Position passed to ``env.set_pos``.
        cte: Cross-track error passed to ``env.set_cte``.
        speed: Value assigned to ``env._speed``.
        action: Action forwarded to ``wrapped_env.step``; defaults to 0,
            the value that used to be hard-coded.

    Returns:
        Whatever ``wrapped_env.step(action)`` returns.
    """
    env.set_pos(pos)
    env.set_cte(cte)
    # The mock has no speed setter, so write the private attribute directly.
    env._speed = speed
    return wrapped_env.step(action)
# ---- Core v4 Properties ----
def test_sim_reward_is_completely_ignored():
    """The sim's bogus reward (999.0) must never be passed through.

    v4 is expected to compute the reward from scratch using CTE/pos/speed.
    """
    mock = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.))
    wrapped = SpeedRewardWrapper(mock, speed_scale=0.1)
    wrapped.reset()

    _obs, reward, _term, _trunc, _info = wrapped.step(0)

    assert reward != 999.0, "Wrapper must not pass through sim's bogus reward"
    assert reward < 10.0, f"Reward should be small, got {reward}"
def test_circling_at_zero_cte_gives_near_zero_reward():
    """v6: circling (low efficiency) should be zeroed by the efficiency gate.

    The car is moved around a small circle for more steps than the position
    window holds; by the end the reward is expected to be exactly 0.0.
    """
    env = MockEnv(speed=3.0, cte=0.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=30, min_efficiency=0.15)
    wrapped.reset()

    steps_per_circle = 12  # one full revolution every 12 steps
    radius = 0.5
    rewards = []
    # 40 steps is more than the 30-step position window
    for step_idx in range(40):
        theta = 2 * math.pi * step_idx / steps_per_circle
        env.set_pos([radius * math.cos(theta), 0., radius * math.sin(theta)])
        _obs, step_reward, _term, _trunc, _info = wrapped.step(0)
        rewards.append(step_reward)

    # By now the efficiency gate should have kicked in
    assert rewards[-1] == 0.0, (
        f"v6: circular driving should yield 0.0 reward via efficiency gate, got {rewards[-1]:.4f}")
    zero_count = rewards[-5:].count(0.0)
    assert zero_count >= 3, (
        f"v6: most of last 5 rewards during circle should be 0.0, got {rewards[-5:]}")
def test_forward_driving_earns_positive_reward():
    """A single step at low CTE and moderate speed must earn more than 0.3."""
    mock = MockEnv(speed=5.0, cte=0.5)
    wrapped = SpeedRewardWrapper(mock, speed_scale=0.1, window_size=10)
    wrapped.reset()

    _obs, r, _term, _trunc, _info = wrapped.step(0)

    # Author's expected value: (5/10) * (1 - 0.5/8) = 0.5 * 0.9375 = 0.469
    # (formula lives in reward_wrapper — not verifiable from this file)
    assert r > 0.3, f"Forward driving should earn >0.3 reward, got {r:.4f}"
def test_forward_beats_circling_by_large_margin():
    """v6: straight driving earns positive reward, circular driving earns zero.

    Both runs last 35 steps with a 30-step window; only the final reward of
    each run is compared.
    """
    n_steps = 35

    # --- Straight line at CTE=1m, speed=5 ---
    env_fwd = MockEnv(speed=5.0, cte=1.0)
    wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=30)
    wrapped_fwd.reset()
    for k in range(n_steps):
        env_fwd.set_pos([k * 0.5, 0., 0.])
        _obs, r_fwd, _term, _trunc, _info = wrapped_fwd.step(0)

    # --- Circle at CTE=0, speed=5 (one revolution every 12 steps) ---
    env_circ = MockEnv(speed=5.0, cte=0.0)
    wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=30)
    wrapped_circ.reset()
    for k in range(n_steps):
        theta = 2 * math.pi * k / 12
        env_circ.set_pos([0.5 * math.cos(theta), 0., 0.5 * math.sin(theta)])
        _obs, r_circ, _term, _trunc, _info = wrapped_circ.step(0)

    assert r_fwd > 0, f"Forward driving should earn positive reward, got {r_fwd}"
    assert r_circ == 0.0, f"Circular driving should earn 0 reward, got {r_circ}"
    assert r_fwd > r_circ, f"Forward ({r_fwd:.3f}) must beat circling ({r_circ:.3f})"
def test_crash_gives_negative_reward():
    """A step the env reports as done must yield exactly -1.0."""
    crashed_env = MockEnv(speed=5.0, cte=0.0, done=True)
    wrapped = SpeedRewardWrapper(crashed_env, speed_scale=0.2)
    wrapped.reset()

    _obs, reward, _term, _trunc, _info = wrapped.step(0)

    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"
def test_high_cte_reduces_reward():
    """With identical speed and path, the higher-CTE run must earn less."""
    env_low = MockEnv(speed=2.0, cte=0.5)
    env_high = MockEnv(speed=2.0, cte=4.0)
    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
    for wrapped in (wrapped_low, wrapped_high):
        wrapped.reset()

    # Both cars follow the same straight line so efficiency fills up
    for k in range(10):
        straight = [k * 0.3, 0., 0.]
        env_low.set_pos(straight)
        env_high.set_pos(straight)
        _obs, r_low, _term, _trunc, _info = wrapped_low.step(0)
        _obs, r_high, _term, _trunc, _info = wrapped_high.step(0)

    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"
def test_speed_bonus_increases_reward_when_on_track():
    """At equal CTE, the faster straight-line run must out-earn the slower one."""
    env_slow = MockEnv(speed=0.5, cte=1.0)
    env_fast = MockEnv(speed=3.0, cte=1.0)
    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
    for wrapped in (wrapped_slow, wrapped_fast):
        wrapped.reset()

    for k in range(15):
        env_slow.set_pos([k * 0.1, 0., 0.])
        # Fast car covers more ground per step
        env_fast.set_pos([k * 0.3, 0., 0.])
        _obs, r_slow, _term, _trunc, _info = wrapped_slow.step(0)
        _obs, r_fast, _term, _trunc, _info = wrapped_fast.step(0)

    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"
def test_theoretical_max_per_step():
    """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10."""
    wrapped = SpeedRewardWrapper(MockEnv(), speed_scale=0.1)
    ceiling = wrapped.theoretical_max_per_step(max_speed=10.0)
    assert ceiling == pytest.approx(2.0, abs=1e-6)
def test_4tuple_step_compatibility():
    """When the env returns the old 4-tuple, the wrapper must return a 4-tuple too."""
    legacy_env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
    legacy_env.set_pos([0., 0., 0.])
    wrapped = SpeedRewardWrapper(legacy_env, speed_scale=0.1)
    wrapped.reset()

    result = wrapped.step(0)

    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _obs, reward, _done, _info = result
    assert isinstance(reward, float)
    assert reward != 999.0, "Should not use sim reward"
def test_reward_resets_on_episode_reset():
    """A reset after circular driving must let straight driving earn reward again."""
    env = MockEnv(speed=2.0, cte=0.5)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()

    # Pollute the history with circular motion
    for k in range(15):
        theta = 2 * math.pi * k / 12
        env.set_pos([0.5 * math.cos(theta), 0., 0.5 * math.sin(theta)])
        wrapped.step(0)

    # Fresh episode: drive straight and collect rewards
    wrapped.reset()
    rewards = []
    for k in range(5):
        env.set_pos([k * 0.3, 0., 0.])
        _obs, step_reward, _term, _trunc, _info = wrapped.step(0)
        rewards.append(step_reward)

    # Should get reasonable reward after fresh start
    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"
# ---------------------------------------------------------------------------
# Short-lap exploit patch tests
# ---------------------------------------------------------------------------
def test_short_lap_triggers_penalty():
    """A lap faster than min_lap_time must return a large penalty.

    This closes the start/finish circle exploit: the reward must be negative
    and no greater than -10.
    """
    env = MockEnv(speed=3.0, cte=0.0, pos=(0.,0.,0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # A new lap reported as completed in 1 second (exploit)
    info = {
        'cte': 0.0,
        'speed': 3.0,
        'pos': (0.0, 0.0, 0.0),
        'lap_count': 1,
        'last_lap_time': 1.0,
    }
    reward, _force_term = wrapper._compute_reward_and_done(done=False, info=info)

    assert reward < 0, f'Short lap (1s) should penalise, got reward={reward}'
    assert reward <= -10.0, f'Short lap penalty should be large (<= -10), got {reward}'
def test_legitimate_lap_not_penalised():
    """A lap slower than min_lap_time must not trigger the short-lap penalty."""
    env = MockEnv(speed=3.0, cte=0.0, pos=(0.,0.,0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # First step — no lap yet
    info_no_lap = {
        'cte': 0.0,
        'speed': 3.0,
        'pos': (0.0, 0.0, 0.0),
        'lap_count': 0,
        'last_lap_time': 0.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info_no_lap)

    # Legitimate lap at 12 seconds
    info = {
        'cte': 0.2,
        'speed': 3.0,
        'pos': (1.0, 0.0, 0.0),
        'lap_count': 1,
        'last_lap_time': 12.0,
    }
    reward, _force_term = wrapper._compute_reward_and_done(done=False, info=info)

    assert reward >= 0, f'Legitimate lap (12s) should not be penalised, got {reward}'
def test_lap_count_not_double_penalised():
    """The short-lap penalty must fire once, not on every later step of that lap."""
    env = MockEnv(speed=3.0, cte=0.0, pos=(0.,0.,0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Step on which lap_count increments with a 1.5 s lap: penalised
    info_lap = {
        'cte': 0.0,
        'speed': 3.0,
        'pos': (0.0, 0.0, 0.0),
        'lap_count': 1,
        'last_lap_time': 1.5,
    }
    r1, _force_term = wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert r1 < 0

    # Same lap_count on the next step: normal reward, no second penalty
    info_next = {
        'cte': 0.0,
        'speed': 3.0,
        'pos': (0.1, 0.0, 0.0),
        'lap_count': 1,
        'last_lap_time': 1.5,
    }
    r2, _force_term = wrapper._compute_reward_and_done(done=False, info=info_next)
    assert r2 >= 0, f'Penalty should not repeat on same lap_count, got r2={r2}'
def test_lap_count_resets_on_episode_reset():
    """The wrapper's _last_lap_count must return to 0 on reset()."""
    env = MockEnv(speed=3.0, cte=0.0, pos=(0.,0.,0.))
    wrapper = SpeedRewardWrapper(env, min_lap_time=5.0)
    wrapper.reset()

    # Report a completed (short) lap so the tracker moves to 1
    info_lap = {
        'cte': 0.0,
        'speed': 3.0,
        'pos': (0.0, 0.0, 0.0),
        'lap_count': 1,
        'last_lap_time': 1.0,
    }
    wrapper._compute_reward_and_done(done=False, info=info_lap)
    assert wrapper._last_lap_count == 1

    # Reset episode — counter must go back to 0
    wrapper.reset()
    assert wrapper._last_lap_count == 0
# ---------------------------------------------------------------------------
# v6.1 exploit terminator tests
# ---------------------------------------------------------------------------
def test_sustained_high_cte_terminates_episode():
    """Sustained high CTE must force-terminate the episode with -1.0 reward.

    Grass exploit fix: if CTE exceeds max_cte_terminate for cte_patience
    consecutive steps, the episode must be force-terminated. This catches
    the generated_track gap where the car drives indefinitely on grass.
    """
    env = MockEnv(speed=3.0, cte=5.0)  # CTE=5.0 > max_cte_terminate=4.0
    wrapper = SpeedRewardWrapper(env, max_cte_terminate=4.0, cte_patience=5)
    wrapper.reset()

    rewards = []
    terminated = []
    for _ in range(10):
        info = {'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
                'active_node': 0, 'lap_count': 0, 'last_lap_time': 0.0}
        r, force_term = wrapper._compute_reward_and_done(done=False, info=info)
        rewards.append(r)
        terminated.append(force_term)

    # Should terminate at step 5 (cte_patience=5).
    # Truthiness checks replace the former `== True` / `== False` comparisons.
    assert terminated[4], f'Should force-terminate at step 5, got {terminated}'
    assert rewards[4] == -1.0, f'Termination reward should be -1.0, got {rewards[4]}'
    assert not terminated[0], 'Should not terminate at step 1'
def test_high_cte_resets_when_back_on_track():
    """The high-CTE counter must reset when the car returns to the track.

    Prevents false termination after a brief excursion: three high-CTE
    steps, one on-track step, then five more high-CTE steps — only the
    final run of five may terminate the episode.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, max_cte_terminate=4.0, cte_patience=5)
    wrapper.reset()

    # 3 steps high CTE
    for _ in range(3):
        info = {'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
                'active_node': 0, 'lap_count': 0, 'last_lap_time': 0.0}
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
    # NOTE(review): source indentation was lost; this check is placed after
    # the loop (the weaker reading) — confirm against the original file.
    assert not ft, 'Should not terminate after only 3 steps'

    # 1 step back on track resets counter
    info = {'cte': 1.0, 'speed': 3.0, 'pos': (0., 0., 0.),
            'active_node': 1, 'lap_count': 0, 'last_lap_time': 0.0}
    wrapper._compute_reward_and_done(done=False, info=info)
    assert wrapper._high_cte_steps == 0, 'CTE counter should reset when back on track'

    # 5 more steps high CTE — should now terminate (counter starts fresh)
    for _ in range(5):
        info = {'cte': 5.0, 'speed': 3.0, 'pos': (0., 0., 0.),
                'active_node': 1, 'lap_count': 0, 'last_lap_time': 0.0}
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
    assert ft, 'Should terminate after 5 new consecutive high-CTE steps'
def test_no_track_progress_terminates_episode():
    """No advance in max active_node must force-terminate the episode.

    Circle/stuck exploit fix: if the max active_node doesn't advance for
    progress_patience steps, the episode must be force-terminated with
    -1.0 reward. A circling car stays near the same waypoints, so
    max_node never increases.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=10)
    wrapper.reset()

    # First step initialises max_node to 5, then 10 more steps stuck at 5 → terminate
    for i in range(12):
        info = {'cte': 0.5, 'speed': 3.0, 'pos': (float(i)*0.1, 0., 0.),
                'active_node': 5, 'total_nodes': 100,
                'lap_count': 0, 'last_lap_time': 0.0}
        r, ft = wrapper._compute_reward_and_done(done=False, info=info)
        if ft:
            break

    # r / ft hold the values from the step that broke the loop (or the last step)
    assert ft, 'Should terminate when max active_node not advancing'
    assert r == -1.0
def test_track_progress_resets_counter():
    """Advancing to a new max active_node must reset the no-progress counter."""
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=5)
    wrapper.reset()

    # Step forward: nodes 0, 1, 2, 3 — each new node resets counter
    for node in range(4):
        info = {'cte': 0.5, 'speed': 3.0, 'pos': (float(node)*0.5, 0., 0.),
                'active_node': node, 'total_nodes': 100,
                'lap_count': 0, 'last_lap_time': 0.0}
        _, ft = wrapper._compute_reward_and_done(done=False, info=info)
        assert not ft, f'Should not terminate when advancing (node {node})'

    # NOTE(review): source indentation was lost; this check is placed after
    # the loop (the weaker reading) — confirm against the original file.
    assert wrapper._no_progress_steps == 0, 'Counter should reset on new max node'
def test_circle_exploit_terminates():
    """A car circling near one spot must be terminated.

    active_node oscillates between 8 and 10 but never exceeds the initial
    maximum of 10, so a forced termination is expected within 20 steps.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=10)
    wrapper.reset()

    # Establish node 10 as the furthest node reached
    info = {'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
            'active_node': 10, 'total_nodes': 100,
            'lap_count': 0, 'last_lap_time': 0.0}
    wrapper._compute_reward_and_done(done=False, info=info)

    # Oscillate 8, 9, 10, 8, 9, 10... (circling near node 10)
    for tick in range(20):
        info = {'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
                'active_node': 8 + (tick % 3), 'total_nodes': 100,
                'lap_count': 0, 'last_lap_time': 0.0}
        _reward, force_term = wrapper._compute_reward_and_done(done=False, info=info)
        if force_term:
            terminated = True
            break
    else:
        # Loop ran to completion without a forced termination
        terminated = False

    assert terminated, 'Circling (oscillating active_node, no new max) should terminate'
def test_lap_completion_resets_progress_tracker():
    """A valid lap completion must reset the progress tracker.

    On lap completion active_node resets to 0, so the tracker must reset as
    well or the car would be terminated immediately for 'no progress'.
    """
    env = MockEnv(speed=3.0, cte=0.5)
    wrapper = SpeedRewardWrapper(env, progress_patience=5, min_lap_time=5.0)
    wrapper.reset()

    # Drive to near end of track
    info = {'cte': 0.5, 'speed': 3.0, 'pos': (1., 0., 0.),
            'active_node': 99, 'total_nodes': 100,
            'lap_count': 0, 'last_lap_time': 0.0}
    wrapper._compute_reward_and_done(done=False, info=info)
    assert wrapper._max_node_seen == 99

    # Complete a valid lap
    info = {'cte': 0.5, 'speed': 3.0, 'pos': (0., 0., 0.),
            'active_node': 0, 'total_nodes': 100,
            'lap_count': 1, 'last_lap_time': 12.0}  # 12s lap = valid
    _, ft = wrapper._compute_reward_and_done(done=False, info=info)

    # Progress tracker should be reset
    assert wrapper._max_node_seen == -1, 'max_node_seen should reset on lap completion'
    assert wrapper._no_progress_steps == 0
    assert not ft, 'Valid lap should not terminate'