"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v6 (Speed×CTE + Efficiency Gate)
=====================================================================================

REWARD HACKING HISTORY:
  v1 additive:        speed × (1-cte/max_cte)          → boundary oscillation
  v2 multiplicative:  original × (1+speed×scale)       → circular driving (on-track)
  v3 path efficiency: original × (1+speed×eff×scale)   → still circling!
     WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
     A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
  v4: base × eff × (1 + speed_scale × speed)           → zero gradient on hills!
     WHY v4 failed on hills: speed≈0 AND eff≈0 AND cte_quality varies →
     all three terms near zero simultaneously → no gradient to push ANY term up.
  v5: speed × CTE_quality (no efficiency)              → circular driving returns!
     WHY v5 failed: dropped efficiency entirely. Circular driving at CTE≈0 with
     speed>0 earns positive reward indefinitely. Observed in Exp 11.
  v6 (THIS VERSION): v5 reward + efficiency GATE.
     Keeps v5's gradient properties (non-zero gradient on hills) but adds a
     binary efficiency check that zeros reward when car is circling.

ROOT CAUSE OF CIRCLING:
  The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
  A spinning car is ALWAYS moving "forward" relative to its own heading, so
  forward_vel > 0 always, giving positive reward while circling indefinitely.
  We bypass this entirely.

FORMULA (v6):
  cte_quality = 1.0 - min(|cte| / max_cte, 1.0)   # [0,1] centred=1
  speed_norm  = min(speed / 10.0, 1.0)            # [0,1] normalised
  efficiency  = net_displacement / total_path     # [0,1] straight=1, circle=0

  if efficiency < min_efficiency:
      reward = 0.0                      # GATE: circling → zero reward (but not negative)
  else:
      reward = cte_quality × speed_norm # v5 formula (gradient on hills)

  On done/crash: reward = -1.0

WHY GATE NOT MULTIPLIER:
  v4 used efficiency as a multiplier: reward = base × eff × speed_bonus.
  On a hill: speed≈0, eff≈0, base≈0.5 → reward≈0 and ∂reward/∂speed≈0.
  No gradient to push speed up — car stays stuck.

  v6 gate: efficiency is either PASS or FAIL. When efficiency > threshold
  (car moving forward at all), reward = speed × CTE_quality.
  On a hill: car is stuck but still has eff > 0 (not literally circling),
  so the gate passes and the reward = speed × CTE_quality.
  ∂reward/∂speed > 0 → gradient pushes toward more throttle.
  Circle has eff ≈ 0 → gate fails → reward = 0.

PROPERTIES:
  - Circling (eff < min_efficiency):  reward = 0.0 (gate closed, never negative)
  - Stuck on hill (gate passes):      reward = speed × CTE (gradient toward unstuck)
  - On track, fast:                   reward = high (speed + centred)
  - Off track:                        reward ≈ 0 (CTE_quality → 0)
  - Crash:                            reward = -1.0
