donkeycar-rl-autoresearch/agent/choose_and_run_track.py

68 lines
2.2 KiB
Python

import gymnasium as gym
import gym_donkeycar.envs
import gym_donkeycar.envs.donkey_env
import sys
import os
import time
# Simulator endpoint. Both values can be overridden through the
# DONKEY_SIM_HOST / DONKEY_SIM_PORT environment variables.
host = os.getenv('DONKEY_SIM_HOST', '10.0.0.55')
port = int(os.getenv('DONKEY_SIM_PORT', '9091'))
print(f"Connecting to DonkeyCar sim at {host}:{port}")
from gym_donkeycar.envs.donkey_env import DonkeyEnv
# Query the Unity simulator for available tracks
class TrackCatcher(DonkeyEnv):
    """DonkeyEnv subclass that records the scene (track) names seen on the handler.

    The environment is always created on the 'generated_road' level. Whenever
    ``self.viewer.handler.scene_names`` is present and non-empty it is copied
    to ``self.captured_tracks``.

    NOTE(review): ``scene_names`` is presumably filled in by the simulator
    handler when the sim reports its scenes; that code is not visible here,
    so every access is guarded.

    Attributes:
        captured_tracks: the scene names taken from the handler, or None if
            none were seen.
    """

    def __init__(self, conf=None, render_mode=None):
        """Create the environment on the 'generated_road' level.

        Args:
            conf: configuration passed through unchanged to ``DonkeyEnv``.
            render_mode: render mode passed through unchanged to ``DonkeyEnv``.
        """
        # Assigned before the base constructor runs so the attribute exists
        # for any code that executes during or after super().__init__.
        self.captured_tracks = None
        super().__init__('generated_road', conf, render_mode)

    def _capture_tracks(self):
        """Copy the handler's scene names, if any; return True when captured."""
        names = getattr(self.viewer.handler, 'scene_names', None)
        if not names:
            return False
        self.captured_tracks = names
        return True

    def viewer_wait_until_loaded(self):
        """Wait for the scene to load, grabbing the track list along the way.

        Polls once per second for at most 15 seconds while the handler is not
        yet loaded, stopping early as soon as scene names are available, then
        defers to the viewer's own ``wait_until_loaded()``.
        """
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        deadline = time.monotonic() + 15
        while not self.viewer.handler.loaded and time.monotonic() < deadline:
            # grab tracks if present
            if self._capture_tracks():
                break
            time.sleep(1)
        self.viewer.wait_until_loaded()
        # The loop above is skipped entirely when the handler is already
        # loaded on entry, so capture once more to avoid leaving
        # captured_tracks as None while scene names are available.
        self._capture_tracks()

    def reset(self, **kwargs):
        """Wait for the scene (capturing tracks), then perform the base reset.

        Args:
            **kwargs: forwarded unchanged to ``DonkeyEnv.reset``.

        Returns:
            Whatever ``DonkeyEnv.reset`` returns.
        """
        self.viewer_wait_until_loaded()
        return super().reset(**kwargs)
def get_tracks(conf=None):
    """Connect to the simulator briefly and return its list of track names.

    Args:
        conf: optional configuration dict for the environment. When omitted,
            the module-level ``host`` and ``port`` are used so the connection
            goes to the endpoint announced at start-up.
            NOTE(review): the 'host' / 'port' keys follow gym-donkeycar's
            documented conf dict; confirm against the installed version.

    Returns:
        The handler's ``scene_names`` if present and non-empty, otherwise the
        names captured by ``TrackCatcher`` (which may be None).
    """
    if conf is None:
        conf = {"host": host, "port": port}
    tc = TrackCatcher(conf=conf)
    try:
        tc.reset()
        # Prefer the handler's live list; fall back to what was captured
        # while waiting for the scene to load.
        tracks = getattr(tc.viewer.handler, 'scene_names', None) or tc.captured_tracks
    finally:
        # Always release the simulator connection, even if reset() failed.
        tc.close()
    return tracks
# Ask the simulator for its tracks and let the user pick one by menu number.
# Any unusable answer falls back to the first track; if no list could be
# retrieved at all, 'generated_road' is used.
tracks = get_tracks()
if tracks:
    print("Available tracks:")
    for i, t in enumerate(tracks):
        print(f"[{i}] {t}")
    try:
        # input() is inside the try so a closed stdin (EOFError) takes the
        # same default-track fallback as a bad number.
        choice = input("Enter the number of the track to use: ")
        idx = int(choice.strip())
        if idx < 0:
            # A negative number would silently index from the end of the
            # list, which does not correspond to any displayed menu entry.
            raise IndexError(f"track index {idx} out of range")
        track = tracks[idx]
    except (ValueError, IndexError, EOFError) as e:
        print(f"Invalid selection ({e}), using default track: {tracks[0]}")
        track = tracks[0]
    else:
        print(f"Loading track: {track}")
else:
    print("Could not retrieve track list, defaulting to 'generated_road'")
    track = 'generated_road'
# Now run a sim episode on the chosen track
# Pass the configured endpoint explicitly so the environment connects to the
# host/port announced at start-up.
# NOTE(review): the 'host' / 'port' conf keys follow gym-donkeycar's
# documented conf dict; confirm against the installed version.
env = DonkeyEnv(level=track, conf={"host": host, "port": port})
try:
    obs, info = env.reset()
    # Take 10 random actions, restarting the episode whenever it ends.
    for t in range(10):
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        print(f"Step {t}: reward {reward}, done {terminated or truncated}")
        if terminated or truncated:
            obs, info = env.reset()
finally:
    # Close the environment even if reset()/step() raises, so the simulator
    # connection is not leaked.
    env.close()