"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v4 (Full Bypass)
====================================================================

REWARD HACKING HISTORY:
    v1 additive:         speed × (1-cte/max_cte)         → boundary oscillation
    v2 multiplicative:   original × (1+speed×scale)      → circular driving (on-track)
    v3 path efficiency:  original × (1+speed×eff×scale)  → still circling!

WHY v3 failed:
    efficiency killed the SPEED BONUS but not the BASE reward.
    A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.

v4 (THIS VERSION):
    Completely bypass sim's reward. Multiply base reward by efficiency so
    circling yields ZERO reward regardless of CTE.

ROOT CAUSE OF CIRCLING:
    The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
    A spinning car is ALWAYS moving "forward" relative to its own heading, so
    forward_vel > 0 always, giving positive reward while circling indefinitely.
    We bypass this entirely.

FORMULA (v4):
    base   = 1.0 - min(abs(cte) / max_cte, 1.0)     # CTE quality [0,1]
    eff    = net_displacement / total_path_length   # Forward progress [0,1]
    shaped = base × eff × (1 + speed_scale × speed) # All three must be high
    On termination (crash / off-track): shaped = -1.0
    (a pure time-limit truncation is NOT penalised; it is scored like any
    other step)

PROPERTIES:
    - Spinning (eff≈0):          shaped ≈ 0     (no reward)
    - Parked (no movement):      shaped = 0     (no progress, no reward)
    - On track, slow (eff≈1):    shaped ≈ base  (CTE reward only)
    - On track, fast (eff≈1):    shaped > base  (CTE + speed bonus)
    - Off track (base≈0):        shaped ≈ 0     (penalty via done)
    - Cannot be gamed: ALL THREE terms must be high simultaneously

RESEARCH NOTE (2026-04-13):
    v3 was insufficient — circling at start gave 1.0/step × 47k steps = 47k
    reward. v4 makes efficiency a multiplier on the entire reward, not just
    the speed bonus. See docs/RESEARCH_LOG.md for full hacking history.
"""

import gymnasium as gym
import numpy as np
from collections import deque


class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass: base CTE reward × path efficiency × speed bonus.

    Completely ignores the sim's own reward (which uses forward_vel and is
    exploitable by circular/spinning motion).

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier (default 0.1)
        window_size: steps for efficiency calculation (default 30). The
            history holds ``window_size + 1`` positions; values below 2 never
            reach the three samples needed, so efficiency stays at 1.0.
        min_efficiency: efficiency below which no reward (default 0.05).
            Must be < 1.0.
        max_cte: track half-width for normalization (default 8.0, matches sim).
            Must be > 0.

    Raises:
        ValueError: if ``max_cte`` is not positive or ``min_efficiency`` is
            not below 1.0 (either would divide by zero or invert the reward).
    """

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,
        min_efficiency: float = 0.05,
        max_cte: float = 8.0,
    ):
        # Fail at construction instead of with ZeroDivisionError (or a silently
        # inverted reward) on the first step. `not (x > 0)` also rejects NaN.
        if not max_cte > 0.0:
            raise ValueError(f'max_cte must be > 0, got {max_cte!r}')
        if not min_efficiency < 1.0:
            raise ValueError(f'min_efficiency must be < 1.0, got {min_efficiency!r}')

        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        # Sliding window of recent positions; +1 so it spans `window_size` segments.
        self._pos_history = deque(maxlen=window_size + 1)

    def reset(self, **kwargs):
        """Reset the wrapped env and forget the previous episode's positions."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        return result

    def step(self, action):
        """
        Step the wrapped env and replace its reward with the shaped reward.

        Handles both the 5-tuple (gymnasium) and 4-tuple (old gym) step APIs
        and returns a tuple of the same arity it received.

        Raises:
            ValueError: if the wrapped env returns neither 4 nor 5 items.
        """
        result = self.env.step(action)

        if len(result) == 5:
            obs, _sim_reward, terminated, truncated, info = result
            # Only a real termination earns the crash penalty; a time-limit
            # truncation is not a failure of the policy.
            shaped = self._compute_reward(bool(terminated), info)
            return obs, shaped, terminated, truncated, info

        if len(result) == 4:
            obs, _sim_reward, done, info = result
            # Old gym has a single `done`; its TimeLimit wrapper marks pure
            # time-outs with this info key.
            timed_out = bool(info.get('TimeLimit.truncated', False))
            shaped = self._compute_reward(bool(done) and not timed_out, info)
            return obs, shaped, done, info

        raise ValueError(f'Unexpected step() result length: {len(result)}')

    def _compute_reward(self, done: bool, info: dict) -> float:
        """
        Compute reward from scratch using CTE × efficiency × speed.

        Bypasses sim's exploitable forward_vel-based reward.

        Args:
            done: True when the episode ended in failure (crash / off-track).
            info: step info dict; reads the 'pos', 'cte' and 'speed' keys.

        Returns:
            -1.0 when ``done`` is true, otherwise a finite, non-negative
            shaped reward.
        """
        # Crash / episode over
        if done:
            return -1.0

        # Update position history
        self._record_position(info.get('pos', None))

        # --- Base reward: purely CTE-based ---
        # An infinite CTE saturates the min() and yields base == 0.
        cte = self._read_float(info, 'cte')
        base = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # --- Path efficiency: detects circular motion ---
        efficiency = self._compute_efficiency()
        # Rescale so that efficiency <= min_efficiency → zero reward
        # (the whole reward, not just the speed bonus) and efficiency 1 → 1.
        eff = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency))

        # --- Speed: from info dict ---
        speed = max(0.0, self._read_float(info, 'speed'))
        if not np.isfinite(speed):
            # Garbage telemetry must not turn into an infinite/NaN reward.
            speed = 0.0

        # --- Combined reward: ALL three terms must be high ---
        # Circling: eff≈0 → reward≈0 regardless of CTE or speed
        return base * eff * (1.0 + self.speed_scale * speed)

    @staticmethod
    def _read_float(info: dict, key: str) -> float:
        """Read ``info[key]`` as a float; missing, unparsable or NaN → 0.0."""
        try:
            value = float(info.get(key, 0.0) or 0.0)
        except (TypeError, ValueError):
            return 0.0
        # NaN is the only float that differs from itself.
        return 0.0 if value != value else value

    def _record_position(self, pos) -> None:
        """Append a position sample to the window, skipping unusable ones."""
        if pos is None:
            return
        try:
            sample = np.array(list(pos)[:3], dtype=np.float64)
        except (TypeError, ValueError):
            return
        # A NaN/inf coordinate would poison every efficiency value until it
        # scrolled out of the window, so drop the sample instead.
        if sample.size == 0 or not np.all(np.isfinite(sample)):
            return
        # Samples of different shape cannot be subtracted; restart the window.
        if self._pos_history and sample.shape != self._pos_history[-1].shape:
            self._pos_history.clear()
        self._pos_history.append(sample)

    def _compute_efficiency(self) -> float:
        """
        Path efficiency = net_displacement / total_path_length, in [0, 1].

        Returns 1.0 while fewer than three samples exist (benefit of the
        doubt), and 0.0 when the window has samples but the car has not
        moved, so standing still cannot farm the base reward.
        """
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt

        points = list(self._pos_history)
        net = float(np.linalg.norm(points[-1] - points[0]))
        total = float(sum(np.linalg.norm(b - a) for a, b in zip(points, points[1:])))

        if total <= 1e-6:
            return 0.0  # No movement in the window → no forward progress
        # Clamp: rounding can push net/total a hair above 1.
        return min(1.0, net / total)

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on reward/step (efficiency=1, CTE=0, max speed)."""
        return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed)