Ben Sprenger

Using Custom Gym Environments with Hugging Face’s LeRobot 🚀

LeRobot is a cool project for robotics reinforcement and imitation learning that I’ve been playing around with for a few months now. LeRobot comes with several built-in simulation environments and policies, but since its stated goal is to be used as a library, I wanted to try my hand at integrating my own environments and policies. So this blog post will be the first step: integrating a custom environment.

Why Integrate Custom Environments into LeRobot?

Instead of writing all your own training code to use a custom environment, with LeRobot you can take advantage of:

Overview of the Approach

The main steps to integrating a new environment are:

  1. Wrapping the original environment: LeRobot expects environments to have a specific interface. We create a thin wrapper around HoverAviary to match this interface.
  2. Registering the environment: We use Gymnasium’s registration API to make our custom environment discoverable by gymnasium.
  3. Providing a configuration via a dataclass: This ensures that the environment’s parameters (like FPS, task type, and action configuration) are easily adjustable through LeRobot’s command-line scripts.
  4. Making the config discoverable by LeRobot

First Environment: Pendulum-v1

First, let’s try to use LeRobot with the simple yet famous environment from gymnasium: Pendulum-v1, which is an RL staple (why?). LeRobot is built to use both state observations and image observations directly – for now, we will keep it simple and use only state observations from the environment.

Step 1: Wrapping the original environment

The built-in gym Pendulum-v1 differs from the format LeRobot expects in a few ways:

  1. LeRobot expects observations from the environment to be a dictionary that contains different types of observations, e.g. the state of the agent, the state of the environment, images, etc. In the specific case of the agent’s state, it appears to expect this observation to correspond to a key agent_pos in the dict. For Pendulum-v1, we are only working with the state of the agent (there is no distinction between the agent and the environment in this case). So, we need to do the following: a. We have to update the observation method to return a dict with the observations under the key agent_pos b. We have to update the observation_space attribute of the environment to match the return type of the observation (now a dict)
  2. The info dict returned by the step() method must contain the key is_success on each step, with a boolean value indicating whether the episode is a success or not. For the case of the pendulum, we won’t be using this parameter for training, so we can arbitrarily make it always False for now.

gymnasium already has some nice wrapper classes (gym.Wrapper, gym.ObservationWrapper, etc.) that we can subclass to wrap the observation and step methods to provide dict outputs:

create a new python file somewhere:

class LeRobotWrapper(gym.Wrapper):
    def __init__(self, env: gym.Env) -> None:
        super().__init__(env)
        self.observation_space = Dict({"agent_pos": self.env.observation_space})

    def observation(self, obs: np.ndarray) -> dict[str, np.ndarray]:
        return {"agent_pos": obs}

    def step(self, action: np.ndarray) -> tuple[dict, float, bool, bool, dict]:
        obs, reward, terminated, truncated, info = self.env.step(action)
        info["is_success"] = False  # Always False for now
        return self.observation(obs), float(reward), terminated, truncated, info

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[dict, dict]:
        obs, info = self.env.reset(seed=seed, options=options)
        return self.observation(obs), info

Step 2: Registration with gym

Now that we have our new (modified) environment, we have to register it with gymnasium. This allows us to instantiate the environment by calling gym.make('name_of_the_environment'), which is what LeRobot calls under the hood to spawn an environment.

We can register by adding the following lines below our new environment:

register(
    id="gym_pendulum_v1/Pendulum-v1",
    entry_point=lambda **kwargs: LeRobotDictWrapper(gym.make("Pendulum-v1", **kwargs)),
)

Step 3: Config

LeRobot uses Python dataclasses alongside the draccus library to simplify configuration management. Using dataclasses allows for better typing and checking in Python, and draccus allows us to easily use the command line to pass arguments to these dataclasses.

Let’s look at the base configuration class for simulation environments in LeRobot:

@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
    task: str | None = None
    fps: int = 30
    features: dict[str, PolicyFeature] = field(default_factory=dict)
    features_map: dict[str, str] = field(default_factory=dict)

    @property
    def type(self) -> str:
        return self.get_choice_name(self.__class__)

    @abc.abstractproperty
    def gym_kwargs(self) -> dict:
        raise NotImplementedError()

This class outlines several key parameters that all simulation environments must have:

Therefore, the first step to porting a new environment is to subclass EnvConfig and define these parameters. For our case, we end up with:

