"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v6 (Speed×CTE + Efficiency Gate)
=====================================================================================

REWARD HACKING HISTORY:
  v1 additive:        speed × (1-cte/max_cte)           → boundary oscillation
  v2 multiplicative:  original × (1+speed×scale)        → circular driving (on-track)
  v3 path efficiency: original × (1+speed×eff×scale)    → still circling!
    WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
    A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
  v4: base × eff × (1 + speed_scale × speed)            → zero gradient on hills!
    WHY v4 failed on hills: speed≈0 AND eff≈0 AND cte_quality varies →
    all three terms near zero simultaneously → no gradient to push ANY term up.
  v5: speed × CTE_quality (no efficiency)               → circular driving returns!
    WHY v5 failed: dropped efficiency entirely. Circular driving at CTE≈0
    with speed>0 earns positive reward indefinitely. Observed in Exp 11.
  v6 (THIS VERSION): v5 reward + efficiency GATE.
    Keeps v5's gradient properties (non-zero gradient on hills) but adds a
    binary efficiency check that zeros reward when car is circling.

ROOT CAUSE OF CIRCLING:
  The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
  A spinning car is ALWAYS moving "forward" relative to its own heading, so
  forward_vel > 0 always, giving positive reward while circling indefinitely.
  We bypass this entirely.

FORMULA (v6):
  cte_quality = 1.0 - min(|cte| / max_cte, 1.0)   # [0,1] centred=1
  speed_norm  = min(speed / 10.0, 1.0)            # [0,1] normalised
  efficiency  = net_displacement / total_path     # [0,1] straight=1, circle=0

  if efficiency < min_efficiency:
      reward = 0.0                       # GATE: circling → zero reward (but not negative)
  else:
      reward = cte_quality × speed_norm  # v5 formula (gradient on hills)

  On done/crash: reward = -1.0

WHY GATE NOT MULTIPLIER:
  v4 used efficiency as a multiplier: reward = base × eff × speed_bonus.
  On a hill: speed≈0, eff≈0, base≈0.5 → reward≈0 and ∂reward/∂speed≈0.
  No gradient to push speed up — car stays stuck.

  v6 gate: efficiency is either PASS or FAIL. When efficiency > threshold
  (car moving forward at all), reward = speed × CTE_quality.
  On a hill: car is stuck but still has eff > 0 (not literally circling), so
  the gate passes and the reward = speed × CTE_quality.
  ∂reward/∂speed > 0 → gradient pushes toward more throttle.
  Circle has eff ≈ 0 → gate fails → reward = 0.

PROPERTIES:
  - Circling (eff < min_efficiency): reward = 0 (gate fails)
  - Stuck on hill (eff > 0): reward = speed × CTE (gradient toward unstuck)
  - On track, fast: reward = high (speed + centred)
  - Off track: reward ≈ 0 (CTE_quality → 0)
  - Crash: reward = -1.0
