Train Behavior Cloning in a Custom Environment#
You can use imitation
to train a policy (and, for many imitation learning algorithm, learn rewards) in a custom environment.
Step 1: Define the environment#
We will use a simple ObservationMatching environment as an example. The premise is simple – the agent receives a vector of observations, and must output a vector of actions that matches the observations as closely as possible.
If you have your own environment that you’d like to use, you can replace the code below with your own environment. Make sure it complies with the standard Gym API, and that the observation and action spaces are specified correctly.
from typing import Dict, Optional
from typing import Any
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Box
class ObservationMatchingEnv(gym.Env):
def __init__(self, num_options: int = 2):
self.state = None
self.num_options = num_options
self.observation_space = Box(0, 1, shape=(num_options,))
self.action_space = Box(0, 1, shape=(num_options,))
def reset(self, seed: int = None, options: Optional[Dict[str, Any]] = None):
super().reset(seed=seed, options=options)
self.state = self.observation_space.sample()
return self.state, {}
def step(self, action):
reward = -np.abs(self.state - action).mean()
self.state = self.observation_space.sample()
return self.state, reward, False, False, {}
Step 2: create the environment#
From here, we have two options:
Add the environment to the gym registry, and use it with existing utilities (e.g.
make
)Use the environment directly
You only need to execute the cells in step 2a, or step 2b to proceed.
At the end of these steps, we want to have:
env
: a single environment that we can use for training an expert with SB3venv
: a vectorized environment where each individual environment is wrapped inRolloutInfoWrapper
, that we can use for collecting rollouts withimitation
Step 2a (recommended): add the environment to the gym registry#
The standard approach is adding the environment to the gym registry.
gym.register(
id="custom/ObservationMatching-v0",
entry_point=ObservationMatchingEnv, # This can also be the path to the class, e.g. `observation_matching:ObservationMatchingEnv`
max_episode_steps=500,
)
After registering, you can create an environment is gym.make(env_id)
which automatically handles the TimeLimit
wrapper.
To create a vectorized env, you can use the make_vec_env
helper function (Option A), or create it directly (Options B1 and B2)
from gymnasium.wrappers import TimeLimit
from imitation.data import rollout
from imitation.data.wrappers import RolloutInfoWrapper
from imitation.util.util import make_vec_env
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
# Create a single environment for training an expert with SB3
env = gym.make("custom/ObservationMatching-v0")
# Create a vectorized environment for training with `imitation`
# Option A: use the `make_vec_env` helper function - make sure to pass `post_wrappers=[lambda env, _: RolloutInfoWrapper(env)]`
venv = make_vec_env(
"custom/ObservationMatching-v0",
rng=np.random.default_rng(),
n_envs=4,
post_wrappers=[lambda env, _: RolloutInfoWrapper(env)],
)
# Option B1: use a custom env creator, and create VecEnv directly
# def _make_env():
# """Helper function to create a single environment. Put any logic here, but make sure to return a RolloutInfoWrapper."""
# _env = gym.make("custom/ObservationMatching-v0")
# _env = RolloutInfoWrapper(_env)
# return _env
#
# venv = DummyVecEnv([_make_env for _ in range(4)])
#
# # Option B2: we can also use a parallel VecEnv implementation
# venv = SubprocVecEnv([_make_env for _ in range(4)])
Step 2b: directly use the environment#
Alternatively, we can directly initialize the environment by instantiating the class we created earlier, and handle all the additional logic ourselves.
from gymnasium.wrappers import TimeLimit
from imitation.data import rollout
from imitation.data.wrappers import RolloutInfoWrapper
from stable_baselines3.common.vec_env import DummyVecEnv
import numpy as np
# Create a single environment for training with SB3
env = ObservationMatchingEnv()
env = TimeLimit(env, max_episode_steps=500)
# Create a vectorized environment for training with `imitation`
# Option A: use a helper function to create multiple environments
def _make_env():
"""Helper function to create a single environment. Put any logic here, but make sure to return a RolloutInfoWrapper."""
_env = ObservationMatchingEnv()
_env = TimeLimit(_env, max_episode_steps=500)
_env = RolloutInfoWrapper(_env)
return _env
venv = DummyVecEnv([_make_env for _ in range(4)])
# Option B: use a single environment
# env = FixedHorizonCartPoleEnv()
# venv = DummyVecEnv([lambda: RolloutInfoWrapper(env)]) # Wrap a single environment -- only useful for simple testing like this
# Option C: use multiple environments
# venv = DummyVecEnv([lambda: RolloutInfoWrapper(ObservationMatchingEnv()) for _ in range(4)]) # Wrap multiple environments
Step 3: Training#
And now we’re just about done! Whether you used step 2a or 2b, your environment should now be ready to use with SB3 and imitation
.
For the sake of completeness, we’ll train a BC model, the same way as in the first tutorial, but with our custom environment.
Keep in mind that while we’re using BC in this tutorial, you can just as easily use any of the other algorithms with the environment prepared in this way.
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.evaluation import evaluate_policy
from gymnasium.wrappers import TimeLimit
expert = PPO(
policy=MlpPolicy,
env=env,
seed=0,
batch_size=64,
ent_coef=0.0,
learning_rate=0.0003,
n_epochs=10,
n_steps=64,
)
reward, _ = evaluate_policy(expert, env, 10)
print(f"Reward before training: {reward}")
# Note: if you followed step 2a, i.e. registered the environment, you can use the environment name directly
# expert = PPO(
# policy=MlpPolicy,
# env="custom/ObservationMatching-v0",
# seed=0,
# batch_size=64,
# ent_coef=0.0,
# learning_rate=0.0003,
# n_epochs=10,
# n_steps=64,
# )
expert.learn(10_000) # Note: set to 100000 to train a proficient expert
reward, _ = evaluate_policy(expert, expert.get_env(), 10)
print(f"Expert reward: {reward}")
Reward before training: -247.31714964704588
Expert reward: -100.7207043
rng = np.random.default_rng()
rollouts = rollout.rollout(
expert,
venv,
rollout.make_sample_until(min_timesteps=None, min_episodes=50),
rng=rng,
)
transitions = rollout.flatten_trajectories(rollouts)
from imitation.algorithms import bc
bc_trainer = bc.BC(
observation_space=env.observation_space,
action_space=env.action_space,
demonstrations=transitions,
rng=rng,
)
As before, the untrained policy only gets poor rewards:
reward_before_training, _ = evaluate_policy(bc_trainer.policy, env, 10)
print(f"Reward before training: {reward_before_training}")
Reward before training: -250.60812856666743
After training, we can get much closer to the expert’s performance:
bc_trainer.train(n_epochs=1)
reward_after_training, _ = evaluate_policy(bc_trainer.policy, env, 10)
print(f"Reward after training: {reward_after_training}")
--------------------------------
| batch_size | 32 |
| bc/ | |
| batch | 0 |
| ent_loss | -0.00284 |
| entropy | 2.84 |
| epoch | 0 |
| l2_loss | 0 |
| l2_norm | 68.5 |
| loss | 2.34 |
| neglogp | 2.34 |
| prob_true_act | 0.101 |
| samples_so_far | 32 |
--------------------------------
--------------------------------
| batch_size | 32 |
| bc/ | |
| batch | 500 |
| ent_loss | -0.00181 |
| entropy | 1.81 |
| epoch | 0 |
| l2_loss | 0 |
| l2_norm | 75.9 |
| loss | 1.06 |
| neglogp | 1.06 |
| prob_true_act | 0.357 |
| samples_so_far | 16032 |
--------------------------------
Reward after training: -41.17174798576161