""" Speed-Aware Reward Wrapper for DonkeyCar RL — v2 (Hack-Proof) ============================================================== DESIGN PRINCIPLE: Speed should only be rewarded when the car is genuinely progressing down the track. The original DonkeyCar reward already correctly signals track presence — we build on top of it. FORMULA: if original_reward > 0 (car is on track and centered): shaped = original_reward × (1 + speed_scale × speed) else (car is off track / crashed): shaped = original_reward (no speed bonus — cannot be hacked) WHY THIS IS HACK-PROOF: The previous formula (speed × (1 - cte/max_cte)) could be maximized by oscillating at the track boundary — the model learned this in practice. The multiplicative formula is bounded by the original DonkeyCar reward: - Off track → original_reward ≤ 0 → no speed multiplier possible - The model CANNOT increase reward by going fast off-track - Speed bonus only accumulates when genuinely driving on the track RESEARCH NOTE (2026-04-13): The additive formula caused reward hacking in Phase 1 — trials 8 and 13 achieved mean_reward=1936 and 1139 respectively by oscillating at the track boundary. This design was developed to prevent that exploit. See docs/RESEARCH_LOG.md for full details. TUNING: speed_scale=0.1 means a car going 5 m/s gets a 50% bonus on top of the base CTE reward. This is a meaningful but not overwhelming incentive. Increase to 0.3+ to prioritize speed more aggressively (Phase 3). """ import gymnasium as gym import numpy as np class SpeedRewardWrapper(gym.Wrapper): """ Hack-proof speed reward: multiplicative bonus ONLY when on track. 
Args: env: gymnasium environment speed_scale: multiplier for speed bonus (default 0.1) shaped = original × (1 + speed_scale × speed) when on track shaped = original when off track """ def __init__(self, env, speed_scale: float = 0.1): super().__init__(env) self.speed_scale = speed_scale def step(self, action): result = self.env.step(action) # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs if len(result) == 5: obs, reward, terminated, truncated, info = result done = terminated or truncated elif len(result) == 4: obs, reward, done, info = result terminated = done truncated = False else: raise ValueError(f'Unexpected step() result length: {len(result)}') shaped = self._shape_reward(reward, info) if len(result) == 5: return obs, shaped, terminated, truncated, info else: return obs, shaped, done, info def _shape_reward(self, original_reward: float, info: dict) -> float: """ Multiplicative speed bonus — only when on track. Falls back gracefully if speed not in info dict. """ # Only apply speed bonus when genuinely on track (positive CTE reward) if original_reward <= 0: return original_reward # Off track / crashed — no speed reward # Extract speed from info dict try: speed = float(info.get('speed', 0.0)) if speed is None: return original_reward speed = max(0.0, speed) # No negative speed bonus except (TypeError, ValueError): return original_reward # Graceful fallback # Multiplicative bonus: reward grows with speed, but only on track # Hack-proof: cannot increase by going fast off-track shaped = original_reward * (1.0 + self.speed_scale * speed) return shaped def theoretical_max_per_step(self, max_speed: float = 10.0) -> float: """Returns the theoretical max reward per step for bounds checking.""" # original_reward ≤ 1.0, so shaped ≤ 1.0 × (1 + speed_scale × max_speed) return 1.0 * (1.0 + self.speed_scale * max_speed)
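The shaping rule can be sanity-checked without a simulator by mirroring it as a pure function. The sketch below (`shape_reward` is a hypothetical standalone copy of the `_shape_reward` logic, not part of the module's API) confirms the TUNING claim (5 m/s → 50% bonus at speed_scale=0.1) and the hack-proof property (off-track speed earns nothing):

```python
def shape_reward(original_reward: float, speed: float, speed_scale: float = 0.1) -> float:
    """Standalone mirror of SpeedRewardWrapper._shape_reward for numeric checks."""
    if original_reward <= 0:
        return original_reward          # off track / crashed: no speed bonus
    speed = max(0.0, speed)             # no bonus for reversing
    return original_reward * (1.0 + speed_scale * speed)

# On track at 5 m/s: 50% bonus on the base CTE reward
assert shape_reward(1.0, 5.0) == 1.5
# Off track at full speed: reward unchanged — the oscillation exploit is closed
assert shape_reward(-1.0, 10.0) == -1.0
# Bounded by theoretical_max_per_step(max_speed=10.0) == 2.0
assert shape_reward(1.0, 10.0) <= 2.0
```

Because the bonus is multiplicative on a reward that is non-positive off track, the only way to grow the shaped return is to stay on track while moving, which is exactly the intended behavior.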