215 lines
9.0 KiB
Python
215 lines
9.0 KiB
Python
"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v6 (Speed×CTE + Efficiency Gate)
=====================================================================================

REWARD HACKING HISTORY:
  v1 additive: speed × (1-cte/max_cte) → boundary oscillation
  v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
  v3 path efficiency: original × (1+speed×eff×scale) → still circling!
    WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
    A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
  v4: base × eff × (1 + speed_scale × speed) → zero gradient on hills!
    WHY v4 failed on hills: speed≈0 AND eff≈0 AND cte_quality varies → all
    three terms near zero simultaneously → no gradient to push ANY term up.
  v5: speed × CTE_quality (no efficiency) → circular driving returns!
    WHY v5 failed: dropped efficiency entirely. Circular driving at CTE≈0
    with speed>0 earns positive reward indefinitely. Observed in Exp 11.
  v6 (THIS VERSION): v5 reward + efficiency GATE.
    Keeps v5's gradient properties (non-zero gradient on hills) but adds
    a binary efficiency check that zeros reward when car is circling.

ROOT CAUSE OF CIRCLING:
  The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
  A spinning car is ALWAYS moving "forward" relative to its own heading,
  so forward_vel > 0 always, giving positive reward while circling indefinitely.
  We bypass this entirely.

FORMULA (v6):
  cte_quality = 1.0 - min(|cte| / max_cte, 1.0)   # [0,1] centred=1
  speed_norm  = min(speed / 10.0, 1.0)            # [0,1] normalised
  efficiency  = net_displacement / total_path     # [0,1] straight=1, circle=0

  if efficiency < min_efficiency:
      reward = 0.0                       # GATE: circling → zero reward (but not negative)
  else:
      reward = cte_quality × speed_norm  # v5 formula (gradient on hills)

  On done/crash: reward = -1.0

WHY GATE NOT MULTIPLIER:
  v4 used efficiency as a multiplier: reward = base × eff × speed_bonus.
  On a hill: speed≈0, eff≈0, base≈0.5 → reward≈0 and ∂reward/∂speed≈0.
  No gradient to push speed up — car stays stuck.

  v6 gate: efficiency is either PASS or FAIL. When efficiency > threshold
  (car moving forward at all), reward = speed × CTE_quality. On a hill:
  car is stuck but still has eff > 0 (not literally circling), so the gate
  passes and the reward = speed × CTE_quality. ∂reward/∂speed > 0 → gradient
  pushes toward more throttle. Circle has eff ≈ 0 → gate fails → reward = 0.

PROPERTIES:
  - Circling (eff<threshold): reward = 0 (no incentive to circle)
  - On track, stuck (eff>0):  reward = speed × CTE_quality (gradient toward unstuck)
  - On track, fast:           reward = high (speed + centred)
  - Off track:                reward ≈ 0 (CTE_quality → 0)
  - Crash:                    reward = -1.0