"""

from collections import deque

import gymnasium as gym
import numpy as np


class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass (v6): speed × CTE-quality, gated by path efficiency.

    Completely ignores the sim's own reward (which uses forward_vel and is
    exploitable by circular/spinning motion) and recomputes the reward from
    the ``info`` dict returned by the wrapped environment's ``step()``.
    The keys read are ``pos``, ``cte``, ``speed``, ``lap_count`` and
    ``last_lap_time``; missing or unparseable values fall back to defaults.

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier from the v2-v4 formulas
            (default 0.1). Stored for backward compatibility only; the v6
            reward does not use it.
        window_size: steps for efficiency calculation (default 30)
        min_efficiency: efficiency below which no reward (default 0.15)
        max_cte: track half-width for normalization (default 8.0, matches sim)
        min_lap_time: laps reported faster than this are treated as an
            exploit: large negative reward and forced termination
            (default 5.0)

    Raises:
        ValueError: if ``max_cte`` is not strictly positive.
    """

    # Speed at which the normalised speed term saturates at 1.0.
    _SPEED_NORM = 10.0

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,  # captures 2+ full circles at typical circling speed
        min_efficiency: float = 0.15,  # gate threshold: circles ≈ 0.13, wobbly straight ≈ 0.98
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,  # laps faster than this are penalised as exploits
    ):
        # max_cte is a divisor in the CTE-quality term: zero would raise
        # ZeroDivisionError on the first step and a negative value would
        # yield rewards above 1. `not (x > 0)` also rejects NaN.
        if not max_cte > 0:
            raise ValueError(f'max_cte must be positive, got {max_cte!r}')
        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        # window_size steps need window_size + 1 stored positions.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0  # track lap completions to detect short-lap exploit

    def reset(self, **kwargs):
        """Reset the wrapped env and clear per-episode tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        return result

    def step(self, action):
        """
        Step the wrapped env and replace its reward with the v6 shaped reward.

        Handles both 4-tuple (old gym) and 5-tuple (gymnasium) step results
        and returns a tuple of the same arity it received.

        Raises:
            ValueError: if the wrapped env returns a tuple of any other length.
        """
        result = self.env.step(action)
        n_fields = len(result)

        if n_fields == 5:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif n_fields == 4:
            obs, _sim_reward, done, info = result
            terminated, truncated = done, False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        # Completely ignore _sim_reward — compute our own.
        shaped, force_terminate = self._compute_reward_and_done(done, info)
        if force_terminate:
            terminated = True
            done = True

        if n_fields == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    def _compute_reward_and_done(self, done: bool, info: dict):
        """
        v6: speed × CTE-quality + efficiency gate.

            reward = speed_norm × cte_quality  (when efficiency >= threshold)
            reward = 0.0                       (when efficiency < threshold — circling)
            reward = -1.0                      (on crash/done)

        The efficiency gate prevents circular driving (eff≈0 for circles)
        without killing gradient on hills (eff>0 for a stuck-but-not-circling
        car, so the gate passes and speed×CTE gradient pushes toward unstuck).

        Exploit protection:
          - Efficiency gate: circles → reward = 0
          - Short-lap penalty: laps < min_lap_time → large negative + terminate
          - StuckTerminationWrapper: done=True after stuck_steps of no movement
          - Crash: done=True → -1.0

        Args:
            done: whether the wrapped env ended the episode on this step.
            info: the info dict returned by the wrapped env's step().

        Returns:
            (reward, force_terminate) — force_terminate is True only for the
            short-lap exploit.
        """
        # Track position first so the history stays contiguous on every step.
        self._record_position(info)

        # Crash / episode over.
        # NOTE(review): `done` is also True for a pure time-limit truncation,
        # so that case receives the crash penalty too — confirm this is
        # intended.
        if done:
            return -1.0, False

        short_lap_penalty = self._short_lap_penalty(info)
        if short_lap_penalty is not None:
            return short_lap_penalty, True

        # Efficiency gate: car is circling — zero reward but don't terminate.
        # Zero (not negative) so there's no perverse incentive to crash
        # early to avoid accumulating penalties.
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # CTE quality: 0 = off track, 1 = centred.
        cte = self._read_float(info, 'cte', 0.0)
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # Speed, clamped to be non-negative and normalised to [0, 1].
        speed = max(0.0, self._read_float(info, 'speed', 0.0))
        speed_norm = min(speed / self._SPEED_NORM, 1.0)

        return cte_quality * speed_norm, False

    @staticmethod
    def _read_float(info: dict, key: str, default: float) -> float:
        """Read info[key] as a finite float; fall back to default otherwise."""
        try:
            value = float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default
        # A NaN here would otherwise propagate straight into the reward
        # (min(nan, 1.0) is nan), so treat non-finite readings as missing.
        return value if np.isfinite(value) else default

    def _record_position(self, info: dict) -> None:
        """Append the (x, z) position from info['pos'] to the history."""
        try:
            pos = info.get('pos', (0.0, 0.0, 0.0))
            # Indices 0 and 2 are used; z is forward in Unity coordinate system.
            point = np.array([float(pos[0]), float(pos[2])])
        except (TypeError, ValueError, IndexError):
            return  # Best effort: a malformed position is simply skipped.
        # A non-finite sample would make the path length NaN and silently
        # disable the efficiency gate for the whole window.
        if np.isfinite(point).all():
            self._pos_history.append(point)

    def _short_lap_penalty(self, info: dict):
        """Return the short-lap penalty if a new lap was too fast, else None."""
        try:
            lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError, OverflowError):
            lap_count = self._last_lap_count

        if lap_count <= self._last_lap_count:
            return None
        self._last_lap_count = lap_count

        lap_time = self._read_float(info, 'last_lap_time', 999.0)
        if lap_time >= self.min_lap_time:
            return None
        # Penalty grows as the lap gets shorter; the 0.1 floor bounds it.
        return -10.0 * (self.min_lap_time / max(lap_time, 0.1))

    def _compute_efficiency(self) -> float:
        """
        Path efficiency = net_displacement / total_path_length.

        Returns 1.0 when fewer than three positions are stored or when the
        car has effectively not moved (total path <= 1e-6).
        """
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt

        path = np.array(list(self._pos_history))  # one (x, z) row per step
        segment_lengths = np.linalg.norm(np.diff(path, axis=0), axis=1)
        total = float(segment_lengths.sum())
        net = float(np.linalg.norm(path[-1] - path[0]))
        return net / total if total > 1e-6 else 1.0

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """
        Upper bound on reward/step (efficiency gate passed, CTE=0, max speed).

        The v6 reward is cte_quality × speed_norm, so with cte_quality = 1
        the bound is the normalised speed, which saturates at 1.0.
        """
        return min(max(0.0, float(max_speed)) / self._SPEED_NORM, 1.0)