"""Progress-Based Reward Wrapper for DonkeyCar RL — v3 (Anti-Circular)
====================================================================

PROBLEM HISTORY:
  v1 (additive): speed × (1 - cte/max_cte)
    → Hacked by oscillating at track boundary (trials 8+13 in corrupted data)

  v2 (multiplicative): original × (1 + speed_scale × speed)
    → Still hacked by circling ON the track (trial 5: cv=0.0%, 4582 reward)
    → Circular motion has low CTE + positive speed → full speed bonus
    → Neither CTE nor raw speed can distinguish forward vs circular motion

  v3 (path efficiency): original × (1 + speed_scale × speed × path_efficiency)
    → Path efficiency = net_displacement / path_length over sliding window
    → Forward driving: efficiency ≈ 1.0 (all movement is productive)
    → Circular driving: efficiency ≈ 0.0 (movement cancels out, no net advance)
    → Speed bonus disappears when circling → car incentivized to go FORWARD

FORMULA:
  efficiency = |pos_t - pos_{t-window}| / Σ|pos_i - pos_{i-1}|
             = net_displacement / total_path_length

  effective_efficiency = clamp((efficiency - min_efficiency) / (1 - min_efficiency), 0, 1)

  shaped_reward = original_reward × (1 + speed_scale × speed × effective_efficiency)
  (when original_reward ≤ 0: no bonus, just penalty — same as v2)

RESEARCH NOTE (2026-04-13):
  Circular driving discovered in Phase 1 despite v2 fix.
  Trial 5: mean_reward=4582, cv=0.0% over 4787 steps.
  User visually confirmed: car circling at start line.
  See docs/RESEARCH_LOG.md for full analysis.

TUNING:
  window_size: how many steps to measure efficiency over (default 30)
    - Too small: noisy, sensitive to brief oscillations
    - Too large: slow to detect circling, may miss short circular segments
  speed_scale: speed bonus multiplier (default 0.1)
  min_efficiency: efficiency floor at or below which the speed bonus is zero
    (default 0.05)
"""

from collections import deque

import gymnasium as gym
import numpy as np


