"""
Progress-Based Reward Wrapper for DonkeyCar RL — v3 (Anti-Circular)
====================================================================

PROBLEM HISTORY:
v1 (additive): speed × (1 - cte/max_cte)
    → Hacked by oscillating at track boundary (trials 8+13 in corrupted data)

v2 (multiplicative): original × (1 + speed_scale × speed)
    → Still hacked by circling ON the track (trial 5: cv=0.0%, 4582 reward)
    → Circular motion has low CTE + positive speed → full speed bonus
    → Neither CTE nor raw speed can distinguish forward vs circular motion

v3 (path efficiency): original × (1 + speed_scale × speed × path_efficiency)
    → Path efficiency = net_displacement / path_length over sliding window
    → Forward driving: efficiency ≈ 1.0 (all movement is productive)
    → Circular driving: efficiency ≈ 0.0 (movement cancels out, no net advance)
    → Speed bonus disappears when circling → car incentivized to go FORWARD

FORMULA:
    efficiency = |pos_t - pos_{t-window}| / Σ|pos_i - pos_{i-1}|
               = net_displacement / total_path_length

    shaped_reward = original_reward × (1 + speed_scale × speed × efficiency)

    (when original_reward ≤ 0: no bonus, just penalty — same as v2)

RESEARCH NOTE (2026-04-13):
    Circular driving discovered in Phase 1 despite v2 fix.
    Trial 5: mean_reward=4582, cv=0.0% over 4787 steps.
    User visually confirmed: car circling at start line.
    See docs/RESEARCH_LOG.md for full analysis.

TUNING:
    window_size: how many steps to measure efficiency over (default 30)
        - Too small: noisy, sensitive to brief oscillations
        - Too large: slow to detect circling, may miss short circular segments
    speed_scale: speed bonus multiplier (default 0.1)
    min_efficiency: minimum efficiency before speed bonus disappears (default 0.05)
"""
from collections import deque

import gymnasium as gym
import numpy as np
class SpeedRewardWrapper(gym.Wrapper):
    """
    Path-efficiency-gated speed reward.

    The speed bonus applies only in proportion to how much the car is making
    NET FORWARD PROGRESS over a sliding window of positions, so circular
    driving (low CTE, positive speed, near-zero net displacement) earns
    no bonus.

    Args:
        env: gymnasium environment
        speed_scale: speed bonus multiplier (default 0.1)
        window_size: number of steps for efficiency measurement (default 30)
        min_efficiency: efficiency floor below which speed bonus is zero (default 0.05)
    """

    def __init__(self, env, speed_scale: float = 0.1, window_size: int = 30, min_efficiency: float = 0.05):
        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency

        # Sliding window of positions for efficiency calculation.
        # maxlen = window_size + 1 so the window spans exactly
        # `window_size` movement steps (N+1 positions -> N segments).
        self._pos_history = deque(maxlen=window_size + 1)

    def reset(self, **kwargs):
        """Reset the wrapped env and clear the position history so a fresh
        efficiency window starts with the new episode."""
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        return result

    def step(self, action):
        """Step the wrapped env and return its result with the shaped reward.

        Supports both the 4-tuple (old gym) and 5-tuple (gymnasium) step APIs,
        preserving whichever tuple shape the wrapped env produced.
        """
        result = self.env.step(action)

        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
        elif len(result) == 4:
            obs, reward, done, info = result
        else:
            raise ValueError(f'Unexpected step() result length: {len(result)}')

        shaped = self._shape_reward(reward, info)

        if len(result) == 5:
            return obs, shaped, terminated, truncated, info
        return obs, shaped, done, info

    def _get_pos(self, info: dict):
        """Extract a 3-component position from info['pos'] as a float64 array.

        Returns None when the key is absent or the value is malformed
        (not indexable / not numeric) — callers treat that as "no sample".
        """
        pos = info.get('pos', None)
        if pos is None:
            return None
        try:
            return np.array(pos[:3], dtype=np.float64)
        except (TypeError, IndexError, ValueError):
            return None

    def _compute_efficiency(self) -> float:
        """
        Compute path efficiency = net displacement / total path length over the window.

        Returns 1.0 (benefit of the doubt, no penalty) in two cases:
        - insufficient history (fewer than 3 recorded positions), and
        - (near) zero total movement — a stationary car is expected to be
          caught by a separate health check, not penalized here.
        """
        if len(self._pos_history) < 3:
            return 1.0

        positions = list(self._pos_history)

        # Net displacement: straight-line distance from oldest to newest position.
        net_displacement = np.linalg.norm(positions[-1] - positions[0])

        # Total path length: sum of consecutive step-by-step distances.
        total_path = sum(
            np.linalg.norm(b - a)
            for a, b in zip(positions, positions[1:])
        )

        if total_path < 1e-6:
            return 1.0  # Car not moving at all; don't penalize here.

        return float(net_displacement / total_path)

    def _shape_reward(self, original_reward: float, info: dict) -> float:
        """Apply the path-efficiency-gated speed bonus to original_reward.

        The position history is updated on every step (even when the reward
        is non-positive) so the efficiency window stays current.
        """
        pos = self._get_pos(info)
        if pos is not None:
            self._pos_history.append(pos)

        # Only apply a speed bonus when genuinely on track (positive CTE reward);
        # off track / crashed keeps the original penalty untouched.
        if original_reward <= 0:
            return original_reward

        # Extract speed defensively: info['speed'] may be missing, None, or junk.
        try:
            speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
        except (TypeError, ValueError):
            return original_reward

        # Path efficiency detects circular motion (≈0 when movement cancels out).
        efficiency = self._compute_efficiency()

        # Rescale [min_efficiency, 1] -> [0, 1]; below the floor the bonus is zero.
        # Guard the denominator: a pathological min_efficiency >= 1.0 disables
        # the bonus instead of dividing by zero.
        denom = 1.0 - self.min_efficiency
        if denom <= 0.0:
            effective_efficiency = 0.0
        else:
            effective_efficiency = max(0.0, (efficiency - self.min_efficiency) / denom)

        # Multiplicative bonus: fast forward progress -> full bonus; circling -> none.
        return original_reward * (1.0 + self.speed_scale * speed * effective_efficiency)

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on shaped reward per step, for hack-detection calibration.

        Assumes original reward <= 1.0, speed <= max_speed, and perfect
        path efficiency (1.0).
        """
        return 1.0 * (1.0 + self.speed_scale * max_speed * 1.0)