"""

from collections import deque

import gymnasium as gym
import numpy as np


class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass: speed × CTE_quality, gated by efficiency.

    Completely ignores the sim's own reward (which uses forward_vel and is
    exploitable by circular/spinning motion).

    Args:
        env: gymnasium environment
        speed_scale: legacy speed bonus multiplier (default 0.1). Stored for
            backward compatibility only; the v6 reward does not read it.
        window_size: steps for efficiency calculation (default 30)
        min_efficiency: efficiency below which no reward (default 0.15)
        max_cte: track half-width for normalization (default 8.0); must be > 0
        min_lap_time: laps faster than this are penalised as exploits
        max_cte_terminate: terminate if |CTE| exceeds this for cte_patience steps
        cte_patience: steps of sustained high CTE before termination (default 20)
        progress_patience: steps of zero track progress before termination (default 60)

    Raises:
        ValueError: if max_cte is not strictly positive.
    """

    # Speed at which the normalised speed term saturates at 1.0.
    _SPEED_NORM = 10.0

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,
        min_efficiency: float = 0.15,
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,
        max_cte_terminate: float = 4.0,   # terminate early if CTE sustained > 4m
        cte_patience: int = 20,           # steps of high CTE before terminate
        progress_patience: int = 60,      # steps of no track progress before terminate
    ):
        super().__init__(env)
        if max_cte <= 0:
            # max_cte is a divisor in the CTE-quality term; fail here rather
            # than with a ZeroDivisionError in the middle of a rollout.
            raise ValueError(f'max_cte must be > 0, got {max_cte}')

        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self.max_cte_terminate = max_cte_terminate
        self.cte_patience = cte_patience
        self.progress_patience = progress_patience

        # window_size steps need window_size + 1 positions.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0
        self._high_cte_steps = 0       # consecutive steps with |CTE| > max_cte_terminate
        self._last_active_node = -1    # last track node seen by the progress check
        self._no_progress_steps = 0    # consecutive steps with no node advancement

    def reset(self, **kwargs):
        """Reset the wrapped env and clear all per-episode tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        self._high_cte_steps = 0
        self._last_active_node = -1
        self._no_progress_steps = 0
        return result

    def step(self, action):
        """
        Step the wrapped env and replace its reward with the shaped one.

        Accepts both the 4-tuple (old gym) and 5-tuple (gymnasium) step
        results and returns a tuple of the same length. When the wrapper
        itself decides to end the episode, ``terminated`` (or ``done`` in the
        4-tuple form) is forced to True.

        Raises:
            ValueError: if the wrapped env returns a tuple of any other length.
        """
        result = self.env.step(action)

        if len(result) == 5:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        # Completely ignore _sim_reward — compute our own.
        shaped, force_terminate = self._compute_reward_and_done(done, info)
        if force_terminate:
            terminated = True
            done = True

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    def _record_position(self, info: dict) -> None:
        """Append the (x, z) position from ``info['pos']``; skip malformed values."""
        try:
            pos = info.get('pos', (0.0, 0.0, 0.0))
            self._pos_history.append(np.array([float(pos[0]), float(pos[2])]))
        except (TypeError, ValueError, IndexError):
            # Best effort: a missing sample only shortens the efficiency window.
            pass

    def _compute_reward_and_done(self, done: bool, info: dict):
        """
        v6.1: speed × CTE-quality + efficiency gate + grass/rollback terminators.

        Termination conditions added on top of the reward:
          - Sustained high CTE: |CTE| > max_cte_terminate for cte_patience steps
            → terminate. Stops the grass exploit (car exits track gap and drives
            indefinitely on grass with CTE just under max_cte=8.0).
          - No track progress: active_node doesn't advance for progress_patience
            steps → terminate. Stops mountain rollback (car goes up, rolls back,
            IS moving so StuckWrapper doesn't fire, but never advances).
          - Short lap: a lap finished faster than min_lap_time → scaled negative
            penalty and terminate.

        reward = speed_norm × cte_quality   (when efficiency >= threshold)
        reward = 0.0                        (when circling)
        reward = -1.0                       (on crash/termination)

        Args:
            done: whether the wrapped env already ended the episode this step.
            info: the step's info dict; keys read are 'pos', 'cte',
                'active_node', 'lap_count', 'last_lap_time' and 'speed'.

        Returns:
            ``(reward, force_terminate)`` — the shaped reward and whether this
            wrapper wants the episode terminated.
        """
        # Track position for the efficiency calculation.
        self._record_position(info)

        # Crash / episode over.
        if done:
            return -1.0, False

        # --- CTE value for all checks ---
        try:
            cte = float(info.get('cte', 0.0) or 0.0)
        except (TypeError, ValueError):
            cte = 0.0

        # --- Grass exploit: sustained high CTE termination ---
        if abs(cte) > self.max_cte_terminate:
            self._high_cte_steps += 1
            if self._high_cte_steps >= self.cte_patience:
                return -1.0, True  # too long off-track — terminate
        else:
            self._high_cte_steps = 0

        # --- Mountain rollback: no track progress termination ---
        # Test for None explicitly: node index 0 is a valid node, and the
        # `value or -1` idiom would discard it and skip the check there.
        raw_node = info.get('active_node')
        try:
            active_node = -1 if raw_node is None else int(raw_node)
        except (TypeError, ValueError):
            active_node = -1

        if active_node >= 0:
            if active_node == self._last_active_node:
                self._no_progress_steps += 1
                if self._no_progress_steps >= self.progress_patience:
                    return -1.0, True  # no track progress — terminate
            else:
                self._last_active_node = active_node
                self._no_progress_steps = 0

        # --- Short-lap exploit detection ---
        try:
            current_lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError):
            current_lap_count = self._last_lap_count

        if current_lap_count > self._last_lap_count:
            self._last_lap_count = current_lap_count
            # A missing or zero lap time is treated as "unknown", not as an exploit.
            try:
                lap_time = float(info.get('last_lap_time', 999.0) or 999.0)
            except (TypeError, ValueError):
                lap_time = 999.0
            if lap_time < self.min_lap_time:
                # Penalty grows as the lap gets shorter; the 0.1 floor bounds it.
                penalty = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
                return penalty, True

        # --- Efficiency gate: detect circular driving ---
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # --- CTE quality ---
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # --- Speed (negative readings are clamped to zero) ---
        try:
            speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
        except (TypeError, ValueError):
            speed = 0.0

        # --- v6 reward: speed × CTE quality ---
        speed_norm = min(speed / self._SPEED_NORM, 1.0)
        return cte_quality * speed_norm, False

    def _compute_efficiency(self) -> float:
        """
        Path efficiency = net_displacement / total_path_length over the window.

        Returns 1.0 (benefit of the doubt) when fewer than three positions are
        recorded or when the car has effectively not moved.
        """
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt

        points = np.stack(list(self._pos_history))      # shape (n, 2): x, z
        segments = np.diff(points, axis=0)              # per-step displacement
        total = float(np.linalg.norm(segments, axis=1).sum())
        if total <= 1e-6:
            return 1.0

        net = float(np.linalg.norm(points[-1] - points[0]))
        # The ratio cannot exceed 1; the clamp only absorbs float round-off.
        return min(net / total, 1.0)

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """
        Upper bound on reward/step (efficiency gate open, CTE=0, max speed).

        The v6 reward is ``cte_quality × speed_norm`` with both factors in
        [0, 1], so the bound is the normalised speed and never exceeds 1.0.
        """
        return min(max(float(max_speed), 0.0) / self._SPEED_NORM, 1.0)