donkeycar-rl-autoresearch/agent/reward_wrapper.py

177 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Speed × CTE Reward Wrapper for DonkeyCar RL — v7 (Clean)
=========================================================
The simulator now uses solid BoxCollider barriers with Continuous Collision
Detection on the car Rigidbody. The car physically cannot escape the track.
This removes the need for every Python-side exploit patch that lived here:

REMOVED (simulator now enforces these physically):
  - CTE-patience termination (car can't get far off track anyway)
  - High-CTE negative reward patch
  - solid_hit / barrier-contact monitoring
  - low-speed / wedge detection

KEPT (still needed — physics can't detect these):
  - Efficiency gate: zero reward when circling
    (car on-track but spinning in circles, not advancing)
  - No-progress termination: active_node not advancing
    (car stuck at waypoint, not completing the course)
  - Lap exploit check: super-fast laps are physically impossible but kept
    as a sanity guard

FORMULA:
  cte_quality = 1.0 - min(|cte| / max_cte, 1.0)   # [0,1]: centred=1
  speed_norm  = min(speed / 10.0, 1.0)            # [0,1]: normalised
  efficiency  = net_displacement / total_path     # [0,1]: straight=1, circle=0

  if efficiency < min_efficiency:
      reward = 0.0                   # circling — no incentive
  else:
      reward = cte_quality × speed_norm

  On done/crash: reward = -1.0
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
    """Replace the env reward with speed x CTE quality, gated by path efficiency.

    The reward returned by the wrapped environment is discarded. A shaped
    reward is computed instead from the step's ``info`` dict (keys read:
    ``pos``, ``cte``, ``speed``, ``active_node``, ``lap_count`` and
    ``last_lap_time``). The wrapper can also force termination when the
    car stops advancing or when a lap is reported as implausibly fast.

    Args:
        env: gymnasium environment
        window_size: steps for efficiency gate history (default 30)
        min_efficiency: efficiency threshold — below this, reward = 0
            (default 0.15)
        max_cte: CTE at which reward reaches 0 (default 8.0)
        min_lap_time: laps faster than this are penalised (exploit guard)
        progress_patience: steps without new max active_node before
            termination
        max_speed: speed at which the speed term saturates at 1.0
            (default 10.0)

    Raises:
        ValueError: if ``max_cte`` or ``max_speed`` is not a positive number.
    """

    def __init__(
        self,
        env,
        window_size: int = 30,
        min_efficiency: float = 0.15,
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,
        progress_patience: int = 60,
        max_speed: float = 10.0,
    ):
        # Both values are used as divisors in the reward; reject zero,
        # negative and NaN up front instead of failing on the first step.
        if not max_cte > 0:
            raise ValueError(f'max_cte must be positive, got {max_cte!r}')
        if not max_speed > 0:
            raise ValueError(f'max_speed must be positive, got {max_speed!r}')

        super().__init__(env)
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self.progress_patience = progress_patience
        self.max_speed = max_speed

        # window_size + 1 positions span window_size displacement segments.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0
        self._max_node_seen = -1       # -1 means "no node seen yet this lap"
        self._no_progress_steps = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and clear all per-episode tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        self._max_node_seen = -1
        self._no_progress_steps = 0
        return result

    def step(self, action):
        """Step the wrapped env and substitute the shaped reward.

        Accepts both the 5-tuple ``(obs, reward, terminated, truncated,
        info)`` and the 4-tuple ``(obs, reward, done, info)`` result shapes
        and returns the same shape it received.

        Raises:
            ValueError: if the wrapped ``step()`` returns a tuple of any
                other length.
        """
        result = self.env.step(action)
        if len(result) == 5:
            obs, _sim_reward, terminated, truncated, info = result
            # Truncation counts as "done" for reward purposes as well.
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        shaped, force_terminate = self._compute_reward(done, info)
        if force_terminate:
            terminated = True
            done = True

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    @staticmethod
    def _info_float(info: dict, key: str, default: float) -> float:
        """Read ``info[key]`` as a float, falling back to ``default``.

        Missing, falsy (``None``, ``0``, ``''``), unparseable and NaN values
        all yield ``default``, so a bad telemetry field can never turn the
        reward into NaN.
        """
        try:
            value = float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default
        return default if np.isnan(value) else value

    def _compute_reward(self, done: bool, info: dict):
        """Compute the shaped reward for one step.

        Args:
            done: whether the wrapped env already ended the episode.
            info: the step's info dict.

        Returns:
            ``(reward, force_terminate)`` where ``force_terminate`` asks
            ``step`` to end the episode.
        """
        # Record position for the efficiency calculation. This happens before
        # the done check, so the final step is part of the history too.
        # Indices 0 and 2 are presumably the ground-plane axes — TODO confirm.
        try:
            pos = info.get('pos', (0.0, 0.0, 0.0))
            self._pos_history.append(np.array([float(pos[0]), float(pos[2])]))
        except (TypeError, ValueError, IndexError):
            # Best effort: a malformed position just skips this sample.
            pass

        if done:
            return -1.0, False

        # --- No-progress termination ---
        # Terminates episodes where the car isn't advancing along the track
        # (circling near the start, stuck against a barrier, etc.).
        raw_node = info.get('active_node')
        try:
            # None/missing means "unknown" (-1), not node 0; treating it as
            # node 0 would run the patience counter on absent data.
            active_node = -1 if raw_node is None else int(raw_node)
        except (TypeError, ValueError, OverflowError):
            active_node = -1

        if active_node >= 0:
            if active_node > self._max_node_seen:
                self._max_node_seen = active_node
                self._no_progress_steps = 0
            else:
                self._no_progress_steps += 1
                if self._no_progress_steps >= self.progress_patience:
                    return -1.0, True

        # --- Lap detection: reset progress tracker + exploit guard ---
        try:
            current_lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError, OverflowError):
            current_lap_count = self._last_lap_count

        if current_lap_count > self._last_lap_count:
            self._last_lap_count = current_lap_count
            # Node indices are expected to restart on a new lap, so the
            # high-water mark must restart as well.
            self._max_node_seen = -1
            self._no_progress_steps = 0
            # A missing or zero lap time is treated as unknown (999.0), which
            # never trips the guard.
            lap_time = self._info_float(info, 'last_lap_time', 999.0)
            if lap_time < self.min_lap_time:
                # Penalty grows as the lap gets shorter; the 0.1 floor bounds it.
                return -10.0 * (self.min_lap_time / max(lap_time, 0.1)), True

        # --- Efficiency gate: zero reward when circling ---
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # --- Core reward: speed × CTE quality ---
        cte = self._info_float(info, 'cte', 0.0)
        speed = max(0.0, self._info_float(info, 'speed', 0.0))
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)
        speed_norm = min(speed / self.max_speed, 1.0)
        return cte_quality * speed_norm, False

    def _compute_efficiency(self) -> float:
        """Return net displacement divided by path length over the window.

        Returns 1.0 (gate open) while fewer than three positions have been
        recorded, or when the total path length is effectively zero.
        """
        if len(self._pos_history) < 3:
            return 1.0
        positions = list(self._pos_history)
        net = float(np.linalg.norm(positions[-1] - positions[0]))
        total = float(sum(
            np.linalg.norm(nxt - cur)
            for cur, nxt in zip(positions, positions[1:])
        ))
        return net / total if total > 1e-6 else 1.0