donkeycar-rl-autoresearch/agent/behavioral_wrappers.py

278 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Behavioral Reward Wrappers for DonkeyCar RL — Phase 3
======================================================
These wrappers extend the base SpeedRewardWrapper (v4) with behavioral
control mechanisms discovered in Phase 2:
1. LanePositionWrapper — drive at a specific lateral position
2. AntiOscillationWrapper — suppress steering oscillation
3. AsymmetricCTEWrapper — enforce right-lane rule (penalise left more)
RESEARCH CONTEXT (Phase 2 findings):
- The base CTE reward is symmetric — car picks left or right based on
random NN initialisation → different driving styles emerge randomly
- n_steer=3 (fewer bins) produces cleaner, more stable driving than n_steer=4
- These wrappers let us deliberately shape driving behaviour
USAGE:
from reward_wrapper import SpeedRewardWrapper
from behavioral_wrappers import LanePositionWrapper, AntiOscillationWrapper
env = LanePositionWrapper(
AntiOscillationWrapper(
SpeedRewardWrapper(base_env),
oscillation_penalty=0.05
),
target_cte=-0.3, # Slightly right of centre
position_weight=0.3
)
"""
import gymnasium as gym
import numpy as np
from collections import deque
class LanePositionWrapper(gym.Wrapper):
    """
    Rewards driving at a chosen lateral offset (target CTE).

    On top of whatever shaped reward the wrapped env produces, a bonus of

        position_weight * (1 - |cte - target_cte| / max_cte)

    is added — but only while the step reward is positive (i.e. the car is
    still on track), so crash penalties are never softened.

    Examples:
        target_cte =  0.0 -> drive on centre line (default CTE behaviour)
        target_cte = -0.5 -> drive slightly right of centre (right-lane rule)
        target_cte = +0.5 -> drive slightly left of centre
        target_cte = -1.5 -> hug the right shoulder (like Trial 18!)

    Args:
        target_cte: desired CTE offset from centre (negative = right)
        position_weight: how strongly to enforce the target (0=off, 0.3=moderate)
        max_cte: track half-width (default 8.0, matches sim)
    """
    def __init__(self, env, target_cte: float = 0.0, position_weight: float = 0.2, max_cte: float = 8.0):
        super().__init__(env)
        self.target_cte = target_cte
        self.position_weight = position_weight
        self.max_cte = max_cte

    def step(self, action):
        step_out = self.env.step(action)
        five_tuple = len(step_out) == 5
        if five_tuple:
            obs, reward, terminated, truncated, info = step_out
        else:
            # Legacy gym 4-tuple API: (obs, reward, done, info).
            obs, reward, done, info = step_out
            terminated, truncated = done, False
        cte = float(info.get('cte', 0.0) or 0.0)
        # Normalised closeness to the target lateral position, clipped to [0, 1].
        closeness = 1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0)
        if reward > 0:
            # Only add the bonus while on track.
            shaped = reward + self.position_weight * closeness
        else:
            shaped = reward
        if five_tuple:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
class AntiOscillationWrapper(gym.Wrapper):
    """
    Penalises rapid changes in steering to suppress oscillating driving.
    Addresses the behaviour observed in Trial 8 (n_steer=4, oscillating).

    Computes the change in steering from the previous step and subtracts
    a scaled penalty from the reward (only while the reward is positive,
    so crash penalties are left untouched):

        oscillation_penalty_amount = oscillation_penalty * |Δsteering|

    The steering value is taken as element 0 of an array-like action, or
    the scalar action itself, and is stored as a plain float each step.
    Storing floats (rather than the raw action object) means a caller
    that mutates its action array in place cannot corrupt the history —
    the previous implementation kept references and would then compute a
    delta of zero.

    Args:
        oscillation_penalty: scale factor for the steering change penalty
        history_window: number of steps to compute average oscillation over
    """
    def __init__(self, env, oscillation_penalty: float = 0.05, history_window: int = 10):
        super().__init__(env)
        self.oscillation_penalty = oscillation_penalty
        self.history_window = history_window
        # Recent steering values as floats, newest last.
        self._action_history = deque(maxlen=history_window)
        # Steering float from the previous step, or None before the first
        # numeric action after a reset.
        self._last_steer = None

    @staticmethod
    def _extract_steer(action):
        """Return the steering component of ``action`` as a float, or None
        if it cannot be interpreted numerically."""
        try:
            # Array-like actions carry steering at index 0; otherwise the
            # action itself is the steering scalar.
            return float(action[0]) if hasattr(action, '__len__') else float(action)
        except (TypeError, ValueError, IndexError):
            return None

    def reset(self, **kwargs):
        result = self.env.reset(**kwargs)
        self._action_history.clear()
        self._last_steer = None
        return result

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            # Legacy gym 4-tuple API: (obs, reward, done, info).
            obs, reward, done, info = result
            terminated, truncated = done, False
        steer = self._extract_steer(action)
        shaped = reward
        if steer is not None and self._last_steer is not None and reward > 0:
            # Only penalise on-track (positive) rewards, matching the
            # convention of the other behavioral wrappers.
            shaped = reward - self.oscillation_penalty * abs(steer - self._last_steer)
        if steer is not None:
            self._last_steer = steer
            self._action_history.append(steer)
        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info

    def current_oscillation_score(self) -> float:
        """Returns mean absolute steering change over history window."""
        if len(self._action_history) < 2:
            return 0.0
        # History already holds floats, so pairwise diffs are vectorisable.
        return float(np.mean(np.abs(np.diff(list(self._action_history)))))
class AsymmetricCTEWrapper(gym.Wrapper):
    """
    Enforces right-lane driving by penalising left-of-centre more than right.

    In the default reward, CTE is symmetric — |CTE| only. This wrapper
    scales the positive-reward path asymmetrically (positive CTE in the
    DonkeyCar convention means left-of-centre):

        cte > 0  (left):  shaped = reward * (1 - left_penalty * min(cte/max_cte, 1))
        cte <= 0 (right): shaped = reward * (1 + right_bonus * min(|cte|/max_cte, 1))

    Non-positive rewards (off track / crash) pass through untouched.

    Args:
        left_penalty: additional penalty multiplier for left-of-centre driving
        right_bonus: small bonus for right-of-centre driving (optional)
        max_cte: track half-width (default 8.0)
    """
    def __init__(self, env, left_penalty: float = 0.3, right_bonus: float = 0.05, max_cte: float = 8.0):
        super().__init__(env)
        self.left_penalty = left_penalty
        self.right_bonus = right_bonus
        self.max_cte = max_cte

    def step(self, action):
        outcome = self.env.step(action)
        five_tuple = len(outcome) == 5
        if five_tuple:
            obs, reward, terminated, truncated, info = outcome
        else:
            # Legacy gym 4-tuple API: (obs, reward, done, info).
            obs, reward, done, info = outcome
            terminated, truncated = done, False
        shaped = reward
        if reward > 0:  # only reshape the on-track reward
            cte = float(info.get('cte', 0.0) or 0.0)
            # Fraction of the half-track width covered, clipped to [0, 1].
            frac = min(abs(cte) / self.max_cte, 1.0)
            if cte > 0:
                # Left of centre — scale the reward down.
                shaped = reward * (1.0 - self.left_penalty * frac)
            else:
                # Right of centre (or dead centre) — small bonus.
                shaped = reward * (1.0 + self.right_bonus * frac)
        if five_tuple:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info
class CombinedBehavioralWrapper(gym.Wrapper):
    """
    Convenience wrapper combining all three behavioral controls.
    Apply this on top of SpeedRewardWrapper (v4).

    All shaping (position bonus, oscillation penalty, right-lane scaling)
    is applied only while the step reward is positive, so crash penalties
    pass through untouched.

    Args:
        target_cte: desired lateral position (default 0.0 = centre)
        position_weight: lane position enforcement strength (default 0.2)
        oscillation_penalty: steering smoothness enforcement (default 0.05)
        enforce_right_lane: if True, apply asymmetric CTE penalty (default False)
        max_cte: track half-width (default 8.0)
        left_penalty: multiplier for the left-of-centre penalty when
            enforce_right_lane is set (default 0.3, matching the previous
            hard-coded value and AsymmetricCTEWrapper's default)
    """
    def __init__(
        self,
        env,
        target_cte: float = 0.0,
        position_weight: float = 0.2,
        oscillation_penalty: float = 0.05,
        enforce_right_lane: bool = False,
        max_cte: float = 8.0,
        left_penalty: float = 0.3,
    ):
        super().__init__(env)
        self.target_cte = target_cte
        self.position_weight = position_weight
        self.oscillation_penalty = oscillation_penalty
        self.enforce_right_lane = enforce_right_lane
        self.max_cte = max_cte
        self.left_penalty = left_penalty
        # Steering float from the previous step; stored as a float (not the
        # raw action object) so in-place mutation of the caller's action
        # array cannot corrupt the delta computation.
        self._last_steer = None

    def reset(self, **kwargs):
        self._last_steer = None
        return self.env.reset(**kwargs)

    def step(self, action):
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        else:
            # Legacy gym 4-tuple API: (obs, reward, done, info).
            obs, reward, done, info = result
            terminated, truncated = done, False
        cte = float(info.get('cte', 0.0) or 0.0)
        # Extract the steering scalar once; None if not interpretable.
        try:
            steer = float(action[0]) if hasattr(action, '__len__') else float(action)
        except (TypeError, ValueError, IndexError):
            steer = None
        if reward > 0:
            shaped = reward
            # 1. Lane position bonus
            pos_bonus = self.position_weight * (
                1.0 - min(abs(cte - self.target_cte) / self.max_cte, 1.0)
            )
            shaped += pos_bonus
            # 2. Anti-oscillation penalty
            if steer is not None and self._last_steer is not None:
                shaped -= self.oscillation_penalty * abs(steer - self._last_steer)
            # 3. Right-lane enforcement (asymmetric CTE; positive CTE = left)
            if self.enforce_right_lane and cte > 0:
                penalty = self.left_penalty * min(cte / self.max_cte, 1.0)
                shaped *= (1.0 - penalty)
        else:
            shaped = reward
        if steer is not None:
            self._last_steer = steer
        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, terminated, info