donkeycar-rl-autoresearch/agent/reward_wrapper.py

188 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v4 (Full Bypass)
====================================================================
REWARD HACKING HISTORY:
v1 additive: speed × (1-cte/max_cte) → boundary oscillation
v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
v3 path efficiency: original × (1+speed×eff×scale) → still circling!
WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
v4 (THIS VERSION): Completely bypass sim's reward. Multiply base reward by
efficiency so circling yields ZERO reward regardless of CTE.
ROOT CAUSE OF CIRCLING:
The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
A spinning car is ALWAYS moving "forward" relative to its own heading,
so forward_vel > 0 always, giving positive reward while circling indefinitely.
We bypass this entirely.
FORMULA (v4):
base = 1.0 - min(abs(cte) / max_cte, 1.0) # CTE quality [0,1]
eff = net_displacement / total_path_length # Forward progress [0,1]
shaped = base × eff × (1 + speed_scale × speed) # All three must be high
On done/crash: shaped = -1.0
PROPERTIES:
- Spinning (eff≈0): shaped ≈ 0 (no reward)
- On track, slow (eff≈1): shaped ≈ base (CTE reward only)
- On track, fast (eff≈1): shaped > base (CTE + speed bonus)
- Off track (base≈0): shaped ≈ 0 (penalty via done)
- Cannot be gamed: ALL THREE terms must be high simultaneously
RESEARCH NOTE (2026-04-13):
v3 was insufficient — circling at start gave 1.0/step × 47k steps = 47k reward.
v4 makes efficiency a multiplier on the entire reward, not just the speed bonus.
See docs/RESEARCH_LOG.md for full hacking history.
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass: base CTE reward × path efficiency × speed bonus.

    Completely ignores the sim's own reward (which uses forward_vel and is
    exploitable by circular/spinning motion).

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier (default 0.1)
        window_size: steps for efficiency calculation (default 60)
        min_efficiency: efficiency below which no reward (default 0.05)
        max_cte: track half-width for normalization (default 8.0, matches sim)
        min_lap_time: laps completed faster than this many seconds are
            treated as the tiny-circle exploit and heavily penalised
            (default 5.0)
    """

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 60,  # increased from 30 — catches slower circles
        min_efficiency: float = 0.05,
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,  # laps faster than this are penalised as exploits
    ):
        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        # maxlen is window_size + 1 so a full window of N steps yields
        # exactly N path segments in _compute_efficiency().
        self._pos_history = deque(maxlen=window_size + 1)
        # Track lap completions to detect the short-lap (start/finish circling)
        # exploit; compared against info['lap_count'] each step.
        self._last_lap_count = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and clear all per-episode exploit-tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        return result

    def step(self, action):
        """Step the env, discard its reward, and substitute the shaped reward."""
        result = self.env.step(action)
        # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs.
        if len(result) == 5:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')
        # Completely ignore _sim_reward — compute our own.
        shaped = self._compute_reward(done, info)
        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        else:
            return obs, shaped, done, info

    def _compute_reward(self, done: bool, info: dict) -> float:
        """
        Compute reward from scratch using CTE × efficiency × speed.
        Bypasses sim's exploitable forward_vel-based reward.

        Exploit patches
        ---------------
        Short-lap circle: model circles at start/finish line triggering
        lap completions every 1-2 sim-seconds. Detected via lap_count
        increment + last_lap_time < min_lap_time → large penalty.
        """
        # Crash / episode over.
        # NOTE(review): this also penalises gymnasium truncation (time limit)
        # the same as a crash — confirm that is intended.
        if done:
            return -1.0
        # --- Short-lap exploit detection ---
        # Fires exactly once per lap completion, only when the lap was too fast.
        try:
            current_lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError):
            current_lap_count = self._last_lap_count
        if current_lap_count > self._last_lap_count:
            # A new lap just completed.
            self._last_lap_count = current_lap_count
            try:
                lap_time = float(info.get('last_lap_time', 999.0) or 999.0)
            except (TypeError, ValueError):
                lap_time = 999.0
            if lap_time < self.min_lap_time:
                # Tiny-circle exploit — heavy penalty proportional to how
                # short the lap was (lap_time floored at 0.1 s to bound it).
                return -10.0 * (self.min_lap_time / max(lap_time, 0.1))
            # Legitimate lap — no penalty, fall through to normal reward.
        # Update position history (best-effort: skip malformed 'pos' values).
        pos = info.get('pos', None)
        if pos is not None:
            try:
                self._pos_history.append(np.array(list(pos)[:3], dtype=np.float64))
            except (TypeError, ValueError):
                pass
        # --- Base reward: purely CTE-based, in [0, 1] ---
        try:
            cte = float(info.get('cte', 0.0) or 0.0)
        except (TypeError, ValueError):
            cte = 0.0
        base = 1.0 - min(abs(cte) / self.max_cte, 1.0)
        # --- Path efficiency: detects circular motion ---
        efficiency = self._compute_efficiency()
        # Renormalize so efficiency <= min_efficiency maps to 0.
        # Denominator guarded: min_efficiency == 1.0 would otherwise divide by zero.
        denom = max(1.0 - self.min_efficiency, 1e-9)
        eff = max(0.0, (efficiency - self.min_efficiency) / denom)
        # --- Speed: from info dict (clamped to non-negative) ---
        try:
            speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
        except (TypeError, ValueError):
            speed = 0.0
        # --- Combined reward: ALL three terms must be high ---
        # Circling: eff≈0 → reward≈0 regardless of CTE or speed.
        shaped = base * eff * (1.0 + self.speed_scale * speed)
        return shaped

    def _compute_efficiency(self) -> float:
        """Path efficiency = net_displacement / total_path_length, in [0, 1]."""
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt.
        positions = list(self._pos_history)
        net = np.linalg.norm(positions[-1] - positions[0])
        total = sum(
            np.linalg.norm(positions[i + 1] - positions[i])
            for i in range(len(positions) - 1)
        )
        # Guard a (near-)stationary car: zero path length → treat as efficient.
        return float(net / total) if total > 1e-6 else 1.0

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on reward/step (efficiency=1, CTE=0, max speed)."""
        return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed)