""" Behavioral Reward Wrappers for DonkeyCar RL — Phase 3 ====================================================== These wrappers extend the base SpeedRewardWrapper (v4) with behavioral control mechanisms discovered in Phase 2: 1. LanePositionWrapper — drive at a specific lateral position 2. AntiOscillationWrapper — suppress steering oscillation 3. AsymmetricCTEWrapper — enforce right-lane rule (penalise left more) RESEARCH CONTEXT (Phase 2 findings): - The base CTE reward is symmetric — car picks left or right based on random NN initialisation → different driving styles emerge randomly - n_steer=3 (fewer bins) produces cleaner, more stable driving than n_steer=4 - These wrappers let us deliberately shape driving behaviour USAGE: from reward_wrapper import SpeedRewardWrapper from behavioral_wrappers import LanePositionWrapper, AntiOscillationWrapper env = LanePositionWrapper( AntiOscillationWrapper( SpeedRewardWrapper(base_env), oscillation_penalty=0.05 ), target_cte=-0.3, # Slightly right of centre position_weight=0.3 ) """ import gymnasium as gym import numpy as np from collections import deque class LanePositionWrapper(gym.Wrapper): """ Biases the car to drive at a specific lateral position (target CTE). Adds a position bonus/penalty on top of any existing shaped reward: position_bonus = position_weight × (1 - abs(cte - target_cte) / max_cte) Examples: target_cte = 0.0 → drive on centre line (default CTE behaviour) target_cte = -0.5 → drive slightly right of centre (right-lane rule) target_cte = +0.5 → drive slightly left of centre target_cte = -1.5 → hug the right shoulder (like Trial 18!) Args: target_cte: desired CTE offset from centre (negative = right) position_weight: how strongly to enforce the target (0=off, 0.3=moderate) max_cte: track half-width (default 8.0, matches sim) """ def __init__(self, env, target_cte: float = 0.0, position_weight: float = 0.2, max_cte: float = 8.0): super().__init__(env) self.target_cte = target_cte self.position_weight = position_weight self.max_cte = max_cte def step(self, action): result = self.env.step(action) if len(result) == 5: obs, reward, terminated, truncated, info = result else: obs, reward, done, info = result terminated, truncated = done, False cte = float(info.get('cte', 0.0) or 0.0) position_bonus = self.position_weight * ( 1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0) ) shaped = reward + position_bonus if reward > 0 else reward # Only bonus when on track if len(result) == 5: return obs, shaped, terminated, truncated, info return obs, shaped, terminated, info class AntiOscillationWrapper(gym.Wrapper): """ Penalises rapid changes in steering to suppress oscillating driving. Addresses the behaviour observed in Trial 8 (n_steer=4, oscillating). Computes the change in steering from the previous step and subtracts a scaled penalty from the reward. oscillation_penalty_amount = oscillation_penalty × |Δsteering| The steered action must be a continuous value or index — we track the last action and penalise large changes. 
class AntiOscillationWrapper(gym.Wrapper):
    """
    Penalises rapid changes in steering to suppress oscillating driving.
    Addresses the behaviour observed in Trial 8 (n_steer=4, oscillating).

    Computes the change in steering from the previous step and subtracts
    a scaled penalty from the reward:

        oscillation_penalty_amount = oscillation_penalty × |Δsteering|

    The steering component may be a continuous value or a discrete index;
    we track the last action and penalise large changes either way.

    Args:
        oscillation_penalty: scale factor for the steering change penalty
        history_window: number of steps to average oscillation over
    """

    def __init__(self, env, oscillation_penalty: float = 0.05,
                 history_window: int = 10):
        super().__init__(env)
        self.oscillation_penalty = oscillation_penalty
        self.history_window = history_window
        self._action_history = deque(maxlen=history_window)
        self._last_action = None

    def reset(self, **kwargs):
        result = self.env.reset(**kwargs)
        self._action_history.clear()
        self._last_action = None
        return result

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        # Compute the steering change penalty relative to the previous action
        if self._last_action is not None:
            try:
                curr = float(action[0]) if hasattr(action, '__len__') else float(action)
                prev = (float(self._last_action[0])
                        if hasattr(self._last_action, '__len__')
                        else float(self._last_action))
                delta = abs(curr - prev)
                penalty = self.oscillation_penalty * delta
                # Only penalise when on track (positive base reward)
                shaped = reward - penalty if reward > 0 else reward
            except (TypeError, IndexError):
                shaped = reward
        else:
            shaped = reward

        self._last_action = action
        self._action_history.append(action)

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info

    def current_oscillation_score(self) -> float:
        """Returns the mean absolute steering change over the history window."""
        if len(self._action_history) < 2:
            return 0.0
        actions = list(self._action_history)
        deltas = []
        for i in range(1, len(actions)):
            try:
                curr = (float(actions[i][0])
                        if hasattr(actions[i], '__len__')
                        else float(actions[i]))
                prev = (float(actions[i - 1][0])
                        if hasattr(actions[i - 1], '__len__')
                        else float(actions[i - 1]))
                deltas.append(abs(curr - prev))
            except (TypeError, IndexError):
                pass
        return float(np.mean(deltas)) if deltas else 0.0


class AsymmetricCTEWrapper(gym.Wrapper):
    """
    Enforces right-lane driving by penalising left-of-centre more than right.

    In the default reward, CTE enters symmetrically — only |CTE| matters.
    This wrapper applies an extra penalty when the car drifts left
    (positive CTE in the DonkeyCar convention means left-of-centre).

    Formula:
        if cte > 0 (left of centre):  extra_penalty = left_penalty × cte / max_cte
        if cte < 0 (right of centre): no penalty (or a small bonus)

    Args:
        left_penalty: additional penalty multiplier for left-of-centre driving
        right_bonus: small bonus for right-of-centre driving (optional)
        max_cte: track half-width (default 8.0)
    """

    def __init__(self, env, left_penalty: float = 0.3,
                 right_bonus: float = 0.05, max_cte: float = 8.0):
        super().__init__(env)
        self.left_penalty = left_penalty
        self.right_bonus = right_bonus
        self.max_cte = max_cte

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        if reward > 0:  # Only modify the reward when on track
            cte = float(info.get('cte', 0.0) or 0.0)
            if cte > 0:
                # Left of centre — penalise proportionally to the drift
                penalty = self.left_penalty * min(cte / self.max_cte, 1.0)
                shaped = reward * (1.0 - penalty)
            else:
                # Right of centre — small bonus
                bonus = self.right_bonus * min(abs(cte) / self.max_cte, 1.0)
                shaped = reward * (1.0 + bonus)
        else:
            shaped = reward

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
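# Smoke-test sketch: checks that the wrappers compose and shape rewards as
# expected without a simulator. `_StubEnv` and `_smoke_test` are hypothetical
# illustrative helpers, not part of the Phase 3 wrapper API.
class _StubEnv(gym.Env):
    """Minimal Gymnasium env: fixed positive reward, fixed left-of-centre CTE."""

    observation_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(1,))
    action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(2,))

    def reset(self, **kwargs):
        return np.zeros(1, dtype=np.float32), {}

    def step(self, action):
        # 5-tuple Gymnasium API; cte=2.0 means well left of centre
        return np.zeros(1, dtype=np.float32), 1.0, False, False, {'cte': 2.0}


def _smoke_test():
    env = AsymmetricCTEWrapper(AntiOscillationWrapper(_StubEnv()))
    env.reset()
    # First step: no previous action, so only the asymmetric penalty applies:
    # 1.0 * (1 - 0.3 * 2/8) = 0.925
    _, r1, *_ = env.step(np.array([0.5, 0.3]))
    # Second step: Δsteering = 1.0 adds a 0.05 oscillation penalty first:
    # (1.0 - 0.05) * 0.925 = 0.87875
    _, r2, *_ = env.step(np.array([-0.5, 0.3]))
    print(f"steady steering reward: {r1:.5f}, swerving reward: {r2:.5f}")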
class CombinedBehavioralWrapper(gym.Wrapper):
    """
    Convenience wrapper combining all three behavioral controls.

    Apply this on top of SpeedRewardWrapper (v4).

    Args:
        target_cte: desired lateral position (default 0.0 = centre)
        position_weight: lane position enforcement strength (default 0.2)
        oscillation_penalty: steering smoothness enforcement (default 0.05)
        enforce_right_lane: if True, apply asymmetric CTE penalty (default False)
        max_cte: track half-width (default 8.0)
    """

    def __init__(
        self,
        env,
        target_cte: float = 0.0,
        position_weight: float = 0.2,
        oscillation_penalty: float = 0.05,
        enforce_right_lane: bool = False,
        max_cte: float = 8.0,
    ):
        super().__init__(env)
        self.target_cte = target_cte
        self.position_weight = position_weight
        self.oscillation_penalty = oscillation_penalty
        self.enforce_right_lane = enforce_right_lane
        self.max_cte = max_cte
        self._last_action = None

    def reset(self, **kwargs):
        self._last_action = None
        return self.env.reset(**kwargs)

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        cte = float(info.get('cte', 0.0) or 0.0)

        if reward > 0:
            shaped = reward

            # 1. Lane position bonus
            pos_bonus = self.position_weight * (
                1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0)
            )
            shaped += pos_bonus

            # 2. Anti-oscillation penalty
            if self._last_action is not None:
                try:
                    curr = (float(action[0])
                            if hasattr(action, '__len__')
                            else float(action))
                    prev = (float(self._last_action[0])
                            if hasattr(self._last_action, '__len__')
                            else float(self._last_action))
                    shaped -= self.oscillation_penalty * abs(curr - prev)
                except (TypeError, IndexError):
                    pass

            # 3. Right-lane enforcement (asymmetric CTE); the 0.3 mirrors
            # AsymmetricCTEWrapper's default left_penalty
            if self.enforce_right_lane and cte > 0:
                penalty = 0.3 * min(cte / self.max_cte, 1.0)
                shaped *= (1.0 - penalty)
        else:
            shaped = reward

        self._last_action = action

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
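# Usage sketch for the combined wrapper (hypothetical: `base_env` and
# SpeedRewardWrapper come from the module docstring and are not built here):
#
#     env = CombinedBehavioralWrapper(
#         SpeedRewardWrapper(base_env),
#         target_cte=-0.3,          # slightly right of centre
#         position_weight=0.3,
#         oscillation_penalty=0.05,
#         enforce_right_lane=True,  # asymmetric CTE penalty on top
#     )

if __name__ == "__main__":
    # Run the simulator-free smoke test defined above
    _smoke_test()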