LeRobot is a cool project for robotics reinforcement and imitation learning that I’ve been playing around with for a few months now. LeRobot comes with several built-in simulation environments and policies, but since its stated goal is to be used as a library, I wanted to try my hand at integrating my own environments and policies. So this blog post will be the first step: integrating a custom environment.
Instead of writing all your own training code to use a custom environment, with LeRobot you can take advantage of its existing training and evaluation scripts, dataset tooling, and policy implementations.
The main steps to integrating a new environment are:

1. Wrapping or subclassing the environment (eventually HoverAviary) so it matches the interface LeRobot expects.
2. Registering it with gymnasium.
3. Writing a configuration class for it.

Pendulum-v1
First, let’s try to use LeRobot with a simple yet famous environment from gymnasium: Pendulum-v1, which is an RL staple (it’s about the simplest continuous-control task there is, so it’s quick to run and easy to debug).
LeRobot is built to use both state observations and image observations directly – for now, we will
keep it simple and use only state observations from the environment.
The built-in gym Pendulum-v1 differs from the format LeRobot expects in a few ways:

1. LeRobot expects observations to be returned as a dict, with the state observations under the key agent_pos in the dict. For Pendulum-v1, we are only working with the state of the agent (there is no distinction between the agent and the environment in this case). So, we need to do the following:
   a. We have to update the observation method to return a dict with the observations under the key agent_pos.
   b. We have to update the observation_space attribute of the environment to match the return type of the observation (now a dict).
2. The info dict returned by the step() method must contain the key is_success on each step, with a boolean value indicating whether the episode is a success or not. For the case of the pendulum, we won't be using this parameter for training, so we can arbitrarily make it always False for now.

gymnasium already has some nice wrapper classes (gym.Wrapper, gym.ObservationWrapper, etc.) that we can subclass to wrap the observation and step methods to provide dict outputs. Create a new Python file somewhere:
from typing import Any

import gymnasium as gym
import numpy as np
from gymnasium.spaces import Dict


class LeRobotWrapper(gym.Wrapper):
    def __init__(self, env: gym.Env) -> None:
        super().__init__(env)
        # LeRobot expects a dict observation space, with the state under "agent_pos"
        self.observation_space = Dict({"agent_pos": self.env.observation_space})

    def observation(self, obs: np.ndarray) -> dict[str, np.ndarray]:
        return {"agent_pos": obs}

    def step(self, action: np.ndarray) -> tuple[dict, float, bool, bool, dict]:
        obs, reward, terminated, truncated, info = self.env.step(action)
        info["is_success"] = False  # Always False for now
        return self.observation(obs), float(reward), terminated, truncated, info

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[dict, dict]:
        obs, info = self.env.reset(seed=seed, options=options)
        return self.observation(obs), info
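As a quick sanity check (a minimal sketch, assuming the wrapper above is in scope), we can wrap the built-in pendulum and confirm the observation now comes back as a dict:

import gymnasium as gym

env = LeRobotWrapper(gym.make("Pendulum-v1"))
obs, info = env.reset(seed=0)
print(env.observation_space)   # Dict with a single "agent_pos" Box
print(obs["agent_pos"].shape)  # (3,): cos(theta), sin(theta), angular velocity
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
print(info["is_success"])      # False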
gym
Now that we have our new (modified) environment, we have to register it with gymnasium
. This
allows us to instantiate the environment by calling gym.make('name_of_the_environment')
, which
is what LeRobot calls under the hood to spawn an environment.
We can register by adding the following lines below our new environment:
from gymnasium.envs.registration import register

register(
    id="gym_pendulum_v1/Pendulum-v1",
    entry_point=lambda **kwargs: LeRobotWrapper(gym.make("Pendulum-v1", **kwargs)),
)
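Once this file has been imported and register() has run, the wrapped environment can be spawned by id just like any built-in one (a quick sketch):

env = gym.make("gym_pendulum_v1/Pendulum-v1", max_episode_steps=200)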
LeRobot uses Python dataclasses alongside the draccus library to simplify configuration management. Using dataclasses allows for better typing and checking in Python, and draccus allows us to easily use the command line to pass arguments to these dataclasses.
Let’s look at the base configuration class for simulation environments in LeRobot:
@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
    task: str | None = None
    fps: int = 30
    features: dict[str, PolicyFeature] = field(default_factory=dict)
    features_map: dict[str, str] = field(default_factory=dict)

    @property
    def type(self) -> str:
        return self.get_choice_name(self.__class__)

    @abc.abstractproperty
    def gym_kwargs(self) -> dict:
        raise NotImplementedError()
This class outlines several key parameters that all simulation environments must have:

- task: The name of the task.
- fps: The frame rate controlling the simulation and rendering.
- features and features_map: Dictionaries that define policy features and their mapping to environment observations/actions.
- gym_kwargs: A property that aggregates configuration parameters into a dictionary that is passed directly to gym.make, so it should contain max_episode_steps plus any kwargs for the environment constructor.

The type property is special, and we will see how it is used in a second. The first step to porting a new environment, then, is to subclass EnvConfig and define these parameters. For our case, we end up with:
# Import paths follow the LeRobot layout at the time of writing and may differ in your version.
from dataclasses import dataclass, field

from lerobot.common.constants import ACTION, OBS_ROBOT
from lerobot.common.envs.configs import EnvConfig
from lerobot.configs.types import FeatureType, PolicyFeature


@EnvConfig.register_subclass("pendulum_v1")
@dataclass
class PendulumV1Env(EnvConfig):
    task: str = "Pendulum-v1"
    fps: int = 30
    episode_length: int = 200
    obs_type: str = "state"
    render_mode: str = "rgb_array"
    features: dict[str, PolicyFeature] = field(
        default_factory=lambda: {
            "action": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
            "state": PolicyFeature(type=FeatureType.STATE, shape=(3,)),
        }
    )
    features_map: dict[str, str] = field(
        default_factory=lambda: {
            "action": ACTION,
            "state": OBS_ROBOT,
        }
    )

    @property
    def gym_kwargs(self) -> dict:
        return {
            "render_mode": self.render_mode,
            "max_episode_steps": self.episode_length,
        }
Now we have all our ingredients. At the moment, LeRobot doesn't support online learning, only imitation learning, which means we need a dataset in order to train a policy and test our environment. We will create a dataset with random actions and then try training on it; the resulting policy will be bad, but this will tell us whether our environment works.
# Do dataset loop here
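Here is a minimal sketch of that random-action loop, assuming the wrapper and register() call above live in a module named gym_pendulum_v1 (the module name is just for illustration). Writing the collected transitions into LeRobot's dataset format is not shown here:

import gymnasium as gym

import gym_pendulum_v1  # noqa: F401  # importing this module runs the register() call

env = gym.make("gym_pendulum_v1/Pendulum-v1", max_episode_steps=200)

for episode in range(50):
    obs, info = env.reset(seed=episode)
    done = False
    while not done:
        action = env.action_space.sample()  # random actions, no policy yet
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        # here we would append obs["agent_pos"], action, reward, ... to the dataset
env.close()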
gym-pybullet-drones
Next, we'll integrate a sim I knew well from my MSc thesis: HoverAviary from the gym_pybullet_drones package (the environment was written by the lab where I did my thesis).
describe UV setup here and installing lerobot
As with the pendulum, the first step is to subclass EnvConfig and define the task, fps, features, and gym_kwargs for the new environment. For the drones environment, we end up with:
# Same imports as the pendulum config, plus the drone action enum:
from gym_pybullet_drones.utils.enums import ActionType


@EnvConfig.register_subclass("pybullet_drones")
@dataclass
class PybulletDronesEnv(EnvConfig):
    task: str = "hover"
    fps: int = 30
    episode_length: int = 200
    render_mode: str = "rgb_array"
    act_type: ActionType = ActionType.ONE_D_RPM
    features: dict[str, PolicyFeature] = field(
        default_factory=lambda: {
            "action": PolicyFeature(type=FeatureType.ACTION, shape=()),
            "state": PolicyFeature(type=FeatureType.STATE, shape=()),
        }
    )
    features_map: dict[str, str] = field(
        default_factory=lambda: {
            "action": ACTION,
            "state": OBS_ROBOT,
        }
    )

    @property
    def gym_kwargs(self) -> dict:
        return {
            "ctrl_freq": self.fps,
            "max_episode_steps": self.episode_length,
            "render_mode": self.render_mode,
            "act": self.act_type,
        }

    def __post_init__(self) -> None:
        # Adjust action shape based on act_type
        if self.act_type == ActionType.ONE_D_RPM:
            action_shape = (1,)
        elif self.act_type == ActionType.RPM:
            action_shape = (4,)
        else:
            raise ValueError(f"Unsupported act_type: {self.act_type}")  # noqa: TRY003
        self.features["action"].shape = action_shape
        # There is a 0.5 s action buffer in the observation and the state itself is 12-dimensional
        self.features["state"].shape = (self.fps // 2 * action_shape[0] + 12,)
The first key thing to note is the line:
@EnvConfig.register_subclass("pybullet_drones")
Here we see why we use draccus in the first place. This allows draccus to find
the right EnvConfig
subclass based on cmd-line args:
# assuming LeRobot's CLI parser helper; the import path may differ across versions
from lerobot.configs import parser


@parser.wrap()
def create_cfg(cfg: EnvConfig):
    print(type(cfg))
    print(cfg.gym_kwargs)


if __name__ == "__main__":
    create_cfg()
# outputs:
# ::: something
The rest of the PyBullet drones config subclass adapts the base configuration for the drones environment. Notice the use of the __post_init__ method, which adjusts dependent parameters (like the action shape) after the fields have been initialized.
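To see __post_init__ in action, we can instantiate the config directly (a quick sketch, assuming the class above is importable):

from gym_pybullet_drones.utils.enums import ActionType

cfg = PybulletDronesEnv(act_type=ActionType.RPM)
print(cfg.features["action"].shape)  # (4,): one RPM command per motor
print(cfg.features["state"].shape)   # (72,): 30 fps // 2 buffered actions * 4 + 12 state dims
print(cfg.gym_kwargs)                # passed straight to gym.make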
We start by subclassing HoverAviary to build our custom environment. As with the pendulum, LeRobot expects a {"agent_pos": obs} dict observation; if your environment does not already follow this format (it almost certainly doesn't), you can either write a wrapper or subclass the environment.
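Here is a rough sketch of what that subclass could look like. Assumptions: gym_pybullet_drones 2.x (which follows the gymnasium step/reset API) with kinematic observations; the class name LeRobotHoverAviary and the flattening of the (num_drones, obs_dim) observation are my own choices here, and shapes or constructor arguments may differ in your setup.

from gymnasium.spaces import Dict, flatten, flatten_space
from gym_pybullet_drones.envs.HoverAviary import HoverAviary


class LeRobotHoverAviary(HoverAviary):
    """HoverAviary with observations repackaged into the dict format LeRobot expects."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._orig_obs_space = self.observation_space
        self.observation_space = Dict({"agent_pos": flatten_space(self._orig_obs_space)})

    def _wrap_obs(self, obs):
        # Flatten the (num_drones, obs_dim) array into a single 1D state vector
        return {"agent_pos": flatten(self._orig_obs_space, obs)}

    def reset(self, *, seed=None, options=None):
        obs, info = super().reset(seed=seed, options=options)
        return self._wrap_obs(obs), info

    def step(self, action):
        obs, reward, terminated, truncated, info = super().step(action)
        info["is_success"] = False  # placeholder, as with the pendulum
        return self._wrap_obs(obs), float(reward), terminated, truncated, info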