# donkeycar-rl-autoresearch/agent/reward_wrapper.py
#
# NOTE: this file contains non-ASCII characters (×, ≈, →, ∂) in its
# docstrings and comments.

"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v6 (Speed×CTE + Efficiency Gate)
=====================================================================================
REWARD HACKING HISTORY:
v1 additive: speed × (1-cte/max_cte) → boundary oscillation
v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
v3 path efficiency: original × (1+speed×eff×scale) → still circling!
WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
v4: base × eff × (1 + speed_scale × speed) → zero gradient on hills!
WHY v4 failed on hills: speed≈0 AND eff≈0 AND cte_quality varies → all
three terms near zero simultaneously → no gradient to push ANY term up.
v5: speed × CTE_quality (no efficiency) → circular driving returns!
WHY v5 failed: dropped efficiency entirely. Circular driving at CTE≈0
with speed>0 earns positive reward indefinitely. Observed in Exp 11.
v6 (THIS VERSION): v5 reward + efficiency GATE.
Keeps v5's gradient properties (non-zero gradient on hills) but adds
a binary efficiency check that zeros reward when car is circling.
ROOT CAUSE OF CIRCLING:
The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
A spinning car is ALWAYS moving "forward" relative to its own heading,
so forward_vel > 0 always, giving positive reward while circling indefinitely.
We bypass this entirely.
FORMULA (v6):
    cte_quality = 1.0 - min(|cte| / max_cte, 1.0)   # [0,1] centred=1
    speed_norm  = min(speed / 10.0, 1.0)            # [0,1] normalised
    efficiency  = net_displacement / total_path     # [0,1] straight=1, circle=0
    if efficiency < min_efficiency:
        reward = 0.0                       # GATE: circling → zero reward (but not negative)
    else:
        reward = cte_quality × speed_norm  # v5 formula (gradient on hills)
    On done/crash: reward = -1.0