class SpeedRewardWrapper(gym.Wrapper):
    """Path-efficiency-gated speed reward.

    The speed bonus is applied only in proportion to how much the car is
    making NET FORWARD PROGRESS, measured as net displacement divided by
    path length over a sliding window of recent positions.

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier (default 0.1)
        window_size: number of steps for efficiency measurement (default 30)
        min_efficiency: efficiency floor at or below which the speed bonus
            is zero (default 0.05)

    Raises:
        ValueError: if ``window_size < 2`` or ``min_efficiency`` is outside
            ``[0, 1)``.
    """

    def __init__(self, env, speed_scale: float = 0.1, window_size: int = 30,
                 min_efficiency: float = 0.05):
        # The window stores window_size + 1 positions and efficiency needs at
        # least 3 of them, so a smaller window would silently disable gating.
        if window_size < 2:
            raise ValueError(f'window_size must be >= 2, got {window_size}')
        # min_efficiency == 1 would divide by zero when rescaling, and values
        # above 1 would invert the gate (low efficiency -> positive bonus).
        if not 0.0 <= min_efficiency < 1.0:
            raise ValueError(
                f'min_efficiency must be in [0, 1), got {min_efficiency}'
            )

        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency

        # Sliding window of positions for efficiency calculation
        self._pos_history = deque(maxlen=window_size + 1)
        # Not read anywhere in this class; kept (and reset) for backward
        # compatibility with code that may inspect it.
        self._path_length = 0.0

    def reset(self, **kwargs):
        """Reset the wrapped env and forget the position window.

        Returns whatever the wrapped ``reset`` returned, unchanged.
        """
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._path_length = 0.0
        return result

    def step(self, action):
        """Step the wrapped env and replace its reward with the shaped one.

        Supports both the 5-tuple (gymnasium) and 4-tuple (old gym) step
        APIs; the returned tuple has the same arity as the wrapped env's.

        Raises:
            ValueError: if the wrapped ``step`` returns neither 4 nor 5 items.
        """
        result = self.env.step(action)

        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
            shaped = self._shape_reward(reward, info)
            return obs, shaped, terminated, truncated, info

        if len(result) == 4:
            obs, reward, done, info = result
            shaped = self._shape_reward(reward, info)
            return obs, shaped, done, info

        raise ValueError(f'Unexpected step() result length: {len(result)}')

    def _get_pos(self, info: dict):
        """Extract position from info dict.

        Returns a float64 array built from the first (up to) three entries of
        ``info['pos']``, or None if the key is missing, the value cannot be
        sliced/converted, is empty, or contains NaN/inf.
        """
        pos = info.get('pos', None)
        if pos is None:
            return None
        try:
            arr = np.array(pos[:3], dtype=np.float64)
        except (TypeError, IndexError, KeyError, ValueError):
            # KeyError: slicing a mapping on Python 3.12+ (slices are hashable)
            return None
        # A NaN/inf position would otherwise poison the efficiency ratio for
        # every step until it leaves the window.
        if arr.size == 0 or not np.isfinite(arr).all():
            return None
        return arr

    def _compute_efficiency(self) -> float:
        """Compute path efficiency = net displacement / total path length.

        Measured over the positions currently in the sliding window.

        Returns 1.0 if fewer than 3 positions are recorded (can't penalize
        yet). Returns 1.0 if the total path length is below 1e-6 (car not
        moving; not penalized here). Otherwise returns the ratio, capped at
        1.0 to absorb floating-point rounding.
        """
        count = len(self._pos_history)
        if count < 3:
            return 1.0  # Not enough history, give benefit of doubt

        # One row per recorded position, oldest first
        positions = np.stack(list(self._pos_history)).reshape(count, -1)

        # Net displacement: straight-line distance from oldest to newest
        net_displacement = np.linalg.norm(positions[-1] - positions[0])

        # Total path length: sum of step-by-step distances
        total_path = np.linalg.norm(np.diff(positions, axis=0), axis=1).sum()

        if total_path < 1e-6:
            # Car not moving at all, don't penalize
            # (will be caught by health check)
            return 1.0

        return float(min(net_displacement / total_path, 1.0))

    def _shape_reward(self, original_reward: float, info: dict) -> float:
        """Apply path-efficiency-gated speed bonus.

        The position window is updated on every call, including calls that
        return the reward unchanged. The original reward is returned as-is
        when it is <= 0 or when ``info['speed']`` is not a finite number.
        """
        # Update position history
        pos = self._get_pos(info)
        if pos is not None:
            self._pos_history.append(pos)

        # Only apply speed bonus when genuinely on track (positive CTE reward)
        if original_reward <= 0:
            return original_reward  # Off track / crashed — no speed reward

        # Extract speed (missing or None counts as 0)
        try:
            speed = float(info.get('speed', 0.0) or 0.0)
        except (TypeError, ValueError):
            return original_reward
        if not np.isfinite(speed):
            # inf × 0 efficiency would yield a NaN reward
            return original_reward
        speed = max(0.0, speed)

        # Compute path efficiency (detects circular motion)
        efficiency = self._compute_efficiency()

        # Rescale so that min_efficiency maps to 0 and 1.0 maps to 1, then
        # clamp: at or below min_efficiency there is no speed bonus.
        rescaled = (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency)
        effective_efficiency = min(1.0, max(0.0, rescaled))

        # Multiplicative bonus: fast forward progress → full bonus,
        # circling → zero bonus
        return original_reward * (
            1.0 + self.speed_scale * speed * effective_efficiency
        )

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on reward per step (for hack detection calibration).

        Evaluates the shaping formula at efficiency = 1 and speed =
        ``max_speed``.
        NOTE(review): the bound uses 1.0 as the original reward, presumably
        the env's per-step maximum — confirm against the wrapped env.
        """
        return 1.0 * (1.0 + self.speed_scale * max_speed * 1.0)  # efficiency=1 at best