""" Speed × CTE Reward Wrapper for DonkeyCar RL — v7 (Clean) ========================================================= The simulator now uses solid BoxCollider barriers with Continuous Collision Detection on the car Rigidbody. The car physically cannot escape the track. This removes the need for every Python-side exploit patch that lived here: REMOVED (simulator now enforces these physically): - CTE-patience termination (car can't get far off track anyway) - High-CTE negative reward patch - solid_hit / barrier-contact monitoring - low-speed / wedge detection KEPT (still needed — physics can't detect these): - Efficiency gate: zero reward when circling (car on-track but spinning in circles, not advancing) - No-progress termination: active_node not advancing (car stuck at waypoint, not completing the course) - Lap exploit check: super-fast laps are physically impossible but kept as a sanity guard FORMULA: cte_quality = 1.0 - min(|cte| / max_cte, 1.0) # [0,1]: centred=1 speed_norm = min(speed / 10.0, 1.0) # [0,1]: normalised efficiency = net_displacement / total_path # [0,1]: straight=1, circle=0 if efficiency < min_efficiency: reward = 0.0 # circling — no incentive else: reward = cte_quality × speed_norm On done/crash: reward = -1.0 """ import gymnasium as gym import numpy as np from collections import deque class SpeedRewardWrapper(gym.Wrapper): """ Reward = speed × CTE_quality, gated by path efficiency. Args: env: gymnasium environment window_size: steps for efficiency gate history (default 30) min_efficiency: efficiency threshold — below this, reward = 0 (default 0.15) max_cte: CTE at which reward reaches 0 (default 8.0) min_lap_time: laps faster than this are penalised (exploit guard) progress_patience: steps without new max active_node before termination """ def __init__( self, env, window_size: int = 30, min_efficiency: float = 0.15, max_cte: float = 8.0, min_lap_time: float = 5.0, progress_patience: int = 60, ): super().__init__(env) self.window_size = window_size self.min_efficiency = min_efficiency self.max_cte = max_cte self.min_lap_time = min_lap_time self.progress_patience = progress_patience self._pos_history = deque(maxlen=window_size + 1) self._last_lap_count = 0 self._max_node_seen = -1 self._no_progress_steps = 0 def reset(self, **kwargs): result = self.env.reset(**kwargs) self._pos_history.clear() self._last_lap_count = 0 self._max_node_seen = -1 self._no_progress_steps = 0 return result def step(self, action): result = self.env.step(action) if len(result) == 5: obs, _sim_reward, terminated, truncated, info = result done = terminated or truncated elif len(result) == 4: obs, _sim_reward, done, info = result terminated = done truncated = False else: raise ValueError(f'Unexpected step() result length: {len(result)}') shaped, force_terminate = self._compute_reward(done, info) if force_terminate: terminated = True done = True if len(result) == 5: return obs, shaped, terminated, truncated, info return obs, shaped, done, info def _compute_reward(self, done: bool, info: dict): # Record position for efficiency calculation try: pos = info.get('pos', (0.0, 0.0, 0.0)) self._pos_history.append(np.array([float(pos[0]), float(pos[2])])) except (TypeError, ValueError, IndexError): pass if done: return -1.0, False try: cte = float(info.get('cte', 0.0) or 0.0) except (TypeError, ValueError): cte = 0.0 try: speed = max(0.0, float(info.get('speed', 0.0) or 0.0)) except (TypeError, ValueError): speed = 0.0 # --- No-progress termination --- # Terminates episodes where the car isn't advancing along the track # (circling near the start, stuck against a barrier, etc.). try: active_node = int(info.get('active_node', -1) or 0) except (TypeError, ValueError): active_node = -1 if active_node >= 0: if active_node > self._max_node_seen: self._max_node_seen = active_node self._no_progress_steps = 0 else: self._no_progress_steps += 1 if self._no_progress_steps >= self.progress_patience: return -1.0, True # --- Lap detection: reset progress tracker + exploit guard --- try: current_lap_count = int(info.get('lap_count', 0) or 0) except (TypeError, ValueError): current_lap_count = self._last_lap_count if current_lap_count > self._last_lap_count: self._last_lap_count = current_lap_count self._max_node_seen = -1 self._no_progress_steps = 0 try: lap_time = float(info.get('last_lap_time', 999.0) or 999.0) except (TypeError, ValueError): lap_time = 999.0 if lap_time < self.min_lap_time: return -10.0 * (self.min_lap_time / max(lap_time, 0.1)), True # --- Efficiency gate: zero reward when circling --- if self._compute_efficiency() < self.min_efficiency: return 0.0, False # --- Core reward: speed × CTE quality --- cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0) speed_norm = min(speed / 10.0, 1.0) return cte_quality * speed_norm, False def _compute_efficiency(self) -> float: if len(self._pos_history) < 3: return 1.0 positions = list(self._pos_history) net = float(np.linalg.norm(positions[-1] - positions[0])) total = float(sum(np.linalg.norm(positions[i+1] - positions[i]) for i in range(len(positions) - 1))) return net / total if total > 1e-6 else 1.0