donkeycar-rl-autoresearch/tests/test_reward_wrapper.py

241 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Tests for reward_wrapper.py v3 (path efficiency / anti-circular) — no simulator required.
"""
import sys
import os
import math
import pytest
import numpy as np
import gymnasium as gym
from collections import deque
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent'))
from reward_wrapper import SpeedRewardWrapper
def make_env_with_pos(speed=2.0, original_reward=1.0, done=False, pos=(0.0, 0.0, 0.0)):
    """Build a minimal mock gymnasium env whose step() reports speed and position.

    The env always returns a black 120x160 RGB frame; reward, termination flag,
    speed and position are fixed by the factory arguments but may be mutated
    afterwards through ``set_pos`` / ``_speed`` / ``_reward``.
    """
    class PosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)
            self._pos = list(pos)
            self._speed = speed
            self._reward = original_reward
            self._done = done

        def set_pos(self, p):
            # Tests "drive" the car by overwriting the reported position.
            self._pos = list(p)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            frame = np.zeros((120, 160, 3), dtype=np.uint8)
            telemetry = {'speed': self._speed, 'pos': self._pos}
            return frame, self._reward, self._done, False, telemetry

        def close(self):
            pass

    return PosEnv()
# ---- Core Anti-Hacking Tests (inherited from v2) ----
def test_no_speed_bonus_when_off_track():
    """A non-positive base reward must pass through untouched, even at high speed."""
    wrapper = SpeedRewardWrapper(
        make_env_with_pos(speed=10.0, original_reward=-1.0), speed_scale=0.5)
    wrapper.reset()
    reward = wrapper.step(0)[1]
    assert reward == -1.0, f"Off-track reward must not get bonus, got {reward}"
def test_no_speed_bonus_when_reward_zero():
    """A base reward of exactly 0 must stay 0 — no multiplicative bonus applies."""
    wrapper = SpeedRewardWrapper(
        make_env_with_pos(speed=5.0, original_reward=0.0), speed_scale=0.5)
    wrapper.reset()
    reward = wrapper.step(0)[1]
    assert reward == 0.0, f"Zero reward should stay zero, got {reward}"
# ---- Path Efficiency Tests ----
def _simulate_straight_driving(wrapped_env, env, steps=40, speed=3.0, step_size=0.1):
    """Advance the mock car along the x-axis by step_size per step; return per-step rewards."""
    wrapped_env.reset()
    # Speed is constant for the whole run, so set it once up front.
    env._speed = speed
    collected = []
    for step_idx in range(steps):
        env.set_pos([step_idx * step_size, 0.0, 0.0])
        collected.append(wrapped_env.step(0)[1])
    return collected
def _simulate_circular_driving(wrapped_env, env, steps=40, speed=3.0, radius=0.5):
    """Drive one full circle over `steps` steps (ending at the start); return per-step rewards."""
    wrapped_env.reset()
    # Speed is constant for the whole run, so set it once up front.
    env._speed = speed
    collected = []
    for step_idx in range(steps):
        theta = 2 * math.pi * step_idx / steps
        env.set_pos([radius * math.cos(theta), 0.0, radius * math.sin(theta)])
        collected.append(wrapped_env.step(0)[1])
    return collected
def test_straight_driving_gets_higher_reward_than_circular():
    """
    CRITICAL anti-circular guarantee: at identical speed and base reward,
    straight-line driving must accumulate more total reward than circling.
    """
    def _post_warmup_total(simulate):
        # Fresh env + wrapper per scenario so path histories are independent.
        base = make_env_with_pos(speed=3.0, original_reward=0.8)
        wrapper = SpeedRewardWrapper(base, speed_scale=0.1, window_size=20)
        # Sum only the rewards after the 20-step window has filled.
        return sum(simulate(wrapper, base, steps=40)[20:])

    straight_tail = _post_warmup_total(_simulate_straight_driving)
    circular_tail = _post_warmup_total(_simulate_circular_driving)
    assert straight_tail > circular_tail, (
        f"Straight driving ({straight_tail:.2f}) should beat circular ({circular_tail:.2f})"
    )
def test_efficiency_near_one_for_straight_driving():
    """Straight-line motion should yield a path efficiency close to 1.0."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    # 15 straight steps — more than enough to fill the 10-step window.
    for step_idx in range(15):
        env.set_pos([step_idx * 0.2, 0.0, 0.0])
        wrapped.step(0)
    efficiency = wrapped._compute_efficiency()
    assert efficiency > 0.90, f"Straight driving efficiency should be >0.90, got {efficiency:.4f}"
