donkeycar-rl-autoresearch/agent/reward_wrapper.py

161 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Progress-Based Reward Wrapper for DonkeyCar RL — v3 (Anti-Circular)
====================================================================
PROBLEM HISTORY:
v1 (additive): speed × (1 - cte/max_cte)
→ Hacked by oscillating at track boundary (trials 8+13 in corrupted data)
v2 (multiplicative): original × (1 + speed_scale × speed)
→ Still hacked by circling ON the track (trial 5: cv=0.0%, 4582 reward)
→ Circular motion has low CTE + positive speed → full speed bonus
→ Neither CTE nor raw speed can distinguish forward vs circular motion
v3 (path efficiency): original × (1 + speed_scale × speed × path_efficiency)
→ Path efficiency = net_displacement / path_length over sliding window
→ Forward driving: efficiency ≈ 1.0 (all movement is productive)
→ Circular driving: efficiency ≈ 0.0 (movement cancels out, no net advance)
→ Speed bonus disappears when circling → car incentivized to go FORWARD
FORMULA:
efficiency = |pos_t - pos_{t-window}| / Σ|pos_i - pos_{i-1}|
= net_displacement / total_path_length
shaped_reward = original_reward × (1 + speed_scale × speed × efficiency)
(when original_reward ≤ 0: no bonus, just penalty — same as v2)
RESEARCH NOTE (2026-04-13):
Circular driving discovered in Phase 1 despite v2 fix.
Trial 5: mean_reward=4582, cv=0.0% over 4787 steps.
User visually confirmed: car circling at start line.
See docs/RESEARCH_LOG.md for full analysis.
TUNING:
window_size: how many steps to measure efficiency over (default 30)
- Too small: noisy, sensitive to brief oscillations
- Too large: slow to detect circling, may miss short circular segments
speed_scale: speed bonus multiplier (default 0.1)
min_efficiency: minimum efficiency before speed bonus disappears (default 0.05)
"""
import gymnasium as gym
import numpy as np
from collections import deque
class SpeedRewardWrapper(gym.Wrapper):
    """Path-efficiency-gated speed reward shaping (v3, anti-circular).

    Multiplies the environment's reward by a speed bonus that is scaled by
    *path efficiency* — net displacement divided by total distance travelled
    over a sliding window of recent positions. Forward driving has
    efficiency near 1.0 (full bonus); circling has efficiency near 0.0
    (no bonus), so reward hacking via circular motion is unprofitable.

    Args:
        env: gymnasium (or legacy gym) environment to wrap.
        speed_scale: speed bonus multiplier (default 0.1).
        window_size: number of steps over which efficiency is measured
            (default 30).
        min_efficiency: efficiency floor below which the speed bonus is
            zero (default 0.05).
    """

    def __init__(self, env, speed_scale: float = 0.1, window_size: int = 30,
                 min_efficiency: float = 0.05):
        super().__init__(env)
        self.speed_scale = speed_scale
        self.window_size = window_size
        self.min_efficiency = min_efficiency
        # Sliding window of recent positions; maxlen = window_size + 1 so
        # the window spans exactly `window_size` movement segments.
        # (A `_path_length` accumulator existed previously but was never
        # read or updated — dead state, removed.)
        self._pos_history = deque(maxlen=window_size + 1)

    def reset(self, **kwargs):
        """Reset the wrapped env and clear the position history.

        Passes through whatever the wrapped env's reset() returns
        (old-gym obs or gymnasium (obs, info) tuple).
        """
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
        return result

    def step(self, action):
        """Step the wrapped env and return the shaped reward.

        Supports both step APIs, preserving whichever tuple shape the
        wrapped env returns:
          * gymnasium 5-tuple: (obs, reward, terminated, truncated, info)
          * legacy gym 4-tuple: (obs, reward, done, info)

        Raises:
            ValueError: if the wrapped env returns an unexpected tuple size.
        """
        result = self.env.step(action)
        if len(result) == 5:
            obs, reward, terminated, truncated, info = result
            shaped = self._shape_reward(reward, info)
            return obs, shaped, terminated, truncated, info
        if len(result) == 4:
            obs, reward, done, info = result
            shaped = self._shape_reward(reward, info)
            return obs, shaped, done, info
        raise ValueError(f'Unexpected step() result length: {len(result)}')

    def _get_pos(self, info: dict):
        """Extract an (x, y, z) position array from the info dict.

        Returns None when the position is missing or malformed rather than
        raising, so reward shaping degrades gracefully to "no history".
        """
        pos = info.get('pos', None)
        if pos is None:
            return None
        try:
            return np.array(pos[:3], dtype=np.float64)
        except (TypeError, IndexError, ValueError):
            return None

    def _compute_efficiency(self) -> float:
        """Compute path efficiency = net displacement / total path length.

        Returns:
            1.0 when there is insufficient history (can't penalize yet) or
            when the car has effectively not moved (a stalled car is caught
            by a separate health check, not penalized here); otherwise the
            ratio of straight-line displacement across the window to the
            summed step-by-step distances (in [0, 1] by triangle inequality).
        """
        if len(self._pos_history) < 3:
            return 1.0  # Not enough history; give the benefit of the doubt.
        positions = list(self._pos_history)
        # Net displacement: straight-line distance, oldest -> newest.
        net_displacement = np.linalg.norm(positions[-1] - positions[0])
        # Total path length: sum of consecutive step distances.
        total_path = sum(
            np.linalg.norm(b - a) for a, b in zip(positions, positions[1:])
        )
        if total_path < 1e-6:
            return 1.0  # Effectively stationary; don't penalize here.
        return float(net_displacement / total_path)

    def _shape_reward(self, original_reward: float, info: dict) -> float:
        """Apply the path-efficiency-gated speed bonus to one step's reward.

        shaped = original * (1 + speed_scale * speed * effective_efficiency)
        where effective_efficiency rescales raw efficiency so that values at
        or below `min_efficiency` yield zero bonus. Non-positive rewards
        (off track / crashed) pass through unchanged — penalty only, no bonus.
        """
        # Always record position (even off-track) so the window stays fresh.
        pos = self._get_pos(info)
        if pos is not None:
            self._pos_history.append(pos)
        # Only apply a speed bonus when genuinely on track (positive reward).
        if original_reward <= 0:
            return original_reward
        try:
            speed = max(0.0, float(info.get('speed', 0.0) or 0.0))
        except (TypeError, ValueError):
            return original_reward  # Malformed speed: skip the bonus.
        # Path efficiency detects circular motion (see module docstring).
        efficiency = self._compute_efficiency()
        # Rescale so efficiency <= min_efficiency gives zero bonus; guard the
        # denominator so a pathological min_efficiency >= 1.0 cannot divide
        # by zero, and clamp the result into [0, 1].
        denom = max(1e-12, 1.0 - self.min_efficiency)
        effective_efficiency = min(
            1.0, max(0.0, (efficiency - self.min_efficiency) / denom)
        )
        return original_reward * (
            1.0 + self.speed_scale * speed * effective_efficiency
        )

    def theoretical_max_per_step(self, max_speed: float = 10.0) -> float:
        """Upper bound on shaped reward per step (hack-detection calibration).

        Assumes a base reward of at most 1.0 and perfect efficiency (1.0).
        """
        return 1.0 * (1.0 + self.speed_scale * max_speed * 1.0)