donkeycar-rl-autoresearch/agent/reward_wrapper.py

215 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Speed + Progress Reward Wrapper for DonkeyCar RL — v6 (Speed×CTE + Efficiency Gate)
=====================================================================================
REWARD HACKING HISTORY:
v1 additive: speed × (1-cte/max_cte) → boundary oscillation
v2 multiplicative: original × (1+speed×scale) → circular driving (on-track)
v3 path efficiency: original × (1+speed×eff×scale) → still circling!
WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
A spinning car at CTE≈0 still earns 1.0/step × thousands of steps.
v4: base × eff × (1 + speed_scale × speed) → zero gradient on hills!
WHY v4 failed on hills: speed≈0 AND eff≈0 AND cte_quality varies → all
three terms near zero simultaneously → no gradient to push ANY term up.
v5: speed × CTE_quality (no efficiency) → circular driving returns!
WHY v5 failed: dropped efficiency entirely. Circular driving at CTE≈0
with speed>0 earns positive reward indefinitely. Observed in Exp 11.
v6 (THIS VERSION): v5 reward + efficiency GATE.
Keeps v5's gradient properties (non-zero gradient on hills) but adds
a binary efficiency check that zeros reward when car is circling.
ROOT CAUSE OF CIRCLING:
The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
A spinning car is ALWAYS moving "forward" relative to its own heading,
so forward_vel > 0 always, giving positive reward while circling indefinitely.
We bypass this entirely.
FORMULA (v6):
cte_quality = 1.0 - min(|cte| / max_cte, 1.0) # [0,1] centred=1
speed_norm = min(speed / 10.0, 1.0) # [0,1] normalised
efficiency = net_displacement / total_path # [0,1] straight=1, circle=0
if efficiency < min_efficiency:
reward = 0.0 # GATE: circling → zero reward (but not negative)
else:
reward = cte_quality × speed_norm # v5 formula (gradient on hills)
On done/crash: reward = -1.0
WHY GATE NOT MULTIPLIER:
v4 used efficiency as a multiplier: reward = base × eff × speed_bonus.
On a hill: speed≈0, eff≈0, base≈0.5 → reward≈0 and ∂reward/∂speed≈0.
No gradient to push speed up — car stays stuck.
v6 gate: efficiency is either PASS or FAIL. When efficiency > threshold
(car moving forward at all), reward = speed × CTE_quality. On a hill:
car is stuck but still has eff > 0 (not literally circling), so the gate
passes and the reward = speed × CTE_quality. ∂reward/∂speed > 0 → gradient
pushes toward more throttle. Circle has eff ≈ 0 → gate fails → reward = 0.
PROPERTIES:
- Circling (eff<threshold): reward = 0 (no incentive to circle)
- On track, stuck (eff>0): reward = speed × CTE (gradient toward unstuck)
- On track, fast: reward = high (speed + centred)
- Off track: reward ≈ 0 (CTE_quality → 0)
- Crash: reward = -1.0
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
"""
Full reward bypass: base CTE reward × path efficiency × speed bonus.
Completely ignores the sim's own reward (which uses forward_vel and is
exploitable by circular/spinning motion).
Args:
env: gymnasium environment
speed_scale: speed bonus multiplier (default 0.1)
window_size: steps for efficiency calculation (default 30)
min_efficiency: efficiency below which no reward (default 0.05)
max_cte: track half-width for normalization (default 8.0, matches sim)
"""
def __init__(
self,
env,
speed_scale: float = 0.1,
window_size: int = 30, # captures 2+ full circles at typical circling speed
min_efficiency: float = 0.15, # gate threshold: circles ≈ 0.13, wobbly straight ≈ 0.98
max_cte: float = 8.0,
min_lap_time: float = 5.0, # laps faster than this are penalised as exploits
):
super().__init__(env)
self.speed_scale = speed_scale
self.window_size = window_size
self.min_efficiency = min_efficiency
self.max_cte = max_cte
self.min_lap_time = min_lap_time
self._pos_history = deque(maxlen=window_size + 1)
self._last_lap_count = 0 # track lap completions to detect short-lap exploit
def reset(self, **kwargs):
result = self.env.reset(**kwargs)
self._pos_history.clear()
self._last_lap_count = 0
return result
def step(self, action):
result = self.env.step(action)
# Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs
if len(result) == 5:
obs, _sim_reward, terminated, truncated, info = result
done = terminated or truncated
elif len(result) == 4:
obs, _sim_reward, done, info = result
terminated = done
truncated = False
else:
raise ValueError(f'Unexpected step() result length: {len(result)}')
# Completely ignore _sim_reward — compute our own
shaped, force_terminate = self._compute_reward_and_done(done, info)
if force_terminate:
terminated = True
done = True
if len(result) == 5:
return obs, shaped, terminated, truncated, info
else:
return obs, shaped, done, info
def _compute_reward_and_done(self, done: bool, info: dict):
"""
v6: speed × CTE-quality + efficiency gate.
reward = speed_norm × cte_quality (when efficiency >= threshold)
reward = 0.0 (when efficiency < threshold — circling)
reward = -1.0 (on crash/done)
The efficiency gate prevents circular driving (eff≈0 for circles)
without killing gradient on hills (eff>0 for a stuck-but-not-circling
car, so the gate passes and speed×CTE gradient pushes toward unstuck).
Exploit protection:
- Efficiency gate: circles → reward = 0
- Short-lap penalty: laps < min_lap_time → large negative + terminate
- StuckTerminationWrapper: done=True after stuck_steps of no movement
- Crash: done=True → -1.0
"""
# Track position for efficiency calculation
try:
pos = info.get('pos', (0.0, 0.0, 0.0))
pos_x = float(pos[0])
pos_z = float(pos[2]) # z is forward in Unity coordinate system
self._pos_history.append(np.array([pos_x, pos_z]))
except (TypeError, ValueError, IndexError):
pass
# Crash / episode over
if done:
return -1.0, False
# --- Short-lap exploit detection ---
try:
current_lap_count = int(info.get('lap_count', 0) or 0)
except (TypeError, ValueError):
current_lap_count = self._last_lap_count
if current_lap_count > self._last_lap_count:
self._last_lap_count = current_lap_count
try:
lap_time = float(info.get('last_lap_time', 999.0) or 999.0)
except (TypeError, ValueError):
lap_time = 999.0
if lap_time < self.min_lap_time:
penalty = -10.0 * (self.min_lap_time / max(lap_time, 0.1))
return penalty, True # (reward, force_terminate)
# --- Efficiency gate: detect circular driving ---
efficiency = self._compute_efficiency()
if efficiency < self.min_efficiency:
# Car is circling — zero reward but don't terminate.
# Zero (not negative) so there's no perverse incentive to crash
# early to avoid accumulating penalties.
return 0.0, False
# --- CTE quality: how centred is the car? ---
try:
cte = float(info.get('cte', 0.0) or 0.0)
except (TypeError, ValueError):
cte = 0.0
cte_quality = 1.0 - min(abs(cte) / self.max_cte, 1.0) # 0=off track, 1=centred
# --- Speed ---
try:
speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
except (TypeError, ValueError):
speed = 0.0
# --- v6 reward: speed × CTE quality (same as v5, but gated) ---
speed_norm = min(speed / 10.0, 1.0)
return cte_quality * speed_norm, False
def _compute_efficiency(self) -> float:
"""Path efficiency = net_displacement / total_path_length."""
if len(self._pos_history) < 3:
return 1.0 # Insufficient history — give benefit of doubt
positions = list(self._pos_history)
net = np.linalg.norm(positions[-1] - positions[0])
total = sum(
np.linalg.norm(positions[i + 1] - positions[i])
for i in range(len(positions) - 1)
)
return float(net / total) if total > 1e-6 else 1.0
def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
"""Upper bound on reward/step (efficiency=1, CTE=0, max speed)."""
return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed)