donkeycar-rl-autoresearch/agent/reward_wrapper.py

188 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v4 (Full Bypass)
====================================================================
REWARD HACKING HISTORY:
v1 additive: speed × (1-cte/max_cte) → boundary oscillation
v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
v3 path efficiency: original × (1+speed×eff×scale) → still circling!
WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
v4 (THIS VERSION): Completely bypass sim's reward. Multiply base reward by
efficiency so circling yields ZERO reward regardless of CTE.
ROOT CAUSE OF CIRCLING:
The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
A spinning car is ALWAYS moving "forward" relative to its own heading,
so forward_vel > 0 always, giving positive reward while circling indefinitely.
We bypass this entirely.
FORMULA (v4):
base = 1.0 - min(abs(cte) / max_cte, 1.0) # CTE quality [0,1]
eff = net_displacement / total_path_length # Forward progress [0,1]
shaped = base × eff × (1 + speed_scale × speed) # All three must be high
On done/crash: shaped = -1.0
PROPERTIES:
- Spinning (eff≈0): shaped ≈ 0 (no reward)
- On track, slow (eff≈1): shaped ≈ base (CTE reward only)
- On track, fast (eff≈1): shaped > base (CTE + speed bonus)
- Off track (base≈0): shaped ≈ 0 (penalty via done)
- Cannot be gamed: ALL THREE terms must be high simultaneously
RESEARCH NOTE (2026-04-13):
v3 was insufficient — circling at start gave 1.0/step × 47k steps = 47k reward.
v4 makes efficiency a multiplier on the entire reward, not just the speed bonus.
See docs/RESEARCH_LOG.md for full hacking history.
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
"""
Full reward bypass: base CTE reward × path efficiency × speed bonus.
Completely ignores the sim's own reward (which uses forward_vel and is
exploitable by circular/spinning motion).
Args:
env: gymnasium environment
speed_scale: speed bonus multiplier (default 0.1)
window_size: steps for efficiency calculation (default 30)
min_efficiency: efficiency below which no reward (default 0.05)
max_cte: track half-width for normalization (default 8.0, matches sim)
"""
def __init__(
self,
env,
speed_scale: float = 0.1,
window_size: int = 60, # increased from 30 — catches slower circles
min_efficiency: float = 0.05,
max_cte: float = 8.0,
min_lap_time: float = 5.0, # laps faster than this are penalised as exploits
):
super().__init__(env)
self.speed_scale = speed_scale
self.window_size = window_size
self.min_efficiency = min_efficiency
self.max_cte = max_cte
self.min_lap_time = min_lap_time
self._pos_history = deque(maxlen=window_size + 1)
self._last_lap_count = 0 # track lap completions to detect short-lap exploit
def reset(self, **kwargs):
result = self.env.reset(**kwargs)
self._pos_history.clear()
self._last_lap_count = 0
return result
def step(self, action):
result = self.env.step(action)
# Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs
if len(result) == 5:
obs, _sim_reward, terminated, truncated, info = result
done = terminated or truncated
elif len(result) == 4:
obs, _sim_reward, done, info = result
terminated = done
truncated = False
else:
raise ValueError(f'Unexpected step() result length: {len(result)}')
# Completely ignore _sim_reward — compute our own
shaped, force_terminate = self._compute_reward_and_done(done, info)
if force_terminate:
terminated = True
done = True
if len(result) == 5:
return obs, shaped, terminated, truncated, info
else:
return obs, shaped, done, info
def _compute_reward_and_done(self, done: bool, info: dict):
"""
v5: speed × CTE-quality reward.
reward = speed × (1 - |cte| / max_cte)
Simpler than v4. Directly incentivises going FAST while staying
centred. On a hill: car slows → reward drops → clear gradient
signal to apply more throttle. v4's efficiency term gave zero
gradient when the car was stuck (all three terms collapsed to zero
simultaneously, so no direction to improve).
Exploit protection (unchanged):
- Short-lap penalty: laps < min_lap_time → large negative reward
- StuckTerminationWrapper: done=True after 80 steps of <0.5m movement
- Crash: done=True → -1.0
"""
# Crash / episode over
if done:
return -1.0, False
# --- Short-lap exploit detection (unchanged) ---
try:
current_lap_count = int(info.get('lap_count', 0) or 0)
except (TypeError, ValueError):
current_lap_count = self._last_lap_count
if current_lap_count > self._last_lap_count:
self._last_lap_count = current_lap_count
try:
lap_time = float(info.get('last_lap_time', 999.0) or 999.0)
except (TypeError, ValueError):
lap_time = 999.0
if lap_time < self.min_lap_time:
# Short-lap exploit: penalty AND terminate episode immediately.
# Penalty alone is insufficient — the model stays alive and
# keeps accumulating small rewards between laps.
# Termination removes that loophole completely.
penalty = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
return penalty, True # (reward, force_terminate)
# Legitimate lap — fall through to normal reward
# --- CTE quality: how centred is the car? ---
try:
cte = float(info.get('cte', 0.0) or 0.0)
except (TypeError, ValueError):
cte = 0.0
cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0) # 0=off track, 1=centred
# --- Speed ---
try:
speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
except (TypeError, ValueError):
speed = 0.0
# --- v5 reward: speed × CTE quality ---
# Fast + centred = high reward. Slow (hill) = low reward → gradient
# pushes policy toward higher throttle. Off-track = near-zero.
# Normalise speed so max reward ≈ 1.0 at reasonable speed (10 m/s).
speed_norm = min(speed / 10.0, 1.0)
return cte_quality * speed_norm, False
def _compute_efficiency(self) -> float:
"""Path efficiency = net_displacement / total_path_length."""
if len(self._pos_history) < 3:
return 1.0 # Insufficient history — give benefit of doubt
positions = list(self._pos_history)
net = np.linalg.norm(positions[-1] - positions[0])
total = sum(
np.linalg.norm(positions[i + 1] - positions[i])
for i in range(len(positions) - 1)
)
return float(net / total) if total > 1e-6 else 1.0
def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
"""Upper bound on reward/step (efficiency=1, CTE=0, max speed)."""
return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed)