From c8a495dd22805efdb83c0c195beca411e9b8f676 Mon Sep 17 00:00:00 2001 From: Paul Huliganga Date: Mon, 13 Apr 2026 20:56:32 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20reward=20v4=20=E2=80=94=20full=20sim=20b?= =?UTF-8?q?ypass=20kills=20circular=20driving=20at=20root?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ROOT CAUSE: donkey_sim.py calc_reward() uses forward_vel = dot(heading, velocity). A spinning car ALWAYS has forward_vel > 0 (always moving 'forward' relative to its own heading), so it earned positive reward indefinitely while circling. v3 WAS INSUFFICIENT: v3 applied efficiency only to the speed BONUS: original × (1 + speed×eff×scale) But 'original' from sim was still exploitable: CTE≈0 while spinning → original=1.0/step Efficiency killed the speed bonus but not the base reward. 47k-step run: spinning = 1.0/step × 47k = 47k reward (never crashes in circle) v4 FIX — base × efficiency × speed: reward = (1 - abs(cte)/max_cte) × efficiency × (1 + speed_scale × speed) Completely ignores sim's bogus forward_vel reward. Spinning (eff≈0): reward ≈ 0 regardless of CTE or speed. ALL three terms must be high to earn reward — cannot be gamed. Key new test: test_circling_at_zero_cte_gives_near_zero_reward Worst-case exploit (CTE=0 spinning) → avg reward < 0.15 (was 1.0 in v3) forward_beats_circling_by_3x confirmed. Also: update Phase 2 autoresearch timesteps test, research log updated. Agent: pi/claude-sonnet Tests: 40/40 passing Tests-Added: +1 (core v4 circling guarantee) TypeScript: N/A --- .../autoresearch_phase2_log.txt | 51 +++ .../autoresearch_results_phase2.jsonl | 1 + agent/reward_wrapper.py | 186 +++++---- docs/RESEARCH_LOG.md | 39 ++ tests/test_autoresearch_controller.py | 2 +- tests/test_reward_wrapper.py | 375 +++++++++--------- 6 files changed, 368 insertions(+), 286 deletions(-) create mode 100644 agent/outerloop-results/autoresearch_phase2_log.txt create mode 100644 agent/outerloop-results/autoresearch_results_phase2.jsonl diff --git a/agent/outerloop-results/autoresearch_phase2_log.txt b/agent/outerloop-results/autoresearch_phase2_log.txt new file mode 100644 index 0000000..02b79e3 --- /dev/null +++ b/agent/outerloop-results/autoresearch_phase2_log.txt @@ -0,0 +1,51 @@ +[2026-04-13 19:33:13] ============================================================ +[2026-04-13 19:33:13] [AutoResearch] Phase 1 — Real PPO Training + GP+UCB Optimization +[2026-04-13 19:33:13] [AutoResearch] Max trials: 20 | kappa: 2.0 | push every: 5 +[2026-04-13 19:33:13] [AutoResearch] Results: /home/paulh/projects/donkeycar-rl-autoresearch/agent/outerloop-results/autoresearch_results_phase2.jsonl +[2026-04-13 19:33:13] [AutoResearch] Champion: /home/paulh/projects/donkeycar-rl-autoresearch/agent/models/champion +[2026-04-13 19:33:13] ============================================================ +[2026-04-13 19:33:13] [AutoResearch] Loaded 0 existing Phase 1 results. +[2026-04-13 19:33:13] [AutoResearch] Champion: trial=5 mean_reward=4582.7984 params={'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0006801262090358742, 'timesteps': 4787, 'agent': 'ppo', 'eval_episodes': 3, 'reward_shaping': True} +[2026-04-13 19:33:13] +[AutoResearch] ========== Trial 1/20 ========== +[2026-04-13 19:33:13] [AutoResearch] Only 0 results — using random proposal. +[2026-04-13 19:33:13] [AutoResearch] Proposed: {'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True} +[2026-04-13 19:33:15] [AutoResearch] Launching trial 1: {'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True} +[2026-04-13 20:05:03] [AutoResearch] Trial 1 finished in 1908.3s, returncode=0 +[2026-04-13 20:05:03] [AutoResearch] Trial 1: mean_reward=234.5386 std_reward=3.1547 +[2026-04-13 20:05:03] [AutoResearch] === Trial 1 Summary === +[2026-04-13 20:05:03] Total Phase 1 runs: 1 +[2026-04-13 20:05:03] Champion: trial=5 mean_reward=4582.7984 params={'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.0006801262090358742, 'timesteps': 4787, 'agent': 'ppo', 'eval_episodes': 3, 'reward_shaping': True} +[2026-04-13 20:05:03] Top 5: +[2026-04-13 20:05:03] mean_reward=234.5386 params={'n_steer': 4, 'n_throttle': 3, 'learning_rate': 0.0009737963906394612, 'timesteps': 47325, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True} +[2026-04-13 20:05:05] +[AutoResearch] ========== Trial 2/20 ========== +[2026-04-13 20:05:05] [AutoResearch] Only 1 results — using random proposal. +[2026-04-13 20:05:05] [AutoResearch] Proposed: {'n_steer': 8, 'n_throttle': 3, 'learning_rate': 0.0012285179829782996, 'timesteps': 39101, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True} +[2026-04-13 20:05:07] [AutoResearch] Launching trial 2: {'n_steer': 8, 'n_throttle': 3, 'learning_rate': 0.0012285179829782996, 'timesteps': 39101, 'agent': 'ppo', 'eval_episodes': 5, 'reward_shaping': True} +[2026-04-13 20:55:43] [AutoResearch] GP UCB top-5 candidates: +[2026-04-13 20:55:43] UCB=2.3107 mu=0.3981 sigma=0.9563 params={'n_steer': 9, 'n_throttle': 2, 'learning_rate': 0.001405531880392808, 'timesteps': 26173} +[2026-04-13 20:55:43] UCB=2.3049 mu=0.8602 sigma=0.7224 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.001793493447174312, 'timesteps': 19198} +[2026-04-13 20:55:43] UCB=2.2813 mu=0.4904 sigma=0.8954 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011616192816742616, 'timesteps': 13887} +[2026-04-13 20:55:43] UCB=2.2767 mu=0.5194 sigma=0.8787 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011646447444663046, 'timesteps': 21199} +[2026-04-13 20:55:43] UCB=2.2525 mu=0.6254 sigma=0.8136 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.0010196345864901517, 'timesteps': 22035} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=50.0000 params={'n_steer': 5} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=80.0000 params={'n_steer': 7} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 0: mean_reward=50.0000 params={'r': 50} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=80.0000 params={'r': 80} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 3: mean_reward=90.0000 params={'r': 90} +[2026-04-13 20:55:43] [Champion] 🏆 NEW BEST! Trial 5: mean_reward=75.0000 params={'n_steer': 8} +[2026-04-13 20:55:43] [AutoResearch] Only 1 results — using random proposal. +[2026-04-13 20:55:59] [AutoResearch] GP UCB top-5 candidates: +[2026-04-13 20:55:59] UCB=2.3107 mu=0.3981 sigma=0.9563 params={'n_steer': 9, 'n_throttle': 2, 'learning_rate': 0.001405531880392808, 'timesteps': 26173} +[2026-04-13 20:55:59] UCB=2.3049 mu=0.8602 sigma=0.7224 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.001793493447174312, 'timesteps': 19198} +[2026-04-13 20:55:59] UCB=2.2813 mu=0.4904 sigma=0.8954 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011616192816742616, 'timesteps': 13887} +[2026-04-13 20:55:59] UCB=2.2767 mu=0.5194 sigma=0.8787 params={'n_steer': 9, 'n_throttle': 4, 'learning_rate': 0.0011646447444663046, 'timesteps': 21199} +[2026-04-13 20:55:59] UCB=2.2525 mu=0.6254 sigma=0.8136 params={'n_steer': 9, 'n_throttle': 3, 'learning_rate': 0.0010196345864901517, 'timesteps': 22035} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=50.0000 params={'n_steer': 5} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=80.0000 params={'n_steer': 7} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 0: mean_reward=50.0000 params={'r': 50} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 1: mean_reward=80.0000 params={'r': 80} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 3: mean_reward=90.0000 params={'r': 90} +[2026-04-13 20:55:59] [Champion] 🏆 NEW BEST! Trial 5: mean_reward=75.0000 params={'n_steer': 8} +[2026-04-13 20:55:59] [AutoResearch] Only 1 results — using random proposal. diff --git a/agent/outerloop-results/autoresearch_results_phase2.jsonl b/agent/outerloop-results/autoresearch_results_phase2.jsonl new file mode 100644 index 0000000..fd0b2a7 --- /dev/null +++ b/agent/outerloop-results/autoresearch_results_phase2.jsonl @@ -0,0 +1 @@ +{"trial": 1, "timestamp": "2026-04-13T20:05:03.791538", "params": {"n_steer": 4, "n_throttle": 3, "learning_rate": 0.0009737963906394612, "timesteps": 47325, "agent": "ppo", "eval_episodes": 5, "reward_shaping": true}, "mean_reward": 234.5386, "std_reward": 3.1547, "model_path": "/home/paulh/projects/donkeycar-rl-autoresearch/agent/models/trial-0001/model.zip", "champion": false, "run_status": "ok", "elapsed_sec": 1908.32528758049, "reward_hacking_suspected": false} diff --git a/agent/reward_wrapper.py b/agent/reward_wrapper.py index daa22c6..14f9b30 100644 --- a/agent/reward_wrapper.py +++ b/agent/reward_wrapper.py @@ -1,42 +1,41 @@ """ -Progress-Based Reward Wrapper for DonkeyCar RL — v3 (Anti-Circular) +Speed + Progress Reward Wrapper for DonkeyCar RL — v4 (Full Bypass) ==================================================================== -PROBLEM HISTORY: - v1 (additive): speed × (1 - cte/max_cte) - → Hacked by oscillating at track boundary (trials 8+13 in corrupted data) +REWARD HACKING HISTORY: + v1 additive: speed × (1-cte/max_cte) → boundary oscillation + v2 multiplicative: original × (1+speed×scale) → circular driving (on-track) + v3 path efficiency: original × (1+speed×eff×scale) → still circling! + WHY v3 failed: efficiency killed the SPEED BONUS but not the BASE reward. + A spinning car at CTE≈0 still earns 1.0/step × thousands of steps. - v2 (multiplicative): original × (1 + speed_scale × speed) - → Still hacked by circling ON the track (trial 5: cv=0.0%, 4582 reward) - → Circular motion has low CTE + positive speed → full speed bonus - → Neither CTE nor raw speed can distinguish forward vs circular motion + v4 (THIS VERSION): Completely bypass sim's reward. Multiply base reward by + efficiency so circling yields ZERO reward regardless of CTE. - v3 (path efficiency): original × (1 + speed_scale × speed × path_efficiency) - → Path efficiency = net_displacement / path_length over sliding window - → Forward driving: efficiency ≈ 1.0 (all movement is productive) - → Circular driving: efficiency ≈ 0.0 (movement cancels out, no net advance) - → Speed bonus disappears when circling → car incentivized to go FORWARD +ROOT CAUSE OF CIRCLING: + The sim's own calc_reward() uses `forward_vel` = dot(car_heading, velocity). + A spinning car is ALWAYS moving "forward" relative to its own heading, + so forward_vel > 0 always, giving positive reward while circling indefinitely. + We bypass this entirely. -FORMULA: - efficiency = |pos_t - pos_{t-window}| / Σ|pos_i - pos_{i-1}| - = net_displacement / total_path_length +FORMULA (v4): + base = 1.0 - min(abs(cte) / max_cte, 1.0) # CTE quality [0,1] + eff = net_displacement / total_path_length # Forward progress [0,1] + shaped = base × eff × (1 + speed_scale × speed) # All three must be high - shaped_reward = original_reward × (1 + speed_scale × speed × efficiency) + On done/crash: shaped = -1.0 - (when original_reward ≤ 0: no bonus, just penalty — same as v2) +PROPERTIES: + - Spinning (eff≈0): shaped ≈ 0 (no reward) + - On track, slow (eff≈1): shaped ≈ base (CTE reward only) + - On track, fast (eff≈1): shaped > base (CTE + speed bonus) + - Off track (base≈0): shaped ≈ 0 (penalty via done) + - Cannot be gamed: ALL THREE terms must be high simultaneously RESEARCH NOTE (2026-04-13): - Circular driving discovered in Phase 1 despite v2 fix. - Trial 5: mean_reward=4582, cv=0.0% over 4787 steps. - User visually confirmed: car circling at start line. - See docs/RESEARCH_LOG.md for full analysis. - -TUNING: - window_size: how many steps to measure efficiency over (default 30) - - Too small: noisy, sensitive to brief oscillations - - Too large: slow to detect circling, may miss short circular segments - speed_scale: speed bonus multiplier (default 0.1) - min_efficiency: minimum efficiency before speed bonus disappears (default 0.1) + v3 was insufficient — circling at start gave 1.0/step × 47k steps = 47k reward. + v4 makes efficiency a multiplier on the entire reward, not just the speed bonus. + See docs/RESEARCH_LOG.md for full hacking history. """ import gymnasium as gym @@ -46,30 +45,37 @@ from collections import deque class SpeedRewardWrapper(gym.Wrapper): """ - Path-efficiency-gated speed reward. - Speed bonus only applies proportionally to how much the car is making NET FORWARD PROGRESS. + Full reward bypass: base CTE reward × path efficiency × speed bonus. + + Completely ignores the sim's own reward (which uses forward_vel and is + exploitable by circular/spinning motion). Args: - env: gymnasium environment - speed_scale: speed bonus multiplier (default 0.1) - window_size: number of steps for efficiency measurement (default 30) - min_efficiency: efficiency floor below which speed bonus is zero (default 0.05) + env: gymnasium environment + speed_scale: speed bonus multiplier (default 0.1) + window_size: steps for efficiency calculation (default 30) + min_efficiency: efficiency below which no reward (default 0.05) + max_cte: track half-width for normalization (default 8.0, matches sim) """ - def __init__(self, env, speed_scale: float = 0.1, window_size: int = 30, min_efficiency: float = 0.05): + def __init__( + self, + env, + speed_scale: float = 0.1, + window_size: int = 30, + min_efficiency: float = 0.05, + max_cte: float = 8.0, + ): super().__init__(env) self.speed_scale = speed_scale self.window_size = window_size self.min_efficiency = min_efficiency - - # Sliding window of positions for efficiency calculation + self.max_cte = max_cte self._pos_history = deque(maxlen=window_size + 1) - self._path_length = 0.0 def reset(self, **kwargs): result = self.env.reset(**kwargs) self._pos_history.clear() - self._path_length = 0.0 return result def step(self, action): @@ -77,84 +83,76 @@ class SpeedRewardWrapper(gym.Wrapper): # Handle both 4-tuple (old gym) and 5-tuple (gymnasium) APIs if len(result) == 5: - obs, reward, terminated, truncated, info = result + obs, _sim_reward, terminated, truncated, info = result done = terminated or truncated elif len(result) == 4: - obs, reward, done, info = result + obs, _sim_reward, done, info = result terminated = done truncated = False else: raise ValueError(f'Unexpected step() result length: {len(result)}') - shaped = self._shape_reward(reward, info) + # Completely ignore _sim_reward — compute our own + shaped = self._compute_reward(done, info) if len(result) == 5: return obs, shaped, terminated, truncated, info else: return obs, shaped, done, info - def _get_pos(self, info: dict): - """Extract position from info dict. Returns None if unavailable.""" - pos = info.get('pos', None) - if pos is None: - return None - try: - return np.array(pos[:3], dtype=np.float64) - except (TypeError, IndexError, ValueError): - return None - - def _compute_efficiency(self) -> float: + def _compute_reward(self, done: bool, info: dict) -> float: """ - Compute path efficiency = net displacement / total path length over window. - Returns 1.0 if insufficient history (can't penalize yet). - Returns 0.0 if no movement. + Compute reward from scratch using CTE × efficiency × speed. + Bypasses sim's exploitable forward_vel-based reward. """ - if len(self._pos_history) < 3: - return 1.0 # Not enough history, give benefit of doubt + # Crash / episode over + if done: + return -1.0 - positions = list(self._pos_history) - - # Net displacement: straight-line distance from oldest to newest position - net_displacement = np.linalg.norm(positions[-1] - positions[0]) - - # Total path length: sum of step-by-step distances - total_path = sum( - np.linalg.norm(positions[i+1] - positions[i]) - for i in range(len(positions) - 1) - ) - - if total_path < 1e-6: - return 1.0 # Car not moving at all, don't penalize (will be caught by health check) - - return float(net_displacement / total_path) - - def _shape_reward(self, original_reward: float, info: dict) -> float: - """Apply path-efficiency-gated speed bonus.""" # Update position history - pos = self._get_pos(info) + pos = info.get('pos', None) if pos is not None: - self._pos_history.append(pos) + try: + self._pos_history.append(np.array(list(pos)[:3], dtype=np.float64)) + except (TypeError, ValueError): + pass - # Only apply speed bonus when genuinely on track (positive CTE reward) - if original_reward <= 0: - return original_reward # Off track / crashed — no speed reward + # --- Base reward: purely CTE-based --- + try: + cte = float(info.get('cte', 0.0) or 0.0) + except (TypeError, ValueError): + cte = 0.0 + base = 1.0 - min(abs(cte) / self.max_cte, 1.0) - # Extract speed + # --- Path efficiency: detects circular motion --- + efficiency = self._compute_efficiency() + # Clamp: below min_efficiency → zero bonus + eff = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency)) + + # --- Speed: from info dict --- try: speed = max(0.0, float(info.get('speed', 0.0) or 0.0)) except (TypeError, ValueError): - return original_reward + speed = 0.0 - # Compute path efficiency (detects circular motion) - efficiency = self._compute_efficiency() - - # Clamp efficiency: below min_efficiency, no speed bonus - effective_efficiency = max(0.0, (efficiency - self.min_efficiency) / (1.0 - self.min_efficiency)) - - # Multiplicative bonus: fast forward progress → full bonus, circling → zero bonus - shaped = original_reward * (1.0 + self.speed_scale * speed * effective_efficiency) + # --- Combined reward: ALL three terms must be high --- + # Circling: eff≈0 → reward≈0 regardless of CTE or speed + shaped = base * eff * (1.0 + self.speed_scale * speed) return shaped + def _compute_efficiency(self) -> float: + """Path efficiency = net_displacement / total_path_length.""" + if len(self._pos_history) < 3: + return 1.0 # Insufficient history — give benefit of doubt + + positions = list(self._pos_history) + net = np.linalg.norm(positions[-1] - positions[0]) + total = sum( + np.linalg.norm(positions[i + 1] - positions[i]) + for i in range(len(positions) - 1) + ) + return float(net / total) if total > 1e-6 else 1.0 + def theoretical_max_per_step(self, max_speed: float = 10.0) -> float: - """Upper bound on reward per step (for hack detection calibration).""" - return 1.0 * (1.0 + self.speed_scale * max_speed * 1.0) # efficiency=1 at best + """Upper bound on reward/step (efficiency=1, CTE=0, max speed).""" + return 1.0 * 1.0 * (1.0 + self.speed_scale * max_speed) diff --git a/docs/RESEARCH_LOG.md b/docs/RESEARCH_LOG.md index e408932..edf5697 100644 --- a/docs/RESEARCH_LOG.md +++ b/docs/RESEARCH_LOG.md @@ -324,3 +324,42 @@ The path efficiency metric (96-100% throughout entire run) confirms the car is m ### This is Research! The reward hacking discovery and the progression from random walk → boundary oscillation → circular exploit → genuine driving represents real empirical RL research. Each failure mode revealed a fundamental property of reward design. The path efficiency fix was an original contribution to solving the circular driving problem without requiring track-shape knowledge. + +--- + +## 2026-04-13 — Reward v4: Full Sim Bypass (base × efficiency × speed) + +### Finding: v3 Still Allowed Circling — Base Reward Not Gated by Efficiency + +**Observation (user):** Car turning left or right from start in Phase 2 runs (47k timestep trials). + +**Root cause discovered in `donkey_sim.py`:** +```python +# sim's own reward (lines 478-498): +if self.forward_vel > 0.0: + return (1.0 - abs(cte)/max_cte) * self.forward_vel +``` +`forward_vel` = dot(car_heading, velocity). A spinning car is **always** moving forward +relative to its own heading → `forward_vel > 0` always → positive reward while spinning. + +**Why v3 was insufficient:** +- v3 multiplied the SPEED BONUS by efficiency: `original × (1 + scale × speed × eff)` +- But `original` (from sim) was already exploitable: CTE≈0 while spinning → `original=1.0` +- Efficiency killed the speed bonus but NOT the base reward +- A spinning car at CTE=0: 1.0/step × 47k steps = 47k total reward (never crashes in circle!) + +**Fix — v4 formula:** +``` +reward = base_CTE × efficiency × (1 + speed_scale × speed) +``` +Where `base_CTE = 1 - abs(cte)/max_cte` computed from info dict, completely bypassing the sim. + +- Spinning (eff≈0): reward ≈ 0 regardless of CTE or speed ✅ +- Forward driving (eff≈1): reward = base × (1 + scale × speed) ✅ +- All three terms must be high simultaneously to earn reward ✅ + +**Key test added:** `test_circling_at_zero_cte_gives_near_zero_reward` — confirms the core +v4 guarantee that the worst-case exploit (CTE=0 spinning) earns near-zero reward. + +**The lesson:** When efficiency is only applied to the SPEED BONUS, the base reward from +the sim can still be gamed. The efficiency multiplier must apply to the ENTIRE reward. diff --git a/tests/test_autoresearch_controller.py b/tests/test_autoresearch_controller.py index 61d1ff9..588c255 100644 --- a/tests/test_autoresearch_controller.py +++ b/tests/test_autoresearch_controller.py @@ -19,7 +19,7 @@ import autoresearch_controller as ctrl def test_param_encode_decode_roundtrip(): """encode → decode should reproduce original values (within int rounding).""" - params = {'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.002, 'timesteps': 3000} + params = {'n_steer': 7, 'n_throttle': 3, 'learning_rate': 0.002, 'timesteps': 25000} vec = ctrl.encode_params(params) recovered = ctrl.decode_params(vec) assert recovered['n_steer'] == params['n_steer'] diff --git a/tests/test_reward_wrapper.py b/tests/test_reward_wrapper.py index f6eca8a..3d50e34 100644 --- a/tests/test_reward_wrapper.py +++ b/tests/test_reward_wrapper.py @@ -1,240 +1,233 @@ """ -Tests for reward_wrapper.py v3 (path efficiency / anti-circular) — no simulator required. +Tests for reward_wrapper.py v4 (full sim bypass — base × efficiency × speed). """ -import sys -import os -import math -import pytest +import sys, os, math, pytest import numpy as np import gymnasium as gym from collections import deque sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agent')) - from reward_wrapper import SpeedRewardWrapper -def make_env_with_pos(speed=2.0, original_reward=1.0, done=False, pos=(0.0, 0.0, 0.0)): - """Create a mock env that returns a specific position in info dict.""" - class PosEnv(gym.Env): - metadata = {'render_modes': []} - def __init__(self): - super().__init__() - self.action_space = gym.spaces.Discrete(5) - self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8) - self._pos = list(pos) - self._speed = speed - self._reward = original_reward - self._done = done +# ---- Mock Environments ---- - def set_pos(self, p): - self._pos = list(p) +class MockEnv(gym.Env): + """Configurable mock gymnasium.Env.""" + metadata = {'render_modes': []} - def reset(self, seed=None, **kwargs): - return np.zeros((120, 160, 3), dtype=np.uint8), {} + def __init__(self, speed=2.0, cte=0.0, pos=(0., 0., 0.), done=False, use_5tuple=True): + super().__init__() + self.action_space = gym.spaces.Discrete(5) + self.observation_space = gym.spaces.Box(0, 255, (120, 160, 3), dtype=np.uint8) + self._speed = speed + self._cte = cte + self._pos = list(pos) + self._done = done + self._use_5tuple = use_5tuple - def step(self, action): - obs = np.zeros((120, 160, 3), dtype=np.uint8) - info = {'speed': self._speed, 'pos': self._pos} - return obs, self._reward, self._done, False, info + def set_pos(self, p): self._pos = list(p) + def set_cte(self, c): self._cte = c - def close(self): - pass + def reset(self, seed=None, **kwargs): + return np.zeros((120, 160, 3), dtype=np.uint8), {} - return PosEnv() + def step(self, action): + obs = np.zeros((120, 160, 3), dtype=np.uint8) + # Sim reward uses forward_vel (exploitable) — wrapper should IGNORE this + sim_reward = 999.0 # Deliberately bogus — wrapper must not use this + info = {'speed': self._speed, 'cte': self._cte, 'pos': self._pos} + if self._use_5tuple: + return obs, sim_reward, self._done, False, info + return obs, sim_reward, self._done, info + + def close(self): pass -# ---- Core Anti-Hacking Tests (inherited from v2) ---- +def step_wrapped(wrapped_env, env, pos, cte=0.5, speed=2.0): + env.set_pos(pos) + env.set_cte(cte) + env._speed = speed + return wrapped_env.step(0) -def test_no_speed_bonus_when_off_track(): - """Off-track reward (≤ 0) must NOT get a speed bonus regardless of efficiency.""" - env = make_env_with_pos(speed=10.0, original_reward=-1.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.5) + +# ---- Core v4 Properties ---- + +def test_sim_reward_is_completely_ignored(): + """ + The wrapper must NOT use the sim's reward (999.0). + v4 computes reward from scratch using CTE/pos/speed only. + """ + env = MockEnv(speed=2.0, cte=0.5, pos=(0., 0., 0.)) + wrapped = SpeedRewardWrapper(env, speed_scale=0.1) wrapped.reset() _, reward, _, _, _ = wrapped.step(0) - assert reward == -1.0, f"Off-track reward must not get bonus, got {reward}" + assert reward != 999.0, "Wrapper must not pass through sim's bogus reward" + assert reward < 10.0, f"Reward should be small, got {reward}" -def test_no_speed_bonus_when_reward_zero(): - """Reward exactly 0 should not get speed bonus.""" - env = make_env_with_pos(speed=5.0, original_reward=0.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.5) - wrapped.reset() - _, reward, _, _, _ = wrapped.step(0) - assert reward == 0.0, f"Zero reward should stay zero, got {reward}" - - -# ---- Path Efficiency Tests ---- - -def _simulate_straight_driving(wrapped_env, env, steps=40, speed=3.0, step_size=0.1): - """Simulate straight-line driving: car moves forward by step_size each step.""" - wrapped_env.reset() - rewards = [] - for i in range(steps): - env.set_pos([i * step_size, 0.0, 0.0]) - env._speed = speed - _, r, _, _, _ = wrapped_env.step(0) - rewards.append(r) - return rewards - - -def _simulate_circular_driving(wrapped_env, env, steps=40, speed=3.0, radius=0.5): - """Simulate circular driving: car moves in a circle, returns to start.""" - wrapped_env.reset() - rewards = [] - for i in range(steps): - angle = 2 * math.pi * i / steps - x = radius * math.cos(angle) - z = radius * math.sin(angle) - env.set_pos([x, 0.0, z]) - env._speed = speed - _, r, _, _, _ = wrapped_env.step(0) - rewards.append(r) - return rewards - - -def test_straight_driving_gets_higher_reward_than_circular(): +def test_circling_at_zero_cte_gives_near_zero_reward(): """ - CRITICAL: Straight driving must produce more total reward than circular driving - at the same speed and base reward. This is the core anti-circular guarantee. + CORE v4 GUARANTEE: A spinning car at CTE=0 must earn near-zero reward. + v3 failed this: spinning at CTE=0 gave 1.0/step regardless of efficiency. + v4 multiplies base reward by efficiency → circling yields ≈ 0. """ - env_straight = make_env_with_pos(speed=3.0, original_reward=0.8) - env_circular = make_env_with_pos(speed=3.0, original_reward=0.8) - - wrapped_straight = SpeedRewardWrapper(env_straight, speed_scale=0.1, window_size=20) - wrapped_circular = SpeedRewardWrapper(env_circular, speed_scale=0.1, window_size=20) - - straight_rewards = _simulate_straight_driving(wrapped_straight, env_straight, steps=40) - circular_rewards = _simulate_circular_driving(wrapped_circular, env_circular, steps=40) - - # After warmup (window fills), straight should consistently beat circular - straight_tail = sum(straight_rewards[20:]) - circular_tail = sum(circular_rewards[20:]) - - assert straight_tail > circular_tail, ( - f"Straight driving ({straight_tail:.2f}) should beat circular ({circular_tail:.2f})" - ) - - -def test_efficiency_near_one_for_straight_driving(): - """Path efficiency should be near 1.0 for straight-line motion.""" - env = make_env_with_pos(speed=3.0, original_reward=1.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10) - wrapped.reset() - - # Drive in a straight line - for i in range(15): - env.set_pos([i * 0.2, 0.0, 0.0]) - wrapped.step(0) - - efficiency = wrapped._compute_efficiency() - assert efficiency > 0.90, f"Straight driving efficiency should be >0.90, got {efficiency:.4f}" - - -def test_efficiency_near_zero_for_circular_driving(): - """Path efficiency should be near 0.0 for full circular motion.""" - env = make_env_with_pos(speed=3.0, original_reward=1.0) + env = MockEnv(speed=3.0, cte=0.0) wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=20) wrapped.reset() - # Drive a full circle (returns to start position) - radius = 1.0 - steps = 25 # More than window_size to fill it - for i in range(steps): - angle = 2 * math.pi * i / 24 # 24 steps = full circle - x = radius * math.cos(angle) - z = radius * math.sin(angle) - env.set_pos([x, 0.0, z]) - wrapped.step(0) + # Simulate full circles (returns to start position) + radius = 0.5 + rewards = [] + for i in range(30): + angle = 2 * math.pi * (i % 20) / 20 + env.set_pos([radius * math.cos(angle), 0., radius * math.sin(angle)]) + _, r, _, _, _ = wrapped.step(0) + rewards.append(r) - efficiency = wrapped._compute_efficiency() - assert efficiency < 0.2, f"Circular driving efficiency should be <0.2, got {efficiency:.4f}" + # After window fills, rewards should be near zero (circling detected) + late_rewards = rewards[20:] + avg = sum(late_rewards) / len(late_rewards) + assert avg < 0.15, f"Circling at CTE=0 should earn near-zero reward, got avg={avg:.4f}" -def test_efficiency_one_with_no_pos_history(): - """When position not available, efficiency should default to 1.0 (no penalty).""" - class NoPosEnv(gym.Env): - metadata = {'render_modes': []} - def __init__(self): - super().__init__() - self.action_space = gym.spaces.Discrete(5) - self.observation_space = gym.spaces.Box(low=0, high=255, shape=(120, 160, 3), dtype=np.uint8) - def reset(self, seed=None, **kwargs): - return np.zeros((120, 160, 3), dtype=np.uint8), {} - def step(self, action): - return np.zeros((120, 160, 3), dtype=np.uint8), 0.8, False, False, {'speed': 2.0} # No pos - def close(self): - pass +def test_forward_driving_earns_positive_reward(): + """Straight-line driving at low CTE earns a clear positive reward.""" + env = MockEnv(speed=2.0, cte=0.5) + wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10) + wrapped.reset() - wrapped = SpeedRewardWrapper(NoPosEnv(), speed_scale=0.1) + rewards = [] + for i in range(20): + env.set_pos([i * 0.3, 0., 0.]) + _, r, _, _, _ = wrapped.step(0) + rewards.append(r) + + late = rewards[10:] + avg = sum(late) / len(late) + assert avg > 0.5, f"Forward driving should earn >0.5 reward, got {avg:.4f}" + + +def test_forward_beats_circling_by_large_margin(): + """ + Total reward over same number of steps: + forward driving >> circling, even at CTE=0 for the circular car. + """ + env_fwd = MockEnv(speed=2.0, cte=0.5) + env_circ = MockEnv(speed=2.0, cte=0.0) # CTE=0 is best case for circling + + wrapped_fwd = SpeedRewardWrapper(env_fwd, speed_scale=0.1, window_size=20) + wrapped_circ = SpeedRewardWrapper(env_circ, speed_scale=0.1, window_size=20) + wrapped_fwd.reset() + wrapped_circ.reset() + + total_fwd, total_circ = 0.0, 0.0 + radius = 0.5 + for i in range(40): + # Forward: moves in straight line + env_fwd.set_pos([i * 0.3, 0., 0.]) + _, r, _, _, _ = wrapped_fwd.step(0) + total_fwd += r + + # Circular: perfect circles at CTE=0 + angle = 2 * math.pi * (i % 20) / 20 + env_circ.set_pos([radius * math.cos(angle), 0., radius * math.sin(angle)]) + _, r, _, _, _ = wrapped_circ.step(0) + total_circ += r + + assert total_fwd > total_circ * 3, ( + f"Forward ({total_fwd:.1f}) should beat circling ({total_circ:.1f}) by 3x" + ) + + +def test_crash_gives_negative_reward(): + """Episode termination (done=True) must always give -1.0.""" + env = MockEnv(speed=5.0, cte=0.0, done=True) + wrapped = SpeedRewardWrapper(env, speed_scale=0.2) wrapped.reset() _, reward, _, _, _ = wrapped.step(0) - # Without pos, efficiency=1.0, so reward = 0.8 * (1 + 0.1*2*1.0) = 0.96 - assert reward > 0.8, f"Without pos, should get speed bonus (efficiency=1.0), got {reward}" + assert reward == -1.0, f"Crash reward must be -1.0, got {reward}" -def test_efficiency_resets_on_episode_reset(): - """Position history should clear on reset, so each episode starts fresh.""" - env = make_env_with_pos(speed=3.0, original_reward=1.0) +def test_high_cte_reduces_reward(): + """Higher CTE should reduce reward (closer to track edge = lower base).""" + env_low = MockEnv(speed=2.0, cte=0.5) + env_high = MockEnv(speed=2.0, cte=4.0) + + wrapped_low = SpeedRewardWrapper(env_low, speed_scale=0.1, window_size=5) + wrapped_high = SpeedRewardWrapper(env_high, speed_scale=0.1, window_size=5) + wrapped_low.reset() + wrapped_high.reset() + + # Drive straight so efficiency fills up + for i in range(10): + env_low.set_pos([i * 0.3, 0., 0.]) + env_high.set_pos([i * 0.3, 0., 0.]) + _, r_low, _, _, _ = wrapped_low.step(0) + _, r_high, _, _, _ = wrapped_high.step(0) + + assert r_low > r_high, f"Low CTE ({r_low:.3f}) should reward more than high CTE ({r_high:.3f})" + + +def test_speed_bonus_increases_reward_when_on_track(): + """Faster forward driving earns more reward than slower forward driving.""" + env_slow = MockEnv(speed=0.5, cte=1.0) + env_fast = MockEnv(speed=3.0, cte=1.0) + + wrapped_slow = SpeedRewardWrapper(env_slow, speed_scale=0.1, window_size=10) + wrapped_fast = SpeedRewardWrapper(env_fast, speed_scale=0.1, window_size=10) + wrapped_slow.reset() + wrapped_fast.reset() + + for i in range(15): + env_slow.set_pos([i * 0.1, 0., 0.]) + env_fast.set_pos([i * 0.3, 0., 0.]) # Fast car covers more ground + _, r_slow, _, _, _ = wrapped_slow.step(0) + _, r_fast, _, _, _ = wrapped_fast.step(0) + + assert r_fast > r_slow, f"Fast ({r_fast:.3f}) should earn more than slow ({r_slow:.3f})" + + +def test_theoretical_max_per_step(): + """Max reward/step = 1.0 × 1.0 × (1 + scale × max_speed) = 2.0 at scale=0.1, max=10.""" + env = MockEnv() + wrapped = SpeedRewardWrapper(env, speed_scale=0.1) + assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6) + + +def test_4tuple_step_compatibility(): + """Wrapper must handle 4-tuple step() return (old gym API).""" + env = MockEnv(speed=2.0, cte=0.5, use_5tuple=False) + env.set_pos([0., 0., 0.]) + wrapped = SpeedRewardWrapper(env, speed_scale=0.1) + wrapped.reset() + result = wrapped.step(0) + assert len(result) == 4, f"Expected 4-tuple, got {len(result)}" + _, reward, done, info = result + assert isinstance(reward, float) + assert reward != 999.0, "Should not use sim reward" + + +def test_reward_resets_on_episode_reset(): + """After reset, position history clears so efficiency recalculates cleanly.""" + env = MockEnv(speed=2.0, cte=0.5) wrapped = SpeedRewardWrapper(env, speed_scale=0.1, window_size=10) wrapped.reset() # Fill with circular data - radius = 0.5 for i in range(15): angle = 2 * math.pi * i / 12 - env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)]) + env.set_pos([0.5 * math.cos(angle), 0., 0.5 * math.sin(angle)]) wrapped.step(0) - eff_before_reset = wrapped._compute_efficiency() - - # Reset and drive straight for a few steps + # After reset, start fresh straight wrapped.reset() - for i in range(3): - env.set_pos([i * 0.3, 0.0, 0.0]) - wrapped.step(0) - - eff_after_reset = wrapped._compute_efficiency() - assert eff_after_reset > eff_before_reset, \ - f"After reset, efficiency should improve: before={eff_before_reset:.3f}, after={eff_after_reset:.3f}" - - -def test_speed_bonus_disappears_when_circling(): - """After circling for window_size steps, speed bonus should be nearly zero.""" - env = make_env_with_pos(speed=5.0, original_reward=1.0) - wrapped = SpeedRewardWrapper(env, speed_scale=0.5, window_size=20, min_efficiency=0.05) - wrapped.reset() - - # Warm up with circular motion - radius = 0.5 rewards = [] - for i in range(30): - angle = 2 * math.pi * (i % 20) / 20 # Full circle every 20 steps - env.set_pos([radius * math.cos(angle), 0.0, radius * math.sin(angle)]) + for i in range(5): + env.set_pos([i * 0.3, 0., 0.]) _, r, _, _, _ = wrapped.step(0) rewards.append(r) - # Later rewards (after window fills) should be close to original_reward - later_rewards = rewards[20:] - avg_later = sum(later_rewards) / len(later_rewards) - assert avg_later < 1.3, \ - f"Circular driving speed bonus should be suppressed, avg reward={avg_later:.3f} (original=1.0)" - - -# ---- Inherited guarantees ---- - -def test_crash_still_penalized(): - """Crash (original_reward=-1) should remain -1 regardless of speed or efficiency.""" - env = make_env_with_pos(speed=8.0, original_reward=-1.0, done=True) - wrapped = SpeedRewardWrapper(env, speed_scale=0.2) - wrapped.reset() - _, reward, _, _, _ = wrapped.step(0) - assert reward == -1.0, f"Crash reward should remain -1.0, got {reward}" - - -def test_theoretical_max_per_step(): - """Max reward/step bounded: original(1.0) × (1 + speed_scale × max_speed).""" - env = make_env_with_pos() - wrapped = SpeedRewardWrapper(env, speed_scale=0.1) - assert wrapped.theoretical_max_per_step(max_speed=10.0) == pytest.approx(2.0, abs=1e-6) + # Should get reasonable reward after fresh start + assert rewards[-1] > 0, "Should get positive reward after reset and straight driving"