"""
|
||
|
||
import gymnasium as gym
|
||
import numpy as np
|
||
from collections import deque
|
||
|
||
|
||
class SpeedRewardWrapper(gym.Wrapper):
    """
    Reward wrapper (v6): normalised speed × CTE quality, gated by path efficiency.

    The reward produced by the wrapped env's ``step()`` is discarded and
    replaced, in this order of precedence, by:

        -1.0
            when the wrapped env reports the episode over. NOTE: this covers
            ``terminated`` *and* ``truncated``, so a truncated episode is
            penalised the same way as a crash.
        -10.0 * min_lap_time / max(last_lap_time, 0.1)   (+ forced termination)
            when ``info['lap_count']`` just increased and
            ``info['last_lap_time']`` is below ``min_lap_time``.
        0.0
            when path efficiency over the recent position window is below
            ``min_efficiency`` (the car is circling).
        cte_quality * speed_norm
            otherwise, with
            ``cte_quality = 1 - min(|cte| / max_cte, 1)`` and
            ``speed_norm  = min(max(speed, 0) / SPEED_NORM, 1)``.

    All inputs are read from the ``info`` dict returned by the wrapped env
    (keys ``pos``, ``cte``, ``speed``, ``lap_count``, ``last_lap_time``).
    Missing or unparseable values fall back to neutral defaults rather than
    raising.

    Args:
        env: environment to wrap; both the 4-tuple and the 5-tuple ``step()``
            return conventions are supported.
        speed_scale: stored on the instance but NOT used by the v6 reward;
            kept only so existing callers that pass it keep working.
        window_size: number of most recent steps used for the efficiency
            calculation (default 30). The gate needs at least 3 recorded
            positions, so values below 2 effectively disable it.
        min_efficiency: efficiency below which the reward is zeroed
            (default 0.15).
        max_cte: cross-track error at which ``cte_quality`` reaches 0
            (default 8.0). Must be positive.
        min_lap_time: laps reported faster than this are treated as an
            exploit: large negative reward and forced termination
            (default 5.0).

    Raises:
        ValueError: if ``max_cte`` is not strictly positive.
    """

    # Speed at which the speed term of the reward saturates at 1.0.
    SPEED_NORM = 10.0

    def __init__(
        self,
        env,
        speed_scale: float = 0.1,
        window_size: int = 30,  # captures 2+ full circles at typical circling speed
        min_efficiency: float = 0.15,  # gate threshold: circles ≈ 0.13, wobbly straight ≈ 0.98
        max_cte: float = 8.0,
        min_lap_time: float = 5.0,  # laps faster than this are penalised as exploits
    ):
        super().__init__(env)
        if max_cte <= 0:
            # A zero value would divide by zero on the first step; a negative
            # one would push cte_quality above 1.
            raise ValueError(f'max_cte must be positive, got {max_cte!r}')
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        self.max_cte = max_cte
        self.min_lap_time = min_lap_time
        # window_size steps span window_size + 1 positions.
        self._pos_history = deque(maxlen=window_size + 1)
        self._last_lap_count = 0  # track lap completions to detect short-lap exploit

    def reset(self, **kwargs):
        """Reset the wrapped env and clear per-episode state; returns its result unchanged."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        self._last_lap_count = 0
        return result

    def step(self, action):
        """
        Step the wrapped env and substitute the shaped reward.

        Returns a tuple of the same arity (4 or 5) as the wrapped env's
        ``step()``. When the short-lap exploit is detected the episode is
        additionally marked as terminated.

        Raises:
            ValueError: if the wrapped env returns neither a 4- nor a 5-tuple.
        """
        result = self.env.step(action)

        # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs
        five_tuple = len(result) == 5
        if five_tuple:
            obs, _sim_reward, terminated, truncated, info = result
            done = terminated or truncated
        elif len(result) == 4:
            obs, _sim_reward, done, info = result
            terminated = done
            truncated = False
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        # Completely ignore _sim_reward — compute our own
        shaped, force_terminate = self._compute_reward_and_done(done, info)
        if force_terminate:
            terminated = True
            done = True

        if five_tuple:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    def _compute_reward_and_done(self, done: bool, info: dict):
        """
        Compute the v6 reward for one step.

        Args:
            done: whether the wrapped env reported the episode as over.
            info: the ``info`` dict returned by the wrapped env.

        Returns:
            ``(reward, force_terminate)``; ``force_terminate`` is True only
            for the short-lap exploit.
        """
        # Always record the position first so the efficiency window stays
        # contiguous, whichever branch produces the reward.
        self._record_position(info)

        # Crash / episode over
        if done:
            return -1.0, False

        # --- Short-lap exploit detection ---
        penalty = self._short_lap_penalty(info)
        if penalty is not None:
            return penalty, True

        # --- Efficiency gate: detect circular driving ---
        # Zero (not negative) so there's no perverse incentive to crash
        # early to avoid accumulating penalties.
        if self._compute_efficiency() < self.min_efficiency:
            return 0.0, False

        # --- CTE quality: 0 = off track, 1 = centred ---
        cte = self._info_float(info, 'cte', 0.0)
        cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0)

        # --- Speed: negative readings count as standing still ---
        speed = max(0.0, self._info_float(info, 'speed', 0.0))
        speed_norm = min(speed / self.SPEED_NORM, 1.0)

        return cte_quality * speed_norm, False

    @staticmethod
    def _info_float(info: dict, key: str, default: float) -> float:
        """Read ``info[key]`` as a float; missing, falsy or unparseable values yield ``default``."""
        try:
            return float(info.get(key, default) or default)
        except (TypeError, ValueError):
            return default

    def _record_position(self, info: dict) -> None:
        """Append the (x, z) components of ``info['pos']`` to the efficiency window."""
        try:
            pos = info.get('pos', (0.0, 0.0, 0.0))
            # Indices 0 and 2 are used: z is forward in Unity coordinate system.
            point = np.array([float(pos[0]), float(pos[2])])
        except (TypeError, ValueError, IndexError):
            # Deliberate best-effort: a malformed position only means this
            # step contributes nothing to the efficiency window.
            return
        self._pos_history.append(point)

    def _short_lap_penalty(self, info: dict):
        """
        Return the penalty for an implausibly fast lap, or None if there is none.

        A lap counts as completed on this step when ``info['lap_count']``
        exceeds the highest count seen so far in the episode.
        """
        try:
            lap_count = int(info.get('lap_count', 0) or 0)
        except (TypeError, ValueError):
            lap_count = self._last_lap_count

        if lap_count <= self._last_lap_count:
            return None
        self._last_lap_count = lap_count

        # A missing lap time defaults to a large value, i.e. "not an exploit".
        lap_time = self._info_float(info, 'last_lap_time', 999.0)
        if lap_time < self.min_lap_time:
            # Penalty grows as the lap gets shorter; the 0.1 floor keeps it bounded.
            return -10.0 * (self.min_lap_time / max(lap_time, 0.1))
        return None

    def _compute_efficiency(self) -> float:
        """
        Path efficiency = net_displacement / total_path_length over the window.

        Returns 1.0 (benefit of the doubt) when fewer than 3 positions have
        been recorded or when the car has essentially not moved.
        """
        if len(self._pos_history) < 3:
            return 1.0  # Insufficient history — give benefit of doubt

        points = np.array(list(self._pos_history))
        segment_lengths = np.linalg.norm(np.diff(points, axis=0), axis=1)
        total = float(segment_lengths.sum())
        if total <= 1e-6:
            return 1.0
        net = float(np.linalg.norm(points[-1] - points[0]))
        return net / total

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """
        Upper bound on reward/step for a car whose speed never exceeds ``max_speed``.

        Assumes the efficiency gate passes and CTE = 0, so the bound is just
        the normalised speed term and can never exceed 1.0.
        """
        return min(max(0.0, max_speed) / self.SPEED_NORM, 1.0)
|