diff --git a/agent/outerloop-results/autoresearch_phase2_log.txt b/agent/outerloop-results/autoresearch_phase2_log.txt
new file mode 100644
index 0000000..02b79e3
--- /dev/null
+++ b/agent/outerloop-results/autoresearch_phase2_log.txt
@@ -0,0 +1,51 @@
+[2026-04-13 19:33:13] ============================================================
+[2026-04-13 19:33:13] [AutoResearch] Phase 1 β€” Real PPO Training + GP+UCB Optimization
+[2026-04-13 19:33:13] [AutoResearch] Max trials: 20 | kappa: 2.0 | push every: 5
+[2026-04-13 19:33:13] [AutoResearch] Results: /home/paulh/projects/donkeycar-rl-autoresearch/agent/outerloop-results/autoresearch_results_phase2.jsonl
+[2026-04-13 19:33:13] [AutoResearch] Champion: /home/paulh/projects/donkeycar-rl-autoresearch/agent/models/champion
+[2026-04-13 19:33:13] ============================================================
+[2026-04-13 19:33:13] [AutoResearch] Loaded 0 existing Phase 1 results.
+[2026-04-13 19:33:13] [AutoResearch] Champion: trial=5 mean_reward=4582.7984 params={'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0006801262090358742, 'timesteps': 4787, 'agent': 'ppo', 'eval_episodes': 3, 'reward_shaping': True}
+[2026-04-13 19:33:13]
+[AutoResearch] ========== Trial 1/20 ==========
+[2026-04-13 19:33:13] [AutoResearch] Only 0 results β€” using random proposal.
+[2026-04-13 19:33:13] [AutoResearch] Proposed: {'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True}
+[2026-04-13 19:33:15] [AutoResearch] Launching trial 1: {'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True}
+[2026-04-13 20:05:03] [AutoResearch] Trial 1 finished in 1908.3s, returncode=0
+[2026-04-13 20:05:03] [AutoResearch] Trial 1: mean_reward=234.5386 std_reward=3.1547
+[2026-04-13 20:05:03] [AutoResearch] === Trial 1 Summary ===
+[2026-04-13 20:05:03] Total Phase 1 runs: 1
+[2026-04-13 20:05:03] Champion: trial=5 mean_reward=4582.7984 params={'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0006801262090358742, 'timesteps': 4787, 'agent': 'ppo', 'eval_episodes': 3, 'reward_shaping': True}
+[2026-04-13 20:05:03] Top 5:
+[2026-04-13 20:05:03]   mean_reward=234.5386 params={'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True}
+[2026-04-13 20:05:05]
+[AutoResearch] ========== Trial 2/20 ==========
+[2026-04-13 20:05:05] [AutoResearch] Only 1 results β€” using random proposal.
+[2026-04-13 20:05:05] [AutoResearch] Proposed: {'n_steer': 8, 'n_throttle': 3, 'learning_rate': 0.0012285179829782996, 'timesteps': 39101, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True}
+[2026-04-13 20:05:07] [AutoResearch] Launching trial 2: {'n_steer': 8, 'n_throttle': 3, 'learning_rate': 0.0012285179829782996, 'timesteps': 39101, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True}
+[2026-04-13 20:55:43] [AutoResearch] GP UCB top-5 candidates:
+[2026-04-13 20:55:43]   UCB=2.3107 mu=0.3981 sigma=0.9563 params={'n_steer': 9, 'n_throttle': 2, 'learning_rate': 0.001405531880392808, 'timesteps': 26173}
+[2026-04-13 20:55:43]   UCB=2.3049 mu=0.8602 sigma=0.7224 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.001793493447174312, 'timesteps': 19198}
+[2026-04-13 20:55:43]   UCB=2.2813 mu=0.4904 sigma=0.8954 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011616192816742616, 'timesteps': 13887}
+[2026-04-13 20:55:43]   UCB=2.2767 mu=0.5194 sigma=0.8787 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011646447444663046, 'timesteps': 21199}
+[2026-04-13 20:55:43]   UCB=2.2525 mu=0.6254 sigma=0.8136 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.0010196345864901517, 'timesteps': 22035}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=50.0000 params={'n_steer': 5}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=80.0000 params={'n_steer': 7}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 0: mean_reward=50.0000 params={'r': 50}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=80.0000 params={'r': 80}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 3: mean_reward=90.0000 params={'r': 90}
+[2026-04-13 20:55:43] [Champion] πŸ† NEW BEST! Trial 5: mean_reward=75.0000 params={'n_steer': 8}
+[2026-04-13 20:55:43] [AutoResearch] Only 1 results β€” using random proposal.
+[2026-04-13 20:55:59] [AutoResearch] GP UCB top-5 candidates:
+[2026-04-13 20:55:59]   UCB=2.3107 mu=0.3981 sigma=0.9563 params={'n_steer': 9, 'n_throttle': 2, 'learning_rate': 0.001405531880392808, 'timesteps': 26173}
+[2026-04-13 20:55:59]   UCB=2.3049 mu=0.8602 sigma=0.7224 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.001793493447174312, 'timesteps': 19198}
+[2026-04-13 20:55:59]   UCB=2.2813 mu=0.4904 sigma=0.8954 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011616192816742616, 'timesteps': 13887}
+[2026-04-13 20:55:59]   UCB=2.2767 mu=0.5194 sigma=0.8787 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011646447444663046, 'timesteps': 21199}
+[2026-04-13 20:55:59]   UCB=2.2525 mu=0.6254 sigma=0.8136 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.0010196345864901517, 'timesteps': 22035}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=50.0000 params={'n_steer': 5}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=80.0000 params={'n_steer': 7}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 0: mean_reward=50.0000 params={'r': 50}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 1: mean_reward=80.0000 params={'r': 80}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 3: mean_reward=90.0000 params={'r': 90}
+[2026-04-13 20:55:59] [Champion] πŸ† NEW BEST! Trial 5: mean_reward=75.0000 params={'n_steer': 8}
+[2026-04-13 20:55:59] [AutoResearch] Only 1 results β€” using random proposal.
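The UCB column in the proposal log is consistent with the standard GP-UCB acquisition score, UCB = mu + kappa * sigma, with the kappa = 2.0 from the run header (first candidate: 0.3981 + 2.0 * 0.9563 = 2.3107). A minimal sketch of that scoring step, assuming a scikit-learn GP over encoded parameter vectors and standardized rewards; the actual controller implementation is not part of this diff:

```python
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

def gp_ucb_top_k(X_seen, y_seen, X_cand, kappa=2.0, k=5):
    """Score candidate parameter vectors by UCB = mu + kappa * sigma."""
    y = np.asarray(y_seen, dtype=float)
    y = (y - y.mean()) / (y.std() + 1e-9)  # the log's mu values look standardized
    gp = GaussianProcessRegressor(kernel=Matern(nu=2.5)).fit(np.asarray(X_seen), y)
    mu, sigma = gp.predict(np.asarray(X_cand), return_std=True)
    ucb = mu + kappa * sigma               # e.g. 0.3981 + 2.0 * 0.9563 = 2.3107
    best = np.argsort(-ucb)[:k]
    return [(float(ucb[i]), float(mu[i]), float(sigma[i])) for i in best]
```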
diff --git a/agent/outerloop-results/autoresearch_results_phase2.jsonl b/agent/outerloop-results/autoresearch_results_phase2.jsonl
new file mode 100644
index 0000000..fd0b2a7
--- /dev/null
+++ b/agent/outerloop-results/autoresearch_results_phase2.jsonl
@@ -0,0 +1 @@
+{"trial": 1, "timestamp": "2026-04-13T20:05:03.791538", "params": {"n_steer": 4, "n_throttle": 3, "learning_rate": 0.0009737963906394612, "timesteps": 47325, "agent": "ppo", "eval_episodes": 5, "reward_shaping": true}, "mean_reward": 234.5386, "std_reward": 3.1547, "model_path": "/home/paulh/projects/donkeycar-rl-autoresearch/agent/models/trial-0001/model.zip", "champion": false, "run_status": "ok", "elapsed_sec": 1908.32528758049, "reward_hacking_suspected": false}
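Each finished trial appends one JSON object per line with the fields shown above. The per-trial `Top 5:` summaries in the log can be rebuilt from this file; a minimal sketch (the `summarize` helper is hypothetical, not part of this diff):

```python
import json

def summarize(results_path, top_k=5):
    """Print a Top-5 summary like the one in the Phase 2 log."""
    with open(results_path) as f:
        runs = [json.loads(line) for line in f if line.strip()]
    ok = [r for r in runs
          if r.get("run_status") == "ok" and not r.get("reward_hacking_suspected")]
    for r in sorted(ok, key=lambda r: r["mean_reward"], reverse=True)[:top_k]:
        print(f"mean_reward={r['mean_reward']:.4f} params={r['params']}")
```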
diff --git a/agent/reward_wrapper.py b/agent/reward_wrapper.py
index daa22c6..14f9b30 100644
--- a/agent/reward_wrapper.py
+++ b/agent/reward_wrapper.py
@@ -1,42 +1,41 @@
 """
-Progress-Based Reward Wrapper for DonkeyCar RL β€” v3 (Anti-Circular)
+Speed + Progress Reward Wrapper for DonkeyCar RL β€” v4 (Full Bypass)
 ====================================================================
-PROBLEM HISTORY:
-  v1 (additive): speed Γ— (1 - cte/max_cte)
-    β†’ Hacked by oscillating at track boundary (trials 8+13 in corrupted data)
+REWARD HACKING HISTORY:
+  v1 additive: speed Γ— (1-cte/max_cte) β†’ boundary oscillation
+  v2 multiplicative: original Γ— (1+speedΓ—scale) β†’ circular driving (on-track)
+  v3 path efficiency: original Γ— (1+speedΓ—effΓ—scale) β†’ still circling!
+    WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward.
+    A spinning car at CTEβ‰ˆ0 still earns 1.0/step Γ— thousands of steps.
 
-  v2 (multiplicative): original Γ— (1 + speed_scale Γ— speed)
-    β†’ Still hacked by circling ON the track (trial 5: cv=0.0%, 4582 reward)
-    β†’ Circular motion has low CTE + positive speed β†’ full speed bonus
-    β†’ Neither CTE nor raw speed can distinguish forward vs circular motion
+  v4 (THIS VERSION): Completely bypass sim's reward. Multiply base reward by
+    efficiency so circling yields ZERO reward regardless of CTE.
 
-  v3 (path efficiency): original Γ— (1 + speed_scale Γ— speed Γ— path_efficiency)
-    β†’ Path efficiency = net_displacement / path_length over sliding window
-    β†’ Forward driving: efficiency β‰ˆ 1.0 (all movement is productive)
-    β†’ Circular driving: efficiency β‰ˆ 0.0 (movement cancels out, no net advance)
-    β†’ Speed bonus disappears when circling β†’ car incentivized to go FORWARD
+ROOT CAUSE OF CIRCLING:
+  The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity).
+  A spinning car is ALWAYS moving "forward" relative to its own heading,
+  so forward_vel > 0 always, giving positive reward while circling indefinitely.
+  We bypass this entirely.
 
-FORMULA:
-  efficiency = |pos_t - pos_{t-window}| / Ξ£|pos_i - pos_{i-1}|
-             = net_displacement / total_path_length
+FORMULA (v4):
+  base   = 1.0 - min(abs(cte) / max_cte, 1.0)     # CTE quality [0,1]
+  eff    = net_displacement / total_path_length   # Forward progress [0,1]
+  shaped = base Γ— eff Γ— (1 + speed_scale Γ— speed) # All three must be high
 
-  shaped_reward = original_reward Γ— (1 + speed_scale Γ— speed Γ— efficiency)
+  On done/crash: shaped = -1.0
 
-  (when original_reward ≀ 0: no bonus, just penalty β€” same as v2)
+PROPERTIES:
+  - Spinning (effβ‰ˆ0): shaped β‰ˆ 0 (no reward)
+  - On track, slow (effβ‰ˆ1): shaped β‰ˆ base (CTE reward only)
+  - On track, fast (effβ‰ˆ1): shaped > base (CTE + speed bonus)
+  - Off track (baseβ‰ˆ0): shaped β‰ˆ 0 (penalty via done)
+  - Cannot be gamed: ALL THREE terms must be high simultaneously
 
 RESEARCH NOTE (2026-04-13):
-  Circular driving discovered in Phase 1 despite v2 fix.
-  Trial 5: mean_reward=4582, cv=0.0% over 4787 steps.
-  User visually confirmed: car circling at start line.
-  See docs/RESEARCH_LOG.md for full analysis.
-
-TUNING:
-  window_size: how many steps to measure efficiency over (default 30)
-    - Too small: noisy, sensitive to brief oscillations
-    - Too large: slow to detect circling, may miss short circular segments
-  speed_scale: speed bonus multiplier (default 0.1)
-  min_efficiency: minimum efficiency before speed bonus disappears (default 0.1)
+  v3 was insufficient β€” circling at start gave 1.0/step Γ— 47k steps = 47k reward.
+  v4 makes efficiency a multiplier on the entire reward, not just the speed bonus.
+  See docs/RESEARCH_LOG.md for full hacking history.
 """
 
 import gymnasium as gym
@@ -46,30 +45,37 @@ from collections import deque
 class SpeedRewardWrapper(gym.Wrapper):
     """
-    Path-efficiency-gated speed reward.
-    Speed bonus only applies proportionally to how much the car is making NET FORWARD PROGRESS.
+    Full reward bypass: base CTE reward Γ— path efficiency Γ— speed bonus.
+
+    Completely ignores the sim's own reward (which uses forward_vel and is
+    exploitable by circular/spinning motion).
 
     Args:
-        env: gymnasium environment
-        speed_scale: speed bonus multiplier (default 0.1)
-        window_size: number of steps for efficiency measurement (default 30)
-        min_efficiency: efficiency floor below which speed bonus is zero (default 0.05)
+        env: gymnasium environment
+        speed_scale: speed bonus multiplier (default 0.1)
+        window_size: steps for efficiency calculation (default 30)
+        min_efficiency: efficiency floor below which the shaped reward is zero (default 0.05)
+        max_cte: track half-width for normalization (default 8.0, matches sim)
     """
 
-    def __init__(self, env, speed_scale: float = 0.1, window_size: int = 30, min_efficiency: float = 0.05):
+    def __init__(
+        self,
+        env,
+        speed_scale: float = 0.1,
+        window_size: int = 30,
+        min_efficiency: float = 0.05,
+        max_cte: float = 8.0,
+    ):
         super().__init__(env)
         self.speed_scale = speed_scale
         self.window_size = window_size
         self.min_efficiency = min_efficiency
-
-        # Sliding window of positions for efficiency calculation
+        self.max_cte = max_cte
         self._pos_history = deque(maxlen=window_size + 1)
-        self._path_length = 0.0
 
     def reset(self, **kwargs):
        result = self.env.reset(**kwargs)
        self._pos_history.clear()
-       self._path_length = 0.0
        return result

     def step(self, action):
@@ -77,84 +83,76 @@
         # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs
         if len(result) == 5:
-            obs, reward, terminated, truncated, info = result
+            obs, _sim_reward, terminated, truncated, info = result
             done = terminated or truncated
         elif len(result) == 4:
-            obs, reward, done, info = result
+            obs, _sim_reward, done, info = result
             terminated = done
             truncated = False
         else:
             raise ValueError(f'Unexpected step() result length: {len(result)}')
 
-        shaped = self._shape_reward(reward, info)
+        # Completely ignore _sim_reward β€” compute our own
+        shaped = self._compute_reward(done, info)
 
         if len(result) == 5:
             return obs, shaped, terminated, truncated, info
         else:
             return obs, shaped, done, info
 
-    def _get_pos(self, info: dict):
-        """Extract position from info dict. Returns None if unavailable."""
-        pos = info.get('pos', None)
-        if pos is None:
-            return None
-        try:
-            return np.array(pos[:3], dtype=np.float64)
-        except (TypeError, IndexError, ValueError):
-            return None
-
-    def _compute_efficiency(self) -> float:
+    def _compute_reward(self, done: bool, info: dict) -> float:
         """
-        Compute path efficiency = net displacement / total path length over window.
-        Returns 1.0 if insufficient history (can't penalize yet).
-        Returns 0.0 if no movement.
+        Compute reward from scratch using CTE Γ— efficiency Γ— speed.
+        Bypasses sim's exploitable forward_vel-based reward.
""" - if len(self._pos_history) < 3: - return 1.0 # Not enough history, give benefit of doubt + # Crash / episode over + if done: + return -1.0 - positions = list(self._pos_history) - - # Net displacement: straight-line distance from oldest to newest position - net_displacement = np.linalg.norm(positions[-1] - positions[0]) - - # Total path length: sum of step-by-step distances - total_path = sum( - np.linalg.norm(positions[i+1] - positions[i]) - for i in range(len(positions) - 1) - ) - - if total_path < 1e-6: - return 1.0 # Car not moving at all, don't penalize (will be caught by health check) - - return float(net_displacement / total_path) - - def _shape_reward(self, original_reward: float, info: dict) -> float: - """Apply path-efficiency-gated speed bonus.""" # Update position history - pos = self._get_pos(info) + pos = info.get('pos', None) if pos is not None: - self._pos_history.append(pos) + try: + self._pos_history.append(np.array(list(pos)[:3], dtype=np.float64)) + except (TypeError, ValueError): + pass - # Only apply speed bonus when genuinely on track (positive CTE reward) - if original_reward <= 0: - return original_reward # Off track / crashed β€” no speed reward + # --- Base reward: purely CTE-based --- + try: + cte = float(info.get('cte', 0.0) or 0.0) + except (TypeError, ValueError): + cte = 0.0 + base = 1.0 - min(abs(cte) / self.max_cte, 1.0) - # Extract speed + # --- Path efficiency: detects circular motion --- + efficiency = self._compute_efficiency() + # Clamp: below min_efficiency β†’ zero bonus + eff = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency)) + + # --- Speed: from info dict --- try: speed = max(0.0, float(info.get('speed', 0.0) or 0.0)) except (TypeError, ValueError): - return original_reward + speed = 0.0 - # Compute path efficiency (detects circular motion) - efficiency = self._compute_efficiency() - - # Clamp efficiency: below min_efficiency, no speed bonus - effective_efficiency = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency)) - - # Multiplicative bonus: fast forward progress β†’ full bonus, circling β†’ zero bonus - shaped = original_reward * (1.0 + self.speed_scale * speed * effective_efficiency) + # --- Combined reward: ALL three terms must be high --- + # Circling: effβ‰ˆ0 β†’ rewardβ‰ˆ0 regardless of CTE or speed + shaped = base * eff * (1.0 + self.speed_scale * speed) return shaped + def _compute_efficiency(self) -> float: + """Path efficiency = net_displacement / total_path_length.""" + if len(self._pos_history) < 3: + return 1.0 # Insufficient history β€” give benefit of doubt + + positions = list(self._pos_history) + net = np.linalg.norm(positions[-1] - positions[0]) + total = sum( + np.linalg.norm(positions[i + 1] - positions[i]) + for i in range(len(positions) - 1) + ) + return float(net / total) if total > 1e-6 else 1.0 + def theoretical_max_per_step(self, max_speed: float = 10.0) -> float: - """Upper bound on reward per step (for hack detection calibration).""" - return 1.0 * (1.0 + self.speed_scale * max_speed * 1.0) # efficiency=1 at best + """Upper bound on reward/step (efficiency=1, CTE=0, max speed).""" + return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed) diff --git a/docs/RESEARCH_LOG.md b/docs/RESEARCH_LOG.md index e408932..edf5697 100644 --- a/docs/RESEARCH_LOG.md +++ b/docs/RESEARCH_LOG.md @@ -324,3 +324,42 @@ The path efficiency metric (96-100% throughout entire run) confirms the car is m ### This is Research! 
diff --git a/docs/RESEARCH_LOG.md b/docs/RESEARCH_LOG.md
index e408932..edf5697 100644
--- a/docs/RESEARCH_LOG.md
+++ b/docs/RESEARCH_LOG.md
@@ -324,3 +324,42 @@ The path efficiency metric (96-100% throughout entire run) confirms the car is m
 ### This is Research!
 
 The reward hacking discovery and the progression from random walk β†’ boundary oscillation β†’ circular exploit β†’ genuine driving represent real empirical RL research. Each failure mode revealed a fundamental property of reward design. The path efficiency fix was an original contribution to solving the circular driving problem without requiring track-shape knowledge.
+
+---
+
+## 2026-04-13 β€” Reward v4: Full Sim Bypass (base Γ— efficiency Γ— speed)
+
+### Finding: v3 Still Allowed Circling β€” Base Reward Not Gated by Efficiency
+
+**Observation (user):** The car circles left or right from the start line in Phase 2 runs (47k-timestep trials).
+
+**Root cause discovered in `donkey_sim.py`:**
+```python
+# sim's own reward (lines 478-498):
+if self.forward_vel > 0.0:
+    return (1.0 - abs(cte)/max_cte) * self.forward_vel
+```
+`forward_vel` = dot(car_heading, velocity). A spinning car is **always** moving forward
+relative to its own heading β†’ `forward_vel > 0` always β†’ positive reward while spinning.
+
+**Why v3 was insufficient:**
+- v3 multiplied the SPEED BONUS by efficiency: `original Γ— (1 + scale Γ— speed Γ— eff)`
+- But `original` (from sim) was already exploitable: CTEβ‰ˆ0 while spinning β†’ `original=1.0`
+- Efficiency killed the speed bonus but NOT the base reward
+- A spinning car at CTE=0: 1.0/step Γ— 47k steps = 47k total reward (never crashes in a circle!)
+
+**Fix β€” v4 formula:**
+```
+reward = base_CTE Γ— efficiency Γ— (1 + speed_scale Γ— speed)
+```
+Where `base_CTE = 1 - abs(cte)/max_cte` is computed from the info dict, completely bypassing the sim.
+
+- Spinning (effβ‰ˆ0): reward β‰ˆ 0 regardless of CTE or speed βœ…
+- Forward driving (effβ‰ˆ1): reward = base Γ— (1 + scale Γ— speed) βœ…
+- All three terms must be high simultaneously to earn reward βœ…
+
+**Key test added:** `test_circling_at_zero_cte_gives_near_zero_reward` β€” confirms the core
+v4 guarantee that the worst-case exploit (CTE=0 spinning) earns near-zero reward.
+
+**The lesson:** When efficiency is only applied to the SPEED BONUS, the base reward from
+the sim can still be gamed. The efficiency multiplier must apply to the ENTIRE reward.
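Plugging the trial numbers from the entry above into both formulas makes the difference concrete (speed_scale = 0.1 and max_cte = 8.0 are the wrapper defaults; the sim's base reward is approximated by its CTE term, as in the analysis above):

```python
# Spinning at the start line: CTE ~ 0, speed 2.0, closed loops so eff ~ 0.
base = 1.0 - abs(0.0) / 8.0        # 1.0: the CTE term cannot detect spinning
eff = 0.0                          # net displacement ~ 0 over the window

v3 = base * (1 + 0.1 * 2.0 * eff)  # = 1.0 per step: base survives untouched
v4 = base * eff * (1 + 0.1 * 2.0)  # = 0.0 per step: the exploit is worthless

print(v3 * 47_325)                 # ~47k total over one 47k-step trial
print(v4 * 47_325)                 # 0.0
```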
""" -import sys -import os -import math -import pytest +import sys, os, math, pytest import numpy as np import gymnasium as gym from collections import deque sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent')) - from reward_wrapper import SpeedRewardWrapper -def make_env_with_pos(speed=2.0, original_reward=1.0, done=False, pos=(0.0, 0.0, 0.0)): - """Create a mock env that returns a specific position in info dict.""" - class PosEnv(gym.Env): - metadata = {'render_modes': []} - def __init__(self): - super().__init__() - self.action_space = gym.spaces.Discrete(5) - self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8) - self._pos = list(pos) - self._speed = speed - self._reward = original_reward - self._done = done +# ---- Mock Environments ---- - def set_pos(self, p): - self._pos = list(p) +class MockEnv(gym.Env): + """Configurable mock gymnasium.Env.""" + metadata = {'render_modes': []} - def reset(self, seed=None, **kwargs): - return np.zeros((120, 160, 3), dtype=np.uint8), {} + def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True): + super().__init__() + self.action_space = gym.spaces.Discrete(5) + self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8) + self._speed = speed + self._cte = cte + self._pos = list(pos) + self._done = done + self._use_5tuple = use_5tuple - def step(self, action): - obs = np.zeros((120, 160, 3), dtype=np.uint8) - info = {'speed': self._speed, 'pos': self._pos} - return obs, self._reward, self._done, False, info + def set_pos(self, p): self._pos = list(p) + def set_cte(self, c): self._cte = c - def close(self): - pass + def reset(self, seed=None, **kwargs): + return np.zeros((120, 160, 3), dtype=np.uint8), {} - return PosEnv() + def step(self, action): + obs = np.zeros((120, 160, 3), dtype=np.uint8) + # Sim reward uses forward_vel (exploitable) β€” wrapper should IGNORE this + sim_reward = 999.0 # Deliberately bogus β€” wrapper must not use this + info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos} + if self._use_5tuple: + return obs, sim_reward, self._done, False, info + return obs, sim_reward, self._done, info + + def close(self): pass -# ---- Core Anti-Hacking Tests (inherited from v2) ---- +def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0): + env.set_pos(pos) + env.set_cte(cte) + env._speed = speed + return wrapped_env.step(0) -def test_no_speed_bonus_when_off_track(): - """Off-track reward (≀ 0) must NOT get a speed bonus regardless of efficiency.""" - env = make_env_with_pos(speed=10.0, original_reward=-1.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.5) + +# ---- Core v4 Properties ---- + +def test_sim_reward_is_completely_ignored(): + """ + The wrapper must NOT use the sim's reward (999.0). + v4 computes reward from scratch using CTE/pos/speed only. 
+ """ + env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.)) + wrapped = SpeedRewardWrapper(env, speed_scale=0.1) wrapped.reset() _, reward, _, _, _ = wrapped.step(0) - assert reward == -1.0, f"Off-track reward must not get bonus, got {reward}" + assert reward != 999.0, "Wrapper must not pass through sim's bogus reward" + assert reward < 10.0, f"Reward should be small, got {reward}" -def test_no_speed_bonus_when_reward_zero(): - """Reward exactly 0 should not get speed bonus.""" - env = make_env_with_pos(speed=5.0, original_reward=0.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.5) - wrapped.reset() - _, reward, _, _, _ = wrapped.step(0) - assert reward == 0.0, f"Zero reward should stay zero, got {reward}" - - -# ---- Path Efficiency Tests ---- - -def _simulate_straight_driving(wrapped_env, env, steps=40, speed=3.0, step_size=0.1): - """Simulate straight-line driving: car moves forward by step_size each step.""" - wrapped_env.reset() - rewards = [] - for i in range(steps): - env.set_pos([i * step_size, 0.0, 0.0]) - env._speed = speed - _, r, _, _, _ = wrapped_env.step(0) - rewards.append(r) - return rewards - - -def _simulate_circular_driving(wrapped_env, env, steps=40, speed=3.0, radius=0.5): - """Simulate circular driving: car moves in a circle, returns to start.""" - wrapped_env.reset() - rewards = [] - for i in range(steps): - angle = 2 * math.pi * i / steps - x = radius * math.cos(angle) - z = radius * math.sin(angle) - env.set_pos([x, 0.0, z]) - env._speed = speed - _, r, _, _, _ = wrapped_env.step(0) - rewards.append(r) - return rewards - - -def test_straight_driving_gets_higher_reward_than_circular(): +def test_circling_at_zero_cte_gives_near_zero_reward(): """ - CRITICAL: Straight driving must produce more total reward than circular driving - at the same speed and base reward. This is the core anti-circular guarantee. + CORE v4 GUARANTEE: A spinning car at CTE=0 must earn near-zero reward. + v3 failed this: spinning at CTE=0 gave 1.0/step regardless of efficiency. + v4 multiplies base reward by efficiency β†’ circling yields β‰ˆ 0. 
""" - env_straight = make_env_with_pos(speed=3.0, original_reward=0.8) - env_circular = make_env_with_pos(speed=3.0, original_reward=0.8) - - wrapped_straight = SpeedRewardWrapper(env_straight, speed_scale=0.1, window_size=20) - wrapped_circular = SpeedRewardWrapper(env_circular, speed_scale=0.1, window_size=20) - - straight_rewards = _simulate_straight_driving(wrapped_straight, env_straight, steps=40) - circular_rewards = _simulate_circular_driving(wrapped_circular, env_circular, steps=40) - - # After warmup (window fills), straight should consistently beat circular - straight_tail = sum(straight_rewards[20:]) - circular_tail = sum(circular_rewards[20:]) - - assert straight_tail > circular_tail, ( - f"Straight driving ({straight_tail:.2f}) should beat circular ({circular_tail:.2f})" - ) - - -def test_efficiency_near_one_for_straight_driving(): - """Path efficiency should be near 1.0 for straight-line motion.""" - env = make_env_with_pos(speed=3.0, original_reward=1.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10) - wrapped.reset() - - # Drive in a straight line - for i in range(15): - env.set_pos([i * 0.2, 0.0, 0.0]) - wrapped.step(0) - - efficiency = wrapped._compute_efficiency() - assert efficiency > 0.90, f"Straight driving efficiency should be >0.90, got {efficiency:.4f}" - - -def test_efficiency_near_zero_for_circular_driving(): - """Path efficiency should be near 0.0 for full circular motion.""" - env = make_env_with_pos(speed=3.0, original_reward=1.0) + env = MockEnv(speed=3.0, cte=0.0) wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20) wrapped.reset() - # Drive a full circle (returns to start position) - radius = 1.0 - steps = 25 # More than window_size to fill it - for i in range(steps): - angle = 2 * math.pi * i / 24 # 24 steps = full circle - x = radius * math.cos(angle) - z = radius * math.sin(angle) - env.set_pos([x, 0.0, z]) - wrapped.step(0) + # Simulate full circles (returns to start position) + radius = 0.5 + rewards = [] + for i in range(30): + angle = 2 * math.pi * (i % 20) / 20 + env.set_pos([radius * math.cos(angle), 0., radius * math.sin(angle)]) + _, r, _, _, _ = wrapped.step(0) + rewards.append(r) - efficiency = wrapped._compute_efficiency() - assert efficiency < 0.2, f"Circular driving efficiency should be <0.2, got {efficiency:.4f}" + # After window fills, rewards should be near zero (circling detected) + late_rewards = rewards[20:] + avg = sum(late_rewards) / len(late_rewards) + assert avg < 0.15, f"Circling at CTE=0 should earn near-zero reward, got avg={avg:.4f}" -def test_efficiency_one_with_no_pos_history(): - """When position not available, efficiency should default to 1.0 (no penalty).""" - class NoPosEnv(gym.Env): - metadata = {'render_modes': []} - def __init__(self): - super().__init__() - self.action_space = gym.spaces.Discrete(5) - self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8) - def reset(self, seed=None, **kwargs): - return np.zeros((120, 160, 3), dtype=np.uint8), {} - def step(self, action): - return np.zeros((120, 160, 3), dtype=np.uint8), 0.8, False, False, {'speed': 2.0} # No pos - def close(self): - pass +def test_forward_driving_earns_positive_reward(): + """Straight-line driving at low CTE earns a clear positive reward.""" + env = MockEnv(speed=2.0, cte=0.5) + wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10) + wrapped.reset() - wrapped = SpeedRewardWrapper(NoPosEnv(), speed_scale=0.1) + rewards = [] + for i in range(20): + env.set_pos([i * 
+        _, r, _, _, _ = wrapped.step(0)
+        rewards.append(r)
+
+    late = rewards[10:]
+    avg = sum(late) / len(late)
+    assert avg > 0.5, f"Forward driving should earn >0.5 reward, got {avg:.4f}"
+
+
+def test_forward_beats_circling_by_large_margin():
+    """
+    Total reward over same number of steps:
+    forward driving >> circling, even at CTE=0 for the circular car.
+    """
+    env_fwd = MockEnv(speed=2.0, cte=0.5)
+    env_circ = MockEnv(speed=2.0, cte=0.0)  # CTE=0 is best case for circling
+
+    wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=20)
+    wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=20)
+    wrapped_fwd.reset()
+    wrapped_circ.reset()
+
+    total_fwd, total_circ = 0.0, 0.0
+    radius = 0.5
+    for i in range(40):
+        # Forward: moves in straight line
+        env_fwd.set_pos([i * 0.3, 0., 0.])
+        _, r, _, _, _ = wrapped_fwd.step(0)
+        total_fwd += r
+
+        # Circular: perfect circles at CTE=0
+        angle = 2 * math.pi * (i % 20) / 20
+        env_circ.set_pos([radius * math.cos(angle), 0., radius * math.sin(angle)])
+        _, r, _, _, _ = wrapped_circ.step(0)
+        total_circ += r
+
+    assert total_fwd > total_circ * 3, (
+        f"Forward ({total_fwd:.1f}) should beat circling ({total_circ:.1f}) by 3x"
+    )
+
+
+def test_crash_gives_negative_reward():
+    """Episode termination (done=True) must always give -1.0."""
+    env = MockEnv(speed=5.0, cte=0.0, done=True)
+    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
     wrapped.reset()
     _, reward, _, _, _ = wrapped.step(0)
-    # Without pos, efficiency=1.0, so reward = 0.8 * (1 + 0.1*2*1.0) = 0.96
-    assert reward > 0.8, f"Without pos, should get speed bonus (efficiency=1.0), got {reward}"
+    assert reward == -1.0, f"Crash reward must be -1.0, got {reward}"
 
 
-def test_efficiency_resets_on_episode_reset():
-    """Position history should clear on reset, so each episode starts fresh."""
-    env = make_env_with_pos(speed=3.0, original_reward=1.0)
+def test_high_cte_reduces_reward():
+    """Higher CTE should reduce reward (closer to track edge = lower base)."""
+    env_low = MockEnv(speed=2.0, cte=0.5)
+    env_high = MockEnv(speed=2.0, cte=4.0)
+
+    wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5)
+    wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5)
+    wrapped_low.reset()
+    wrapped_high.reset()
+
+    # Drive straight so efficiency fills up
+    for i in range(10):
+        env_low.set_pos([i * 0.3, 0., 0.])
+        env_high.set_pos([i * 0.3, 0., 0.])
+        _, r_low, _, _, _ = wrapped_low.step(0)
+        _, r_high, _, _, _ = wrapped_high.step(0)
+
+    assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})"
+
+
+def test_speed_bonus_increases_reward_when_on_track():
+    """Faster forward driving earns more reward than slower forward driving."""
+    env_slow = MockEnv(speed=0.5, cte=1.0)
+    env_fast = MockEnv(speed=3.0, cte=1.0)
+
+    wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10)
+    wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10)
+    wrapped_slow.reset()
+    wrapped_fast.reset()
+
+    for i in range(15):
+        env_slow.set_pos([i * 0.1, 0., 0.])
+        env_fast.set_pos([i * 0.3, 0., 0.])  # Fast car covers more ground
+        _, r_slow, _, _, _ = wrapped_slow.step(0)
+        _, r_fast, _, _, _ = wrapped_fast.step(0)
+
+    assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})"
+
+
+def test_theoretical_max_per_step():
+    """Max reward/step = 1.0 Γ— 1.0 Γ— (1 + scale Γ— max_speed) = 2.0 at scale=0.1, max=10."""
+    env = MockEnv()
+    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
+    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)
+
+
+def test_4tuple_step_compatibility():
+    """Wrapper must handle 4-tuple step() return (old gym API)."""
+    env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False)
+    env.set_pos([0., 0., 0.])
+    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
+    wrapped.reset()
+    result = wrapped.step(0)
+    assert len(result) == 4, f"Expected 4-tuple, got {len(result)}"
+    _, reward, done, info = result
+    assert isinstance(reward, float)
+    assert reward != 999.0, "Should not use sim reward"
+
+
+def test_reward_resets_on_episode_reset():
+    """After reset, position history clears so efficiency recalculates cleanly."""
+    env = MockEnv(speed=2.0, cte=0.5)
     wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10)
     wrapped.reset()
 
     # Fill with circular data
-    radius = 0.5
     for i in range(15):
         angle = 2 * math.pi * i / 12
-        env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)])
+        env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)])
         wrapped.step(0)
 
-    eff_before_reset = wrapped._compute_efficiency()
-
-    # Reset and drive straight for a few steps
+    # After reset, start fresh straight
    wrapped.reset()
-    for i in range(3):
-        env.set_pos([i * 0.3, 0.0, 0.0])
-        wrapped.step(0)
-
-    eff_after_reset = wrapped._compute_efficiency()
-    assert eff_after_reset > eff_before_reset, \
-        f"After reset, efficiency should improve: before={eff_before_reset:.3f}, after={eff_after_reset:.3f}"
-
-
-def test_speed_bonus_disappears_when_circling():
-    """After circling for window_size steps, speed bonus should be nearly zero."""
-    env = make_env_with_pos(speed=5.0, original_reward=1.0)
-    wrapped = SpeedRewardWrapper(env, speed_scale=0.5, window_size=20, min_efficiency=0.05)
-    wrapped.reset()
-
-    # Warm up with circular motion
-    radius = 0.5
     rewards = []
-    for i in range(30):
-        angle = 2 * math.pi * (i % 20) / 20  # Full circle every 20 steps
-        env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)])
+    for i in range(5):
+        env.set_pos([i * 0.3, 0., 0.])
         _, r, _, _, _ = wrapped.step(0)
         rewards.append(r)
 
-    # Later rewards (after window fills) should be close to original_reward
-    later_rewards = rewards[20:]
-    avg_later = sum(later_rewards) / len(later_rewards)
-    assert avg_later < 1.3, \
-        f"Circular driving speed bonus should be suppressed, avg reward={avg_later:.3f} (original=1.0)"
-
-
-# ---- Inherited guarantees ----
-
-def test_crash_still_penalized():
-    """Crash (original_reward=-1) should remain -1 regardless of speed or efficiency."""
-    env = make_env_with_pos(speed=8.0, original_reward=-1.0, done=True)
-    wrapped = SpeedRewardWrapper(env, speed_scale=0.2)
-    wrapped.reset()
-    _, reward, _, _, _ = wrapped.step(0)
-    assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}"
-
-
-def test_theoretical_max_per_step():
-    """Max reward/step bounded: original(1.0) Γ— (1 + speed_scale Γ— max_speed)."""
-    env = make_env_with_pos()
-    wrapped = SpeedRewardWrapper(env, speed_scale=0.1)
-    assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6)
+    # Should get reasonable reward after fresh start
+    assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"
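All of these tests lean on one geometric fact: over a sliding window, straight motion has net displacement equal to its path length, while a closed loop's net displacement collapses toward zero. A standalone check of that property using the same formula as `_compute_efficiency` (the positions are illustrative, not from any real run):

```python
import math
import numpy as np

def efficiency(positions):
    """net_displacement / total_path_length, as in SpeedRewardWrapper."""
    net = np.linalg.norm(positions[-1] - positions[0])
    total = sum(np.linalg.norm(positions[i + 1] - positions[i])
                for i in range(len(positions) - 1))
    return float(net / total) if total > 1e-6 else 1.0

straight = [np.array([0.3 * i, 0.0, 0.0]) for i in range(21)]
lap = [np.array([0.5 * math.cos(2 * math.pi * i / 20), 0.0,
                 0.5 * math.sin(2 * math.pi * i / 20)]) for i in range(21)]

print(efficiency(straight))  # ~1.0: every step is net forward progress
print(efficiency(lap))       # ~0.0: a full lap ends where it began
```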