WHY GATE NOT MULTIPLIER:
v4 used efficiency as a multiplier: reward = base × eff × speed_bonus.
On a hill: speed≈0, eff≈0, base≈0.5 → reward≈0 and ∂reward/∂speed≈0.
No gradient to push speed up — car stays stuck.
v6 gate: efficiency is either PASS or FAIL. When efficiency > threshold
(car moving forward at all), reward = speed × CTE_quality. On a hill:
car is stuck but still has eff > 0 (not literally circling), so the gate
passes and the reward = speed × CTE_quality. ∂reward/∂speed > 0 → gradient
pushes toward more throttle. Circle has eff ≈ 0 → gate fails → reward = 0.
PROPERTIES:
- Circling (eff<threshold): reward = 0 (no incentive to circle)
- On track, stuck (eff>0): reward = speed × CTE (gradient toward unstuck)
- On track, fast: reward = high (speed + centred)
- Off track: reward ≈ 0 (CTE_quality → 0)
- Crash: reward = -1.0
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
    """
    Full reward bypass: speed × CTE_quality, gated by path efficiency.

    The reward returned by the wrapped environment is discarded and replaced
    by a reward computed from the ``info`` dict (keys read: ``pos``, ``cte``,
    ``speed``, ``active_node``, ``lap_count``, ``last_lap_time``).

    Exploit termination (the wrapper forces ``terminated``/``done`` to True):
      - Sustained high CTE (> max_cte_terminate for cte_patience consecutive
        steps): grass exploit.
      - No track progress (highest ``active_node`` seen not advancing for
        progress_patience steps): circular driving, stuck-on-cone,
        stuck-on-barrier, rollback.
      - A lap completed faster than min_lap_time: treated as an exploit and
        penalised.

    Args:
        env: environment to wrap; ``step()`` may return either the 4-tuple
            (old gym) or the 5-tuple (gymnasium) form.
        speed_scale: legacy speed bonus multiplier from the earlier
            multiplicative formulas. Stored for backward compatibility; the
            v6 reward does not use it.
        window_size: number of steps of position history used by the
            efficiency gate (default 30).
        min_efficiency: efficiency gate threshold (default 0.15).
        max_cte: CTE value at which cte_quality reaches 0 (default 8.0).
        min_lap_time: laps faster than this are penalised as exploits.
        max_cte_terminate: terminate if |CTE| > this for cte_patience steps.
        cte_patience: consecutive high-CTE steps before termination.
        progress_patience: steps without a new max active_node before
            termination.
    """

    # Speed at which the normalised speed term saturates at 1.0.
    _SPEED_NORM = 10.0

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,
        min_efficiency: float = 0.15,
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,
        max_cte_terminate: float = 4.0,
        cte_patience: int = 20,
        progress_patience: int = 60,  # ~3.3s at 18 steps/sec
    ):
        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        self.max_cte_terminate = max_cte_terminate
        self.cte_patience = cte_patience
        self.progress_patience = progress_patience

        # window_size steps span window_size + 1 positions.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0
        self._high_cte_steps = 0
        self._max_node_seen = -1      # highest active_node reached this lap
        self._no_progress_steps = 0   # steps since _max_node_seen last rose

    def reset(self, **kwargs):
        """Reset the wrapped env and clear all per-episode tracking state."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        self._high_cte_steps = 0
        self._max_node_seen = -1
        self._no_progress_steps = 0
        return result

    def step(self, action):
        """
        Step the wrapped env and substitute the shaped reward.

        Returns a tuple of the same arity (4 or 5) as the wrapped env
        returned. When one of the exploit terminators fires, ``terminated``
        (5-tuple) or ``done`` (4-tuple) is forced to True.

        Raises:
            ValueError: if the wrapped ``step()`` returns neither 4 nor 5
                items.
        """
        result = self.env.step(action)
        gymnasium_api = len(result) == 5

        # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs.
        if gymnasium_api:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        # _sim_reward is deliberately ignored; the reward is recomputed.
        shaped, force_terminate = self._compute_reward_and_done(done, info)

        if gymnasium_api:
            if force_terminate:
                terminated = True
            return obs, shaped, terminated, truncated, info

        if force_terminate:
            done = True
        return obs, shaped, done, info

    @staticmethod
    def _read_float(info: dict, key: str, default: float) -> float:
        """Return ``info[key]`` as a float; ``default`` if missing, falsy or unparseable."""
        try:
            return float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default

    def _compute_reward_and_done(self, done: bool, info: dict):
        """
        v6.1: speed × CTE-quality + efficiency gate + exploit terminators.

        Checks run in this order; the first that matches decides the result:
          1. ``done`` already set by the env       → (-1.0, False)
          2. |CTE| > max_cte_terminate for cte_patience consecutive steps
                                                   → (-1.0, True)
          3. no new max active_node for progress_patience steps
                                                   → (-1.0, True)
          4. lap just completed in < min_lap_time  → (penalty, True)
          5. path efficiency < min_efficiency      → (0.0, False)
          6. otherwise                             → (cte_quality × speed_norm, False)

        Args:
            done: whether the wrapped env already ended the episode.
            info: the wrapped env's info dict.

        Returns:
            (reward, force_terminate) — ``force_terminate`` is True only when
            this wrapper itself decides to end the episode.
        """
        # Record the ground-plane position (x, z) for the efficiency gate.
        # Best-effort: a malformed 'pos' just skips this sample.
        try:
            pos = info.get('pos', (0.0, 0.0, 0.0))
            self._pos_history.append(np.array([float(pos[0]), float(pos[2])]))
        except (TypeError, ValueError, IndexError):
            pass

        # Crash / episode over.
        # NOTE(review): step() passes `terminated or truncated` here, so a
        # pure truncation is also scored -1.0 — confirm this is intended.
        if done:
            return -1.0, False

        cte = self._read_float(info, 'cte', 0.0)

        # --- Grass exploit: sustained high CTE termination ---
        if abs(cte) > self.max_cte_terminate:
            self._high_cte_steps += 1
            if self._high_cte_steps >= self.cte_patience:
                return -1.0, True  # too long off-track — terminate
        else:
            self._high_cte_steps = 0

        # --- Circle / stuck exploit: no track progress termination ---
        # Only a new highest active_node counts as progress. A missing,
        # None or unparseable value means "unknown" and skips the check
        # rather than being mistaken for node 0.
        try:
            raw_node = info.get('active_node')
            active_node = -1 if raw_node is None else int(raw_node)
        except (TypeError, ValueError, OverflowError):
            active_node = -1

        if active_node >= 0:
            if active_node > self._max_node_seen:
                self._max_node_seen = active_node
                self._no_progress_steps = 0
            else:
                self._no_progress_steps += 1
                if self._no_progress_steps >= self.progress_patience:
                    return -1.0, True  # no forward progress — terminate

        # --- Lap completion: reset progress tracker, penalise fast laps ---
        try:
            current_lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError, OverflowError):
            current_lap_count = self._last_lap_count

        if current_lap_count > self._last_lap_count:
            self._last_lap_count = current_lap_count
            # The progress tracker restarts with each lap so that low node
            # numbers on the new lap count as progress again.
            self._max_node_seen = -1
            self._no_progress_steps = 0

            # A missing or falsy (e.g. 0.0) lap time reads as 999.0, i.e.
            # no penalty.
            lap_time = self._read_float(info, 'last_lap_time', 999.0)
            if lap_time < self.min_lap_time:
                # Penalty grows as the lap gets shorter; the 0.1 floor
                # bounds it.
                penalty = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
                return penalty, True

        # --- Efficiency gate: detect circular driving ---
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # --- CTE quality: 1 at the centre line, 0 at |cte| >= max_cte ---
        if np.isnan(cte):
            # min(nan, 1.0) is nan; give no credit instead of a NaN reward.
            cte_quality = 0.0
        else:
            cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # --- Speed, clamped to [0, 1] after normalisation ---
        speed = max(0.0, self._read_float(info, 'speed', 0.0))
        speed_norm = min(speed / self._SPEED_NORM, 1.0)

        # --- v6 reward: speed × CTE quality ---
        return cte_quality * speed_norm, False

    def _compute_efficiency(self) -> float:
        """
        Path efficiency = net_displacement / total_path_length over the window.

        Returns 1.0 (benefit of the doubt) when fewer than 3 positions are
        recorded or when the total path length is not above 1e-6.
        """
        if len(self._pos_history) < 3:
            return 1.0

        track = np.stack(tuple(self._pos_history))
        net = np.linalg.norm(track[-1] - track[0])
        total = np.linalg.norm(np.diff(track, axis=0), axis=1).sum()
        if not total > 1e-6:  # also true when total is NaN
            return 1.0
        # Clamp: rounding can push the ratio marginally above 1.
        return min(float(net / total), 1.0)

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """
        Upper bound on the per-step reward (gate passed, CTE=0, given speed).

        With cte_quality = 1 the v6 reward equals speed_norm, so the bound is
        ``min(max_speed / 10, 1)`` and never exceeds 1.0.
        """
        return min(max(float(max_speed), 0.0) / self._SPEED_NORM, 1.0)