def test_efficiency_near_zero_for_circular_driving():
    """A completed circle (net displacement ~0) should drive efficiency toward 0."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20)
    wrapped.reset()
    radius = 1.0
    # 25 steps (> window_size) at 1/24-turn increments — the car completes a
    # full revolution, so start and end of the window nearly coincide.
    for step_idx in range(25):
        theta = 2 * math.pi * step_idx / 24
        env.set_pos([radius * math.cos(theta), 0.0, radius * math.sin(theta)])
        wrapped.step(0)
    efficiency = wrapped._compute_efficiency()
    assert efficiency < 0.2, f"Circular driving efficiency should be <0.2, got {efficiency:.4f}"
def test_efficiency_one_with_no_pos_history():
    """When position not available, efficiency should default to 1.0 (no penalty)."""
    class NoPosEnv(gym.Env):
        metadata = {'render_modes': []}

        def __init__(self):
            super().__init__()
            self.action_space = gym.spaces.Discrete(5)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=(120, 160, 3), dtype=np.uint8)

        def reset(self, seed=None, **kwargs):
            return np.zeros((120, 160, 3), dtype=np.uint8), {}

        def step(self, action):
            # info deliberately omits the 'pos' key.
            return np.zeros((120, 160, 3), dtype=np.uint8), 0.8, False, False, {'speed': 2.0}

        def close(self):
            pass

    wrapped = SpeedRewardWrapper(NoPosEnv(), speed_scale=0.1)
    wrapped.reset()
    reward = wrapped.step(0)[1]
    # Without pos, efficiency=1.0, so reward = 0.8 * (1 + 0.1*2*1.0) = 0.96
    assert reward > 0.8, f"Without pos, should get speed bonus (efficiency=1.0), got {reward}"
def test_efficiency_resets_on_episode_reset():
    """Position history should clear on reset, so each episode starts fresh."""
    env = make_env_with_pos(speed=3.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
    wrapped.reset()
    # Episode 1: pollute the window with circular motion (low efficiency).
    radius = 0.5
    for step_idx in range(15):
        theta = 2 * math.pi * step_idx / 12
        env.set_pos([radius * math.cos(theta), 0.0, radius * math.sin(theta)])
        wrapped.step(0)
    eff_before_reset = wrapped._compute_efficiency()
    # Episode 2: after reset, a short straight run should score much better.
    wrapped.reset()
    for step_idx in range(3):
        env.set_pos([step_idx * 0.3, 0.0, 0.0])
        wrapped.step(0)
    eff_after_reset = wrapped._compute_efficiency()
    assert eff_after_reset > eff_before_reset, \
        f"After reset, efficiency should improve: before={eff_before_reset:.3f}, after={eff_after_reset:.3f}"
def test_speed_bonus_disappears_when_circling():
    """After circling for window_size steps, speed bonus should be nearly zero."""
    env = make_env_with_pos(speed=5.0, original_reward=1.0)
    wrapped = SpeedRewardWrapper(env, speed_scale=0.5, window_size=20, min_efficiency=0.05)
    wrapped.reset()
    radius = 0.5
    rewards = []
    for step_idx in range(30):
        # One full revolution every 20 steps (the window size).
        theta = 2 * math.pi * (step_idx % 20) / 20
        env.set_pos([radius * math.cos(theta), 0.0, radius * math.sin(theta)])
        rewards.append(wrapped.step(0)[1])
    # Once the window has filled, the bonus should be suppressed to near zero.
    later_rewards = rewards[20:]
    avg_later = sum(later_rewards) / len(later_rewards)
    assert avg_later < 1.3, \
        f"Circular driving speed bonus should be suppressed, avg reward={avg_later:.3f} (original=1.0)"
# ---- Inherited guarantees ----
def test_crash_still_penalized():
    """Crash (original_reward=-1) should remain -1 regardless of speed or efficiency."""
    wrapped = SpeedRewardWrapper(
        make_env_with_pos(speed=8.0, original_reward=-1.0, done=True), speed_scale=0.2)
    wrapped.reset()
    reward = wrapped.step(0)[1]
    assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}"
def test_theoretical_max_per_step():
    """Max reward/step bounded: original(1.0) × (1 + speed_scale × max_speed)."""
    wrapped = SpeedRewardWrapper(make_env_with_pos(), speed_scale=0.1)
    # 1.0 * (1 + 0.1 * 10.0) == 2.0
    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)