# donkeycar-rl-autoresearch/tests/test_reward_wrapper.py

"""
Tests for reward_wrapper.py v2 (hack-proof multiplicative formula) — no simulator required.
"""
import sys
import os
import pytest
import numpy as np
import gymnasium as gym
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
class MockStepEnv(gym.Env):
    """Minimal gymnasium.Env stub for exercising SpeedRewardWrapper.

    Every step() emits a fixed reward and reports a fixed speed in ``info``;
    the constructor selects between the new 5-tuple and legacy 4-tuple
    step() return shape.
    """
    metadata = {'render_modes': []}

    def __init__(self, speed=2.0, original_reward=1.0, done=False, use_5tuple=True):
        super().__init__()
        # Constant values echoed back on every step() call.
        self._speed = speed
        self._reward = original_reward
        self._done = done
        self._use_5tuple = use_5tuple
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

    def reset(self, seed=None, **kwargs):
        return np.zeros((120, 160, 3), dtype=np.uint8), {}

    def step(self, action):
        obs = np.zeros((120, 160, 3), dtype=np.uint8)
        info = {'speed': self._speed}
        if self._use_5tuple:
            return obs, self._reward, self._done, False, info
        # Legacy gym API: (obs, reward, done, info)
        return obs, self._reward, self._done, info

    def close(self):
        pass
# ---- Hack-Proof Guarantee Tests ----
def test_no_speed_bonus_when_off_track():
    """Core anti-hacking guarantee: a non-positive reward gets no speed bonus."""
    # Very fast while off track — the multiplicative bonus must not apply.
    wrapped = SpeedRewardWrapper(
        MockStepEnv(speed=10.0, original_reward=-1.0), speed_scale=0.5
    )
    reward = wrapped.step(0)[1]
    assert reward == -1.0, \
        f"Off-track reward must not get speed bonus, got {reward}"
def test_no_speed_bonus_when_reward_zero():
    """Boundary case: a reward of exactly 0 must remain exactly 0."""
    wrapped = SpeedRewardWrapper(
        MockStepEnv(speed=5.0, original_reward=0.0), speed_scale=0.5
    )
    reward = wrapped.step(0)[1]
    assert reward == 0.0, f"Zero reward should stay zero, got {reward}"
def test_speed_bonus_scales_with_speed_when_on_track():
    """On track (positive reward), a higher speed must yield a higher shaped reward."""
    def shaped_reward(speed):
        # Same on-track base reward each time; only the reported speed differs.
        wrapped = SpeedRewardWrapper(
            MockStepEnv(speed=speed, original_reward=0.8), speed_scale=0.1
        )
        return wrapped.step(0)[1]

    r_slow = shaped_reward(1.0)
    r_fast = shaped_reward(5.0)
    assert r_fast > r_slow, f"Faster on-track should reward more: {r_fast:.3f} vs {r_slow:.3f}"
def test_multiplicative_formula_correct():
    """Pin the exact shaping formula: shaped = original * (1 + speed_scale * speed)."""
    speed_scale, speed, base = 0.1, 3.0, 0.6
    expected = base * (1.0 + speed_scale * speed)  # 0.6 * 1.3 = 0.78
    wrapped = SpeedRewardWrapper(
        MockStepEnv(speed=speed, original_reward=base), speed_scale=speed_scale
    )
    reward = wrapped.step(0)[1]
    assert reward == pytest.approx(expected, abs=1e-6), \
        f"Expected {expected:.6f}, got {reward:.6f}"
def test_cannot_hack_by_going_fast_off_track():
    """
    The old additive shaping was exploitable; the multiplicative one is not.
    Fast off-track (speed=10) must give the same result as slow off-track (speed=1).
    """
    def off_track_reward(speed):
        wrapped = SpeedRewardWrapper(
            MockStepEnv(speed=speed, original_reward=-1.0), speed_scale=0.5
        )
        return wrapped.step(0)[1]

    r_fast = off_track_reward(10.0)
    r_slow = off_track_reward(1.0)
    assert r_fast == r_slow == -1.0, \
        f"Off-track reward must be identical regardless of speed: fast={r_fast}, slow={r_slow}"
def test_theoretical_max_per_step():
    """
    theoretical_max_per_step must report the correct upper bound:
    with speed_scale=0.1 and max_speed=10.0 -> 1.0 * (1 + 0.1*10) = 2.0.
    """
    wrapped = SpeedRewardWrapper(MockStepEnv(), speed_scale=0.1)
    max_reward = wrapped.theoretical_max_per_step(max_speed=10.0)
    assert max_reward == pytest.approx(2.0, abs=1e-6), \
        f"Max per step should be 2.0, got {max_reward}"
def test_fallback_when_speed_not_in_info():
    """A missing 'speed' key in info must degrade gracefully to the original reward."""
    class NoSpeedEnv(gym.Env):
        """Env whose step() info dict omits the 'speed' key entirely."""
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            # Empty info on purpose: no speed reported.
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.75, False, False, {}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoSpeedEnv(), speed_scale=0.5)
    reward = wrapped.step(0)[1]
    # Default speed of 0.0 leaves the reward untouched: 0.75 * (1 + 0.5*0.0) = 0.75
    assert reward == pytest.approx(0.75, abs=1e-6), \
        f"Should fall back gracefully, got {reward}"
def test_wrapper_preserves_observation():
    """Shaping must only touch the reward — observations pass through untouched."""
    class FixedObsEnv(gym.Env):
        """Env that always emits an all-zero image observation."""
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.8, False, False, {'speed': 2.0}

        def close(self):
            pass

    obs = SpeedRewardWrapper(FixedObsEnv()).step(0)[0]
    np.testing.assert_array_equal(obs, np.zeros((120, 160, 3), dtype=np.uint8))
def test_4tuple_step_compatibility():
    """The wrapper must also cope with the legacy 4-tuple gym step() API."""
    legacy_env = MockStepEnv(speed=2.0, original_reward=0.8, use_5tuple=False)
    result = SpeedRewardWrapper(legacy_env).step(0)
    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
    _, reward, done, info = result
    assert isinstance(reward, float)
    assert reward > 0.8, "Speed bonus should increase reward when on track"
def test_crash_still_penalized():
    """A terminal crash reward of -1 must not be softened by high speed."""
    crash_env = MockStepEnv(speed=8.0, original_reward=-1.0, done=True)
    wrapped = SpeedRewardWrapper(crash_env, speed_scale=0.2)
    reward = wrapped.step(0)[1]
    assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}"