177 lines
6.3 KiB
Python
177 lines
6.3 KiB
Python
"""
|
||
Speed × CTE Reward Wrapper for DonkeyCar RL — v7 (Clean)
|
||
=========================================================
|
||
|
||
The simulator now uses solid BoxCollider barriers with Continuous Collision
|
||
Detection on the car Rigidbody. The car physically cannot escape the track.
|
||
This removes the need for every Python-side exploit patch that lived here:
|
||
|
||
REMOVED (simulator now enforces these physically):
|
||
- CTE-patience termination (car can't get far off track anyway)
|
||
- High-CTE negative reward patch
|
||
- solid_hit / barrier-contact monitoring
|
||
- low-speed / wedge detection
|
||
|
||
KEPT (still needed — physics can't detect these):
|
||
- Efficiency gate: zero reward when circling
|
||
(car on-track but spinning in circles, not advancing)
|
||
- No-progress termination: active_node not advancing
|
||
(car stuck at waypoint, not completing the course)
|
||
- Lap exploit check: super-fast laps are physically impossible but kept
|
||
as a sanity guard
|
||
|
||
FORMULA:
|
||
cte_quality = 1.0 - min(|cte| / max_cte, 1.0) # [0,1]: centred=1
|
||
speed_norm = min(speed / 10.0, 1.0) # [0,1]: normalised
|
||
efficiency = net_displacement / total_path # [0,1]: straight=1, circle=0
|
||
|
||
if efficiency < min_efficiency:
|
||
reward = 0.0 # circling — no incentive
|
||
else:
|
||
reward = cte_quality × speed_norm
|
||
|
||
On done/crash: reward = -1.0
|
||
"""
|
||
|
||
import gymnasium as gym
|
||
import numpy as np
|
||
from collections import deque
|
||
|
||
|
||
class SpeedRewardWrapper(gym.Wrapper):
    """Replace the environment reward with speed × CTE quality, gated by path efficiency.

    The wrapped environment's own reward is discarded.  Every step the reward
    is rebuilt from the ``info`` dict (keys read: ``pos``, ``cte``, ``speed``,
    ``active_node``, ``lap_count``, ``last_lap_time``), checked in this order:

    1. episode already finished by the wrapped env     -> ``-1.0``
    2. no new highest ``active_node`` for
       ``progress_patience`` consecutive steps         -> ``-1.0`` and terminate
    3. a lap counted with ``last_lap_time`` below
       ``min_lap_time``                                -> scaled penalty and terminate
    4. path efficiency below ``min_efficiency``        -> ``0.0``
    5. otherwise                                       -> ``cte_quality * speed_norm``

    Args:
        env: gymnasium environment
        window_size: steps for efficiency gate history (default 30)
        min_efficiency: efficiency threshold — below this, reward = 0 (default 0.15)
        max_cte: CTE at which reward reaches 0 (default 8.0); must be non-zero
        min_lap_time: laps faster than this are penalised (exploit guard)
        progress_patience: steps without new max active_node before termination
        max_speed: speed at which the speed term saturates at 1.0
            (default 10.0); must be non-zero
    """

    def __init__(
        self,
        env,
        window_size: int = 30,
        min_efficiency: float = 0.15,
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,
        progress_patience: int = 60,
        max_speed: float = 10.0,
    ):
        super().__init__(env)
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self.progress_patience = progress_patience
        self.max_speed = max_speed

        # window_size + 1 points span window_size path segments.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0
        self._max_node_seen = -1
        self._no_progress_steps = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and clear all per-episode tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        self._max_node_seen = -1
        self._no_progress_steps = 0
        return result

    def step(self, action):
        """Step the wrapped env and substitute the shaped reward.

        Accepts both the 5-tuple ``(obs, reward, terminated, truncated, info)``
        and the 4-tuple ``(obs, reward, done, info)`` result shapes and
        returns the same shape it received.

        Raises:
            ValueError: if the wrapped ``step()`` returns any other length.
        """
        result = self.env.step(action)
        arity = len(result)

        if arity == 5:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif arity == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        shaped, force_terminate = self._compute_reward(done, info)
        if force_terminate:
            terminated = True
            done = True

        if arity == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    def _compute_reward(self, done: bool, info: dict):
        """Return ``(reward, force_terminate)`` for the current step.

        Args:
            done: whether the wrapped env already ended the episode.
            info: the step's info dict.
        """
        # The position is recorded on every step, including the final one.
        self._record_position(info)

        if done:
            return -1.0, False

        if self._progress_stalled(info):
            return -1.0, True

        lap_penalty = self._lap_exploit_penalty(info)
        if lap_penalty is not None:
            return lap_penalty, True

        # --- Efficiency gate: zero reward when circling ---
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # --- Core reward: speed × CTE quality ---
        cte = self._info_float(info, 'cte', 0.0)
        speed = max(0.0, self._info_float(info, 'speed', 0.0))
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)
        speed_norm = min(speed / self.max_speed, 1.0)
        return cte_quality * speed_norm, False

    @staticmethod
    def _info_float(info: dict, key: str, default: float) -> float:
        """Read ``info[key]`` as a float; missing, falsy, unparsable or NaN -> ``default``."""
        try:
            value = float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default
        # NaN would otherwise slip through min()/max() and poison the reward.
        return default if np.isnan(value) else value

    def _record_position(self, info: dict) -> None:
        """Append components 0 and 2 of ``info['pos']`` to the efficiency history.

        Presumably these are the ground-plane coordinates — TODO confirm
        against the simulator.  A missing, ``None`` or malformed position is
        skipped instead of being recorded as the origin, which would inject a
        false jump into the path.
        """
        pos = info.get('pos')
        if pos is None:
            return
        try:
            point = np.array([float(pos[0]), float(pos[2])])
        except (TypeError, ValueError, IndexError):
            return
        self._pos_history.append(point)

    def _progress_stalled(self, info: dict) -> bool:
        """Update the no-progress counter; return True once patience is exhausted.

        A missing, ``None``, unparsable or negative ``active_node`` leaves the
        counter untouched, so the check is effectively disabled when the
        value is not reported.
        """
        raw_node = info.get('active_node')
        try:
            active_node = -1 if raw_node is None else int(raw_node)
        except (TypeError, ValueError, OverflowError):
            active_node = -1

        if active_node < 0:
            return False

        if active_node > self._max_node_seen:
            self._max_node_seen = active_node
            self._no_progress_steps = 0
            return False

        self._no_progress_steps += 1
        return self._no_progress_steps >= self.progress_patience

    def _lap_exploit_penalty(self, info: dict):
        """Handle lap completion; return a penalty for an implausibly fast lap, else None.

        When ``lap_count`` increases, the progress tracker is reset so the
        highest-node tracking starts over for the new lap.
        """
        try:
            current_lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError, OverflowError):
            current_lap_count = self._last_lap_count

        if current_lap_count <= self._last_lap_count:
            return None

        self._last_lap_count = current_lap_count
        self._max_node_seen = -1
        self._no_progress_steps = 0

        lap_time = self._info_float(info, 'last_lap_time', 999.0)
        if lap_time < self.min_lap_time:
            # Faster laps are penalised harder; the 0.1 floor bounds the ratio.
            return -10.0 * (self.min_lap_time / max(lap_time, 0.1))
        return None

    def _compute_efficiency(self) -> float:
        """Return net displacement / travelled path length over the history window.

        Returns 1.0 (gate open) while fewer than three positions are recorded
        or when the travelled path length is not above 1e-6.
        """
        if len(self._pos_history) < 3:
            return 1.0
        path = np.array(list(self._pos_history))
        net = float(np.linalg.norm(path[-1] - path[0]))
        total = float(np.linalg.norm(np.diff(path, axis=0), axis=1).sum())
        return net / total if total > 1e-6 else 1.0
|