"""
|
||
Behavioral Reward Wrappers for DonkeyCar RL — Phase 3
|
||
======================================================
|
||
|
||
These wrappers extend the base SpeedRewardWrapper (v4) with behavioral
|
||
control mechanisms discovered in Phase 2:
|
||
|
||
1. LanePositionWrapper — drive at a specific lateral position
|
||
2. AntiOscillationWrapper — suppress steering oscillation
|
||
3. AsymmetricCTEWrapper — enforce right-lane rule (penalise left more)
|
||
|
||
RESEARCH CONTEXT (Phase 2 findings):
|
||
- The base CTE reward is symmetric — car picks left or right based on
|
||
random NN initialisation → different driving styles emerge randomly
|
||
- n_steer=3 (fewer bins) produces cleaner, more stable driving than n_steer=4
|
||
- These wrappers let us deliberately shape driving behaviour
|
||
|
||
USAGE:
|
||
from reward_wrapper import SpeedRewardWrapper
|
||
from behavioral_wrappers import LanePositionWrapper, AntiOscillationWrapper
|
||
|
||
env = LanePositionWrapper(
|
||
AntiOscillationWrapper(
|
||
SpeedRewardWrapper(base_env),
|
||
oscillation_penalty=0.05
|
||
),
|
||
target_cte=-0.3, # Slightly right of centre
|
||
position_weight=0.3
|
||
)
|
||
"""

import gymnasium as gym
import numpy as np
from collections import deque


class LanePositionWrapper(gym.Wrapper):
    """
    Biases the car to drive at a specific lateral position (target CTE).

    Adds a position bonus/penalty on top of any existing shaped reward:

        position_bonus = position_weight × (1 - |cte - target_cte| / max_cte)

    Examples:
        target_cte =  0.0 → drive on the centre line (default CTE behaviour)
        target_cte = -0.5 → drive slightly right of centre (right-lane rule)
        target_cte = +0.5 → drive slightly left of centre
        target_cte = -1.5 → hug the right shoulder (like Trial 18!)

    Args:
        target_cte: desired CTE offset from centre (negative = right)
        position_weight: how strongly to enforce the target (0 = off, 0.3 = moderate)
        max_cte: track half-width (default 8.0, matches the sim)
    """

    def __init__(self, env, target_cte: float = 0.0, position_weight: float = 0.2, max_cte: float = 8.0):
        super().__init__(env)
        self.target_cte = target_cte
        self.position_weight = position_weight
        self.max_cte = max_cte

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:  # Legacy 4-tuple (gym) API
            obs, reward, done, info = result
            terminated, truncated = done, False

        cte = float(info.get('cte', 0.0) or 0.0)
        position_bonus = self.position_weight * (
            1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0)
        )
        # Only add the bonus while on track (positive base reward)
        shaped = reward + position_bonus if reward > 0 else reward

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
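
# Worked example of the position bonus (a sketch of the arithmetic above,
# with illustrative numbers):
#   position_weight = 0.3, target_cte = -0.5, max_cte = 8.0, car at cte = +0.5
#   bonus = 0.3 * (1 - |0.5 - (-0.5)| / 8.0) = 0.3 * (1 - 0.125) = 0.2625
# Because max_cte normalises the distance, a car a full unit off-target still
# collects most of the bonus; the gradient toward the target is gentle.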


class AntiOscillationWrapper(gym.Wrapper):
    """
    Penalises rapid changes in steering to suppress oscillating driving.

    Addresses the behaviour observed in Trial 8 (n_steer=4, oscillating).
    Computes the change in steering from the previous step and subtracts
    a scaled penalty from the reward:

        penalty = oscillation_penalty × |Δsteering|

    The steering component may be a continuous value or a discrete index;
    either way we track the last action and penalise large changes.

    Args:
        oscillation_penalty: scale factor for the steering-change penalty
        history_window: number of steps over which to average the oscillation
    """

    def __init__(self, env, oscillation_penalty: float = 0.05, history_window: int = 10):
        super().__init__(env)
        self.oscillation_penalty = oscillation_penalty
        self.history_window = history_window
        self._action_history = deque(maxlen=history_window)
        self._last_action = None

    def reset(self, **kwargs):
        result = self.env.reset(**kwargs)
        self._action_history.clear()
        self._last_action = None
        return result

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        # Compute the steering-change penalty
        if self._last_action is not None:
            try:
                curr = float(action[0]) if hasattr(action, '__len__') else float(action)
                prev = float(self._last_action[0]) if hasattr(self._last_action, '__len__') else float(self._last_action)
                delta = abs(curr - prev)
                penalty = self.oscillation_penalty * delta
                # Only penalise while on track (positive base reward)
                shaped = reward - penalty if reward > 0 else reward
            except (TypeError, IndexError):
                shaped = reward
        else:
            shaped = reward

        self._last_action = action
        self._action_history.append(action)

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info

    def current_oscillation_score(self) -> float:
        """Returns the mean absolute steering change over the history window."""
        if len(self._action_history) < 2:
            return 0.0
        actions = list(self._action_history)
        deltas = []
        for i in range(1, len(actions)):
            try:
                curr = float(actions[i][0]) if hasattr(actions[i], '__len__') else float(actions[i])
                prev = float(actions[i - 1][0]) if hasattr(actions[i - 1], '__len__') else float(actions[i - 1])
                deltas.append(abs(curr - prev))
            except (TypeError, IndexError):
                pass
        return float(np.mean(deltas)) if deltas else 0.0
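
# Usage sketch for the diagnostic above (assumes the training loop keeps a
# reference to this wrapper; `osc_env` and the logging cadence are illustrative):
#
#     osc_env = AntiOscillationWrapper(base_env, oscillation_penalty=0.05)
#     ...
#     if step % 1000 == 0:
#         print(f"mean |Δsteer| over window: {osc_env.current_oscillation_score():.3f}")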


class AsymmetricCTEWrapper(gym.Wrapper):
    """
    Enforces right-lane driving by penalising left-of-centre more than right.

    The default reward treats CTE symmetrically (only |CTE| matters). This
    wrapper applies an extra penalty when the car drifts left (positive CTE
    in the DonkeyCar convention means left of centre).

    Formula:
        cte > 0 (left of centre):  extra_penalty = left_penalty × cte / max_cte
        cte < 0 (right of centre): no penalty (optionally a small bonus)

    Args:
        left_penalty: additional penalty multiplier for left-of-centre driving
        right_bonus: small bonus for right-of-centre driving (optional)
        max_cte: track half-width (default 8.0)
    """

    def __init__(self, env, left_penalty: float = 0.3, right_bonus: float = 0.05, max_cte: float = 8.0):
        super().__init__(env)
        self.left_penalty = left_penalty
        self.right_bonus = right_bonus
        self.max_cte = max_cte

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        if reward > 0:  # Only modify the reward while on track
            cte = float(info.get('cte', 0.0) or 0.0)
            if cte > 0:  # Left of centre — penalise
                penalty = self.left_penalty * min(cte / self.max_cte, 1.0)
                shaped = reward * (1.0 - penalty)
            else:  # Right of centre — small bonus
                bonus = self.right_bonus * min(abs(cte) / self.max_cte, 1.0)
                shaped = reward * (1.0 + bonus)
        else:
            shaped = reward

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
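
# Worked example with the defaults above (left_penalty=0.3, right_bonus=0.05,
# max_cte=8.0); the numbers are illustrative only:
#   cte = +2.0 (left):  shaped = reward * (1 - 0.3  * 2/8) = reward * 0.925
#   cte = -2.0 (right): shaped = reward * (1 + 0.05 * 2/8) = reward * 1.0125
# The asymmetry is multiplicative, so it scales with whatever the upstream
# wrappers have already shaped.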


class CombinedBehavioralWrapper(gym.Wrapper):
    """
    Convenience wrapper combining all three behavioral controls.
    Apply this on top of SpeedRewardWrapper (v4).

    Args:
        target_cte: desired lateral position (default 0.0 = centre)
        position_weight: lane-position enforcement strength (default 0.2)
        oscillation_penalty: steering-smoothness enforcement (default 0.05)
        enforce_right_lane: if True, apply the asymmetric CTE penalty (default False)
        max_cte: track half-width (default 8.0)
    """

    def __init__(
        self,
        env,
        target_cte: float = 0.0,
        position_weight: float = 0.2,
        oscillation_penalty: float = 0.05,
        enforce_right_lane: bool = False,
        max_cte: float = 8.0,
    ):
        super().__init__(env)
        self.target_cte = target_cte
        self.position_weight = position_weight
        self.oscillation_penalty = oscillation_penalty
        self.enforce_right_lane = enforce_right_lane
        self.max_cte = max_cte
        self._last_action = None

    def reset(self, **kwargs):
        self._last_action = None
        return self.env.reset(**kwargs)

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            obs, reward, done, info = result
            terminated, truncated = done, False

        cte = float(info.get('cte', 0.0) or 0.0)

        if reward > 0:
            shaped = reward

            # 1. Lane-position bonus
            pos_bonus = self.position_weight * (
                1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0)
            )
            shaped += pos_bonus

            # 2. Anti-oscillation penalty
            if self._last_action is not None:
                try:
                    curr = float(action[0]) if hasattr(action, '__len__') else float(action)
                    prev = float(self._last_action[0]) if hasattr(self._last_action, '__len__') else float(self._last_action)
                    shaped -= self.oscillation_penalty * abs(curr - prev)
                except (TypeError, IndexError):
                    pass

            # 3. Right-lane enforcement (asymmetric CTE; 0.3 mirrors
            #    AsymmetricCTEWrapper's default left_penalty)
            if self.enforce_right_lane and cte > 0:
                penalty = 0.3 * min(cte / self.max_cte, 1.0)
                shaped *= (1.0 - penalty)
        else:
            shaped = reward

        self._last_action = action

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
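

# ---------------------------------------------------------------------------
# Minimal smoke test: a sketch only. `DummyEnv` is a stand-in defined here
# for illustration; in real training the base env is the DonkeyCar sim env.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class DummyEnv(gym.Env):
        """Trivial env that emits a fake 'cte' so the wrappers can be exercised."""
        observation_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
        action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)

        def reset(self, **kwargs):
            return np.zeros(1, dtype=np.float32), {}

        def step(self, action):
            # Constant positive reward + random lateral position, just to
            # exercise the shaping paths above.
            info = {'cte': float(np.random.uniform(-2.0, 2.0))}
            return np.zeros(1, dtype=np.float32), 1.0, False, False, info

    env = CombinedBehavioralWrapper(
        DummyEnv(),
        target_cte=-0.3,
        position_weight=0.3,
        oscillation_penalty=0.05,
        enforce_right_lane=True,
    )
    env.reset()
    for _ in range(5):
        _, shaped, _, _, info = env.step(env.action_space.sample())
        print(f"shaped reward: {shaped:.3f}  cte: {info['cte']:+.2f}")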