"""
Track Switcher — sends exit_scene to the DonkeyCar sim to return it to the
main menu, then reconnects with the desired environment/scene.

The gym wrapper only loads a scene when the sim is at the main menu
(scene_selection_ready state). If a scene is already running, it ignores
the env ID and stays on the current scene. This utility fixes that.

Usage:
    from track_switcher import switch_track

    env = switch_track('donkey-generated-track-v0')
    # Sim is now running the generated track, ready for evaluation
"""

import time
import json
import socket

import gymnasium as gym
# Not referenced by name in this module; presumably imported so that the
# donkey-* env IDs are registered before gym.make() is called — confirm
# before removing.
import gym_donkeycar

SIM_HOST = 'localhost'
SIM_PORT = 9091
EXIT_SCENE_WAIT = 4.0   # seconds to wait after exit_scene for sim to reach menu
CONNECT_WAIT = 1.0      # seconds to wait for initial connection


def send_exit_scene_raw(host: str = SIM_HOST, port: int = SIM_PORT) -> bool:
    """
    Send the exit_scene message directly via a raw TCP socket, without going
    through the full gym env setup. This avoids the wait_until_loaded()
    timeout.

    Args:
        host: simulator host name (defaults to SIM_HOST)
        port: simulator TCP port (defaults to SIM_PORT)

    Returns:
        bool: True if the message was sent, False if the connection or the
        send failed (the error is printed, never raised).
    """
    msg = json.dumps({'msg_type': 'exit_scene'}) + '\n'
    try:
        # The context manager closes the socket on every path, including a
        # failed connect() or sendall().
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5.0)
            s.connect((host, port))
            time.sleep(CONNECT_WAIT)
            s.sendall(msg.encode('utf-8'))
            # Short pause before closing so the message is not cut off by
            # an immediate disconnect.
            time.sleep(0.5)
    except OSError as e:
        print(f'[TrackSwitch] Could not send exit_scene: {e}', flush=True)
        return False
    print('[TrackSwitch] Sent exit_scene to sim.', flush=True)
    return True


def switch_track(target_env_id: str,
                 current_env_id: str = 'donkey-generated-roads-v0',
                 verbose: bool = True):
    """
    Switch the DonkeyCar simulator to a different scene/track.

    1. Connect to the current scene
    2. Send exit_scene via the proper websocket → sim returns to main menu
    3. Wait for sim to show scene selection screen
    4. Connect with target_env_id → sim loads the correct scene

    Args:
        target_env_id: Gymnasium env ID to switch TO,
            e.g. 'donkey-generated-track-v0'
        current_env_id: Gymnasium env ID currently running
            (used to connect and send exit)
        verbose: print status messages

    Returns:
        gymnasium.Env: the new environment connected to the target scene

    Raises:
        Exception: whatever gym.make(target_env_id) raises; the error is
        printed (regardless of ``verbose``) and then re-raised.
    """
    if verbose:
        print(f'\n[TrackSwitch] Switching from {current_env_id} → {target_env_id}',
              flush=True)
        print('[TrackSwitch] Step 1: Connecting to current scene...', flush=True)

    # Best effort: any failure here is treated as "sim may already be at the
    # menu" and does not abort the switch.
    try:
        temp_env = gym.make(current_env_id)
        viewer = temp_env.unwrapped.viewer  # bypass OrderEnforcing wrapper
        try:
            time.sleep(1.0)
            if verbose:
                print('[TrackSwitch] Step 2: Sending exit_scene via websocket...',
                      flush=True)
            viewer.exit_scene()
            time.sleep(0.5)
        finally:
            # Always drop the temporary connection, even if exit_scene()
            # failed, so it is not left open.
            viewer.quit()
        if verbose:
            print('[TrackSwitch] exit_scene sent. Disconnected.', flush=True)
    except Exception as e:
        if verbose:
            print(f'[TrackSwitch] Warning during exit: {e} — sim may already be at menu.',
                  flush=True)

    if verbose:
        print(f'[TrackSwitch] Step 3: Waiting {EXIT_SCENE_WAIT}s for sim to reach main menu...',
              flush=True)
    time.sleep(EXIT_SCENE_WAIT)

    if verbose:
        print(f'[TrackSwitch] Step 4: Connecting to {target_env_id}...', flush=True)
    try:
        env = gym.make(target_env_id)
    except Exception as e:
        print(f'[TrackSwitch] ERROR connecting to {target_env_id}: {e}', flush=True)
        raise
    if verbose:
        print(f'[TrackSwitch] ✅ Connected to {target_env_id}!', flush=True)
    return env


def current_scene_exit_and_reconnect(current_env_id: str,
                                     target_env_id: str,
                                     verbose: bool = True):
    """
    Alternative approach: connect to current scene, call exit_scene(),
    then reconnect. Use this if the raw socket approach doesn't work.

    Note the argument order (current first, target second) is the reverse
    of switch_track().

    Args:
        current_env_id: Gymnasium env ID currently running
            (used to connect and send exit)
        target_env_id: Gymnasium env ID to switch TO
        verbose: print status messages

    Returns:
        gymnasium.Env: the environment returned by gym.make(target_env_id)

    Raises:
        Exception: whatever gym.make(target_env_id) raises (not caught here).
    """
    if verbose:
        print(f'[TrackSwitch] Connecting to current scene ({current_env_id}) to send exit...',
              flush=True)

    # Best effort: a failure while leaving the current scene is reported but
    # does not stop the reconnect attempt below.
    try:
        temp_env = gym.make(current_env_id)
        try:
            time.sleep(1.0)
            # Reach the viewer through .unwrapped, as switch_track() does,
            # rather than relying on the wrappers forwarding the attribute.
            temp_env.unwrapped.viewer.exit_scene()
            if verbose:
                print('[TrackSwitch] exit_scene sent via gym env.', flush=True)
            time.sleep(1.0)
        finally:
            # Close the temporary env on every path so it is not leaked when
            # exit_scene() fails.
            temp_env.close()
    except Exception as e:
        print(f'[TrackSwitch] Warning during temp connect: {e}', flush=True)

    if verbose:
        print(f'[TrackSwitch] Waiting {EXIT_SCENE_WAIT}s for sim to reach main menu...',
              flush=True)
    time.sleep(EXIT_SCENE_WAIT)

    if verbose:
        print(f'[TrackSwitch] Connecting to target: {target_env_id}', flush=True)
    return gym.make(target_env_id)


# Available tracks: short track name → Gymnasium env ID
AVAILABLE_TRACKS = {
    'generated_road': 'donkey-generated-roads-v0',
    'generated_track': 'donkey-generated-track-v0',
    'mountain': 'donkey-mountain-track-v0',
    'warehouse': 'donkey-warehouse-v0',
    'waveshare': 'donkey-waveshare-v0',
    'mini_monaco': 'donkey-minimonaco-track-v0',
}