68 lines
2.2 KiB
Python
68 lines
2.2 KiB
Python
import gymnasium as gym
|
|
import gym_donkeycar.envs
|
|
import gym_donkeycar.envs.donkey_env
|
|
import sys
|
|
import os
|
|
import time
|
|
|
|
host = os.environ.get('DONKEY_SIM_HOST', '10.0.0.55')
|
|
port = int(os.environ.get('DONKEY_SIM_PORT', '9091'))
|
|
print(f"Connecting to DonkeyCar sim at {host}:{port}")
|
|
|
|
from gym_donkeycar.envs.donkey_env import DonkeyEnv
|
|
|
|
# Query the Unity simulator for available tracks
|
|
class TrackCatcher(DonkeyEnv):
|
|
def __init__(self, conf=None, render_mode=None):
|
|
self.captured_tracks = None
|
|
super().__init__('generated_road', conf, render_mode)
|
|
def viewer_wait_until_loaded(self):
|
|
# patched wait to allow grabbing tracks
|
|
import time
|
|
t0 = time.time()
|
|
while not self.viewer.handler.loaded and (time.time() - t0 < 15):
|
|
# grab tracks if present
|
|
if hasattr(self.viewer.handler, 'scene_names') and self.viewer.handler.scene_names:
|
|
self.captured_tracks = self.viewer.handler.scene_names
|
|
break
|
|
time.sleep(1)
|
|
self.viewer.wait_until_loaded()
|
|
def reset(self, **kwargs):
|
|
self.viewer_wait_until_loaded()
|
|
return super().reset(**kwargs)
|
|
|
|
def get_tracks():
|
|
tc = TrackCatcher()
|
|
tc.reset()
|
|
tracks = tc.viewer.handler.scene_names if hasattr(tc.viewer.handler, 'scene_names') else None
|
|
tc.close()
|
|
return tracks
|
|
|
|
tracks = get_tracks()
|
|
if tracks:
|
|
print("Available tracks:")
|
|
for i, t in enumerate(tracks):
|
|
print(f"[{i}] {t}")
|
|
choice = input("Enter the number of the track to use: ")
|
|
try:
|
|
idx = int(choice.strip())
|
|
track = tracks[idx]
|
|
print(f"Loading track: {track}")
|
|
except Exception as e:
|
|
print(f"Invalid selection ({e}), using default track: {tracks[0]}")
|
|
track = tracks[0]
|
|
else:
|
|
print("Could not retrieve track list, defaulting to 'generated_road'")
|
|
track = 'generated_road'
|
|
|
|
# Now run a sim episode on the chosen track
|
|
env = DonkeyEnv(level=track)
|
|
obs, info = env.reset()
|
|
for t in range(10):
|
|
action = env.action_space.sample()
|
|
obs, reward, terminated, truncated, info = env.step(action)
|
|
print(f"Step {t}: reward {reward}, done {terminated or truncated}")
|
|
if terminated or truncated:
|
|
obs, info = env.reset()
|
|
env.close()
|