@EnvConfig.register_subclass("pendulum_v1")
@dataclass
class PendulumV1Env(EnvConfig):
    task: str = "Pendulum-v1"
    fps: int = 30
    episode_length: int = 200
    obs_type: str = "state"
    render_mode: str = "rgb_array"
    features: dict[str, PolicyFeature] = field(
        default_factory=lambda: {
            "action": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
            "state": PolicyFeature(type=FeatureType.STATE, shape=(3,)),
        }
    )
    features_map: dict[str, str] = field(
        default_factory=lambda: {
            "action": ACTION,
            "state": OBS_ROBOT,
        }
    )

    @property
    def gym_kwargs(self) -> dict:
        return {
            "render_mode": self.render_mode,
            "max_episode_steps": self.episode_length,
        }

Step 4: Using the environment with LeRobot

Now we have all our ingredients. right now LeRobot doesn’t support online learning, it only supports imitation learning which means we need a dataset to train a policy to test our environment we will create a dataset with random actions then we will test training on it even though the policy will be bad this will tell us if our environment works

# Do dataset loop here

Second Environment: gym-pybullet-drones

Next, we’ll integrate a sim I knew well from my MSc thesis: HoverAviary from the gym_pybullet_drones package (the lab where I did my MSc thesis wrote the env).

Code Walkthrough

0. setup env

describe UV setup here and installing lerobot

1. Creating the LeRobot Configuration Dataclass

LeRobot leverages Python’s dataclasses alongside the draccus library to simplify configuration management, more on draccus in a second

@dataclass
class EnvConfig(draccus.ChoiceRegistry, abc.ABC):
    task: str | None = None
    fps: int = 30
    features: dict[str, PolicyFeature] = field(default_factory=dict)
    features_map: dict[str, str] = field(default_factory=dict)

    @property
    def type(self) -> str:
        return self.get_choice_name(self.__class__)

    @abc.abstractproperty
    def gym_kwargs(self) -> dict:
        raise NotImplementedError()

This class outlines several key parameters:

Therefore, the first step to porting a new environment is to subclass EnvConfig and define these parameters. For our case, we end up with:

@EnvConfig.register_subclass("pybullet_drones")
@dataclass
class PybulletDronesEnv(EnvConfig):
    task: str = "hover"
    fps: int = 30
    episode_length: int = 200
    render_mode: str = "rgb_array"
    act_type: ActionType = ActionType.ONE_D_RPM
    features: dict[str, PolicyFeature] = field(
        default_factory=lambda: {
            "action": PolicyFeature(type=FeatureType.ACTION, shape=()),
            "state": PolicyFeature(type=FeatureType.STATE, shape=()),
        }
    )
    features_map: dict[str, str] = field(
        default_factory=lambda: {
            "action": ACTION,
            "state": OBS_ROBOT,
        }
    )

    @property
    def gym_kwargs(self) -> dict:
        return {
            "ctrl_freq": self.fps,
            "max_episode_steps": self.episode_length,
            "render_mode": self.render_mode,
            "act": self.act_type,
        }

    def __post_init__(self) -> None:
        # Adjust action shape based on act_type
        if self.act_type == ActionType.ONE_D_RPM:
            action_shape = (1,)
        elif self.act_type == ActionType.RPM:
            action_shape = (4,)
        else:
            raise ValueError(f"Unsupported act_type: {self.act_type}")  # noqa: TRY003
        self.features["action"].shape = action_shape
        # there is a 0.5s action buffer in the observation and the state is 12-dimensional
        self.features["state"].shape = (self.fps // 2 * action_shape[0] + 12,)

The first key thing to note is the line:

@EnvConfig.register_subclass("pybullet_drones")

Here we see why we use draccus in the first place. This allows draccus to find the right EnvConfig subclass based on cmd-line args:

@parser.wraps()
def create_cfg(cfg: EnvConfig):
    print(type(cfg))
    print(cfg.gym_kwargs)

if __name__ == "__main__":
    create_cfg()

# outputs:
# ::: something

The rest of the Pybullet Drones config subclass adapts the base configuration for a PyBullet drones environment. Notice the use of the post_init method, which adjusts dependent parameters (like the action shape) after the initial instantiation.

2. Environment API

We start by subclassing HoverAviary to build our custom environment:

if your env does not follow this which it almost certainly doesn’t you can make a wrapper or subclass the env

3. Register the env