"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v4 (Full Bypass)
====================================================================

REWARD HACKING HISTORY:
v1 additive: speed × (1-cte/max_cte) → boundary oscillation
v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
v3 path efficiency: original × (1+speed×eff×scale) → still circling!
WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.

v4 (THIS VERSION): Completely bypass sim's reward. Multiply base reward by
efficiency so circling yields ZERO reward regardless of CTE.

ROOT CAUSE OF CIRCLING:
The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
A spinning car is ALWAYS moving "forward" relative to its own heading,
so forward_vel > 0 always, giving positive reward while circling indefinitely.
We bypass this entirely.

FORMULA (v4):
    base   = 1.0 - min(abs(cte) / max_cte, 1.0)     # CTE quality [0,1]
    eff    = net_displacement / total_path_length   # Forward progress [0,1]
    shaped = base × eff × (1 + speed_scale × speed) # All three must be high

On done/crash: shaped = -1.0

PROPERTIES:
- Spinning (eff≈0): shaped ≈ 0 (no reward)
- On track, slow (eff≈1): shaped ≈ base (CTE reward only)
- On track, fast (eff≈1): shaped > base (CTE + speed bonus)
- Off track (base≈0): shaped ≈ 0 (penalty via done)
- Cannot be gamed: ALL THREE terms must be high simultaneously

RESEARCH NOTE (2026-04-13):
v3 was insufficient — circling at start gave 1.0/step × 47k steps = 47k reward.
v4 makes efficiency a multiplier on the entire reward, not just the speed bonus.
See docs/RESEARCH_LOG.md for full hacking history.
"""

import gymnasium as gym
import numpy as np
from collections import deque

class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass: base CTE reward × path efficiency × speed bonus.

    Completely ignores the sim's own reward (which uses forward_vel and is
    exploitable by circular/spinning motion).

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier (default 0.1)
        window_size: steps for efficiency calculation (default 30)
        min_efficiency: efficiency below which no reward (default 0.05);
            must be in [0, 1) — values >= 1 would zero the denominator of
            the efficiency rescale in _compute_reward
        max_cte: track half-width for normalization (default 8.0, matches
            sim); must be > 0 — it divides abs(cte)

    Raises:
        ValueError: if min_efficiency is outside [0, 1) or max_cte <= 0.
    """

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,
        min_efficiency: float = 0.05,
        max_cte: float = 8.0,
    ):
        super().__init__(env)
        # Fail fast: either bad value would otherwise surface only as a
        # ZeroDivisionError deep inside _compute_reward at step time.
        if not 0.0 <= min_efficiency < 1.0:
            raise ValueError(f'min_efficiency must be in [0, 1), got {min_efficiency}')
        if max_cte <= 0.0:
            raise ValueError(f'max_cte must be > 0, got {max_cte}')
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        # window_size path segments require window_size + 1 sample points.
        self._pos_history = deque(maxlen=window_size + 1)

    def reset(self, **kwargs):
        """Reset the wrapped env and clear the position-history window."""
        result = self.env.reset(**kwargs)
        # Stale positions from the previous episode must not leak into the
        # efficiency estimate of the new one.
        self._pos_history.clear()
        return result

    def step(self, action):
        """Step the wrapped env, replacing its reward with the shaped one."""
        result = self.env.step(action)

        # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs
        if len(result) == 5:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        # Completely ignore _sim_reward — compute our own
        shaped = self._compute_reward(done, info)

        # Preserve whichever tuple shape the wrapped env produced.
        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        else:
            return obs, shaped, done, info

    def _compute_reward(self, done: bool, info: dict) -> float:
        """
        Compute reward from scratch using CTE × efficiency × speed.
        Bypasses sim's exploitable forward_vel-based reward.
        """
        # Crash / episode over: fixed penalty, no shaping
        if done:
            return -1.0

        # Update position history (best effort — 'pos' may be absent or
        # malformed; efficiency then defaults toward 1.0 via short history)
        pos = info.get('pos', None)
        if pos is not None:
            try:
                self._pos_history.append(np.array(list(pos)[:3], dtype=np.float64))
            except (TypeError, ValueError):
                pass

        # --- Base reward: purely CTE-based, in [0, 1] ---
        try:
            cte = float(info.get('cte', 0.0) or 0.0)
        except (TypeError, ValueError):
            cte = 0.0
        base = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # --- Path efficiency: detects circular motion ---
        efficiency = self._compute_efficiency()
        # Rescale [min_efficiency, 1] -> [0, 1]; below min_efficiency -> 0.
        # Denominator is nonzero: __init__ enforces min_efficiency < 1.
        eff = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency))

        # --- Speed: from info dict, clamped to non-negative ---
        try:
            speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
        except (TypeError, ValueError):
            speed = 0.0

        # --- Combined reward: ALL three terms must be high ---
        # Circling: eff≈0 → reward≈0 regardless of CTE or speed
        shaped = base * eff * (1.0 + self.speed_scale * speed)
        return shaped

    def _compute_efficiency(self) -> float:
        """Path efficiency = net_displacement / total_path_length, in [0, 1]."""
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt

        positions = list(self._pos_history)
        net = np.linalg.norm(positions[-1] - positions[0])
        total = sum(
            np.linalg.norm(positions[i + 1] - positions[i])
            for i in range(len(positions) - 1)
        )
        # A (near-)stationary car has no path to judge; treat as efficient
        # rather than silently zeroing all reward.
        return float(net / total) if total > 1e-6 else 1.0

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on reward/step (efficiency=1, CTE=0, max speed)."""
        return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed)