DAgger

DAgger (Dataset Aggregation) iteratively trains a policy using supervised learning on a dataset of observation-action pairs from expert demonstrations (like behavioral cloning), runs the policy to gather observations, queries the expert for good actions on those observations, and adds the newly labeled observations to the dataset. DAgger improves on behavioral cloning by training on a dataset that better resembles the observations the trained policy is likely to encounter, but it requires querying the expert online.
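
The loop described above can be sketched on a toy problem. This is a minimal illustration, not the library's implementation: the 1-D environment, the linear expert, the least-squares "policy", and the beta schedule are all assumptions made up for this example.

```python
import numpy as np


def expert_action(obs):
    """Toy expert (hypothetical): push the 1-D state halfway back to zero."""
    return -0.5 * obs


def fit_policy(observations: np.ndarray, actions: np.ndarray) -> float:
    """Supervised step: least-squares fit of action = w * obs."""
    return float(observations @ actions / (observations @ observations))


def rollout(w: float, beta: float, rng: np.random.Generator, horizon: int = 20):
    """Run one episode, executing the expert action with probability beta.

    Returns only the visited observations; expert labels are added afterwards.
    """
    obs = rng.normal()
    visited = []
    for _ in range(horizon):
        visited.append(obs)
        use_expert = rng.uniform() < beta
        act = expert_action(obs) if use_expert else w * obs
        obs = obs + act + 0.1 * rng.normal()
    return np.array(visited)


def toy_dagger(n_rounds: int = 5, seed: int = 0):
    rng = np.random.default_rng(seed)
    # Round 0: pure expert demonstrations, as in behavioral cloning.
    obs_data = rollout(w=0.0, beta=1.0, rng=rng)
    act_data = expert_action(obs_data)
    w = fit_policy(obs_data, act_data)
    sizes = [len(obs_data)]
    for round_num in range(1, n_rounds + 1):
        beta = max(0.0, 1.0 - round_num / n_rounds)  # hypothetical schedule
        new_obs = rollout(w, beta, rng)  # observations the learner visits
        new_act = expert_action(new_obs)  # expert labels those observations
        obs_data = np.concatenate([obs_data, new_obs])  # aggregate
        act_data = np.concatenate([act_data, new_act])
        w = fit_policy(obs_data, act_data)  # retrain on all data so far
        sizes.append(len(obs_data))
    return w, sizes
```

Calling toy_dagger() returns the fitted weight and the dataset size after each round; the dataset grows by one rollout per round because earlier data is never discarded.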

Example

Detailed example notebook: Train an Agent using the DAgger Algorithm

import tempfile

import numpy as np
import gymnasium as gym
from stable_baselines3.common.evaluation import evaluate_policy

from imitation.algorithms import bc
from imitation.algorithms.dagger import SimpleDAggerTrainer
from imitation.policies.serialize import load_policy
from imitation.util.util import make_vec_env

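# Seeded generator shared by the environment, BC, and DAgger for reproducibility.
rng = np.random.default_rng(0)
env = make_vec_env(
    "seals:seals/CartPole-v0",
    rng=rng,
)
# Load a pretrained PPO expert to act as the demonstrator.
expert = load_policy(
    "ppo-huggingface",
    organization="HumanCompatibleAI",
    env_name="seals-CartPole-v0",
    venv=env,
)

# Behavioral cloning trainer that DAgger uses for its supervised updates.
bc_trainer = bc.BC(
    observation_space=env.observation_space,
    action_space=env.action_space,
    rng=rng,
)
# Demonstrations and checkpoints are written to a temporary scratch directory.
with tempfile.TemporaryDirectory(prefix="dagger_example_") as tmpdir:
    print(tmpdir)
    dagger_trainer = SimpleDAggerTrainer(
        venv=env,
        scratch_dir=tmpdir,
        expert_policy=expert,
        bc_trainer=bc_trainer,
        rng=rng,
    )
    # Train for at least 8,000 environment timesteps.
    dagger_trainer.train(8_000)

# Mean reward of the trained policy over 10 evaluation episodes.
reward, _ = evaluate_policy(dagger_trainer.policy, env, 10)
print("Reward:", reward)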

API

class imitation.algorithms.dagger.InteractiveTrajectoryCollector(venv, get_robot_acts, beta, save_dir, rng)

Bases: VecEnvWrapper

DAgger VecEnvWrapper for querying and saving expert actions.

Every call to .step(actions) accepts and saves expert actions to self.save_dir, but only forwards expert actions to the wrapped VecEnv with probability self.beta. With probability 1 - self.beta, a “robot” action (i.e., an action from the imitation policy) is forwarded instead.

Demonstrations are saved as TrajectoryWithRew to self.save_dir at the end of every episode.
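
The mixing rule can be illustrated with a few lines of NumPy. This is a sketch under simplifying assumptions (one scalar action per environment, a hypothetical helper name mix_actions), not the wrapper's actual code: it only shows that the executed action is chosen per environment while the saved action is always the expert's.

```python
import numpy as np


def mix_actions(expert_acts, robot_acts, beta, rng):
    """Choose, per environment, which action is executed this step.

    Returns (executed_acts, saved_acts). The saved actions are always the
    expert's, regardless of which action was executed.
    """
    expert_acts = np.asarray(expert_acts)
    robot_acts = np.asarray(robot_acts)
    # One independent draw per environment; True means "use the robot action".
    use_robot = rng.uniform(0, 1, size=len(expert_acts)) > beta
    executed = np.where(use_robot, robot_acts, expert_acts)
    return executed, expert_acts.copy()
```

With beta=1.0 every executed action is the expert's; with beta=0.0 the imitation policy controls the rollout while the expert's labels are still recorded.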

__init__(venv, get_robot_acts, beta, save_dir, rng)

Builds InteractiveTrajectoryCollector.

Parameters
  • venv (VecEnv) – vectorized environment to sample trajectories from.

  • get_robot_acts (Callable[[ndarray], ndarray]) – get robot actions that can be substituted for human actions. Takes a vector of observations as input and returns a vector of actions.

  • beta (float) – fraction of the time to use action given to .step() instead of robot action. The choice of robot or human action is independently randomized for each individual Env at every timestep.

  • save_dir (Union[str, bytes, PathLike]) – directory to save collected trajectories in.

  • rng (Generator) – random state for random number generation.

close()

Clean up the environment’s resources.

Return type

None

env_is_wrapped(wrapper_class, indices=None)

Check if environments are wrapped with a given wrapper.

Parameters
  • wrapper_class – The wrapper class to look for.

  • indices (Union[None, int, Iterable[int]]) – Indices of envs to check

Return type

List[bool]

Returns

True if the env is wrapped, False otherwise, for each env queried.

env_method(method_name, *method_args, indices=None, **method_kwargs)

Call instance methods of vectorized environments.

Parameters
  • method_name (str) – The name of the environment method to invoke.

  • indices (Union[None, int, Iterable[int]]) – Indices of envs whose method to call

  • method_args – Any positional arguments to provide in the call

  • method_kwargs – Any keyword arguments to provide in the call

Return type

List[Any]

Returns

List of items returned by the environment’s method call

get_attr(attr_name, indices=None)

Return attribute from vectorized environment.

Parameters
  • attr_name (str) – The name of the attribute whose value to return

  • indices (Union[None, int, Iterable[int]]) – Indices of envs to get attribute from

Return type

List[Any]

Returns

List of values of ‘attr_name’ in all environments

get_images()

Return RGB images from each environment when available

Return type

Sequence[Optional[ndarray]]

getattr_depth_check(name, already_found)

See base class.

Return type

Optional[str]

Returns

name of module whose attribute is being shadowed, if any.

getattr_recursive(name)

Recursively check wrappers to find attribute.

Parameters

name (str) – name of attribute to look for

Return type

Any

Returns

attribute

render(mode=None)

Gym environment rendering

Parameters

mode (Optional[str]) – the rendering type

Return type

Optional[ndarray]

reset()

Resets the environment.

Returns

obs: first observation of a new trajectory.

reset_infos: List[Dict[str, Any]]

seed(seed=None)

Set the seed for the DAgger random number generator and wrapped VecEnv.

The DAgger RNG is used along with self.beta to determine whether the expert or robot action is forwarded to the wrapped VecEnv.

Parameters

seed (Optional[int]) – The random seed. May be None for completely random seeding.

Return type

List[Optional[int]]

Returns

A list containing the seeds for each individual env. Note that all list elements may be None, if the env does not return anything when seeded.

set_attr(attr_name, value, indices=None)

Set attribute inside vectorized environments.

Parameters
  • attr_name (str) – The name of attribute to assign new value

  • value (Any) – Value to assign to attr_name

  • indices (Union[None, int, Iterable[int]]) – Indices of envs to assign value

Return type

None

set_options(options=None)

Set environment options for all environments. If a dict is passed instead of a list, the same options will be used for all environments. WARNING: Those options will only be passed to the environment at the next reset.

Parameters

options (Union[List[Dict], Dict, None]) – A dictionary of environment options to pass to each environment at the next reset.

Return type

None

step(actions)

Step the environments with the given action

Parameters

actions (ndarray) – the action

Return type

Tuple[Union[ndarray, Dict[str, ndarray], Tuple[ndarray, ...]], ndarray, ndarray, List[Dict]]

Returns

observation, reward, done, information

step_async(actions)

Steps with a 1 - beta chance of using self.get_robot_acts instead.

DAgger needs to be able to inject imitation policy actions randomly at some subset of time steps. This method has a self.beta chance of keeping the actions passed in as an argument, and a 1 - self.beta chance of forwarding “robot” (i.e., imitation policy) actions generated by self.get_robot_acts instead.

At the end of every episode, a TrajectoryWithRew is saved to self.save_dir, where every saved action is the expert action, regardless of whether the robot action was used during that timestep.

Parameters

actions (ndarray) – the _intended_ demonstrator/expert actions for the current state. This will be executed with probability self.beta. Otherwise, a “robot” (typically a BC policy) action will be sampled and executed instead via self.get_robot_acts.

Return type

None

step_wait()

Returns observation, reward, etc after previous step_async() call.

Stores the transition, and saves trajectory as demo once complete.

Return type

Tuple[Union[ndarray, Dict[str, ndarray], Tuple[ndarray, ...]], ndarray, ndarray, List[Dict]]

Returns

Observation, reward, dones (is terminal?) and info dict.

traj_accum: Optional[TrajectoryAccumulator]

property unwrapped: VecEnv

Return type

VecEnv

class imitation.algorithms.dagger.DAggerTrainer(*, venv, scratch_dir, rng, beta_schedule=None, bc_trainer, custom_logger=None)

Bases: BaseImitationAlgorithm

DAgger training class with low-level API suitable for interactive human feedback.

In essence, this is just BC with some helpers for incrementally resuming training and interpolating between demonstrator/learnt policies. Interaction proceeds in “rounds” in which the demonstrator first provides a fresh set of demonstrations, and then an underlying BC is invoked to fine-tune the policy on the entire set of demonstrations collected in all rounds so far. Demonstrations and policy/trainer checkpoints are stored in a directory with the following structure:

scratch-dir-name/
    checkpoint-001.pt
    checkpoint-002.pt
    …
    checkpoint-XYZ.pt
    checkpoint-latest.pt
    demos/
        round-000/
            demos_round_000_000.npz
            demos_round_000_001.npz
            …
        round-001/
            demos_round_001_000.npz
            …
        …
        round-XYZ/
            …
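
To inspect what has been collected so far, the layout above can be walked with pathlib. The helper below is a hypothetical convenience function, not part of the library; it assumes only the demos/round-NNN/*.npz structure shown in the listing.

```python
import pathlib


def list_demos_by_round(scratch_dir):
    """Map round number -> sorted names of demo files found for that round."""
    demos_root = pathlib.Path(scratch_dir) / "demos"
    result = {}
    for round_dir in sorted(demos_root.glob("round-*")):
        if not round_dir.is_dir():
            continue
        # Directory names look like "round-000", "round-001", and so on.
        round_num = int(round_dir.name.split("-")[1])
        result[round_num] = sorted(p.name for p in round_dir.glob("*.npz"))
    return result
```

A round that is missing from the returned mapping, or that maps to an empty list, has no saved demonstrations yet.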

DEFAULT_N_EPOCHS: int = 4

The default number of BC training epochs in extend_and_update.

__init__(*, venv, scratch_dir, rng, beta_schedule=None, bc_trainer, custom_logger=None)

Builds DAggerTrainer.

Parameters
  • venv (VecEnv) – Vectorized training environment.

  • scratch_dir (Union[str, bytes, PathLike]) – Directory to use to store intermediate training information (e.g. for resuming training).

  • rng (Generator) – random state for random number generation.

  • beta_schedule (Optional[Callable[[int], float]]) – Provides a value of beta (the probability of taking expert action in any given state) at each round of training. If None, then linear_beta_schedule will be used instead.

  • bc_trainer (BC) – A BC instance used to train the underlying policy.

  • custom_logger (Optional[HierarchicalLogger]) – Where to log to; if None (default), creates a new logger.
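
Because beta_schedule is just a callable from round number to a probability, a custom schedule can be written as a plain function. The two factories below are illustrative assumptions (their names and parameters are made up for this sketch) and do not reproduce the library's built-in default.

```python
from typing import Callable


def make_linear_schedule(rampdown_rounds: int) -> Callable[[int], float]:
    """Beta starts at 1 and decays linearly to 0 over `rampdown_rounds` rounds."""
    if rampdown_rounds <= 0:
        raise ValueError("rampdown_rounds must be positive")

    def schedule(round_num: int) -> float:
        return max(0.0, 1.0 - round_num / rampdown_rounds)

    return schedule


def make_exponential_schedule(decay: float) -> Callable[[int], float]:
    """Beta is multiplied by `decay` after every round."""
    if not 0.0 < decay <= 1.0:
        raise ValueError("decay must be in (0, 1]")

    def schedule(round_num: int) -> float:
        return decay**round_num

    return schedule
```

Either result could be passed as beta_schedule=make_linear_schedule(10), for example; a schedule that reaches 0 means later rounds are driven entirely by the imitation policy.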

allow_variable_horizon: bool

If True, allow variable horizon trajectories; otherwise error if detected.

property batch_size: int
Return type

int

create_trajectory_collector()

Create trajectory collector to extend current round’s demonstration set.

Return type

InteractiveTrajectoryCollector

Returns

A collector configured with the appropriate beta, imitator policy, etc. for the current round. Refer to the documentation for InteractiveTrajectoryCollector to see how to use this.

extend_and_update(bc_train_kwargs=None)

Extend internal batch of data and train BC.

Specifically, this method will load new transitions (if necessary), train the model for a while, and advance the round counter. If there are no fresh demonstrations in the demonstration directory for the current round, then this will raise a NeedsDemosException instead of training or advancing the round counter. In that case, the user should call .create_trajectory_collector() and use the returned InteractiveTrajectoryCollector to produce a new set of demonstrations for the current interaction round.

Parameters

bc_train_kwargs (Optional[Mapping[str, Any]]) – Keyword arguments for calling BC.train(). If the log_rollouts_venv key is not provided, then it is set to self.venv by default. If neither of the n_epochs and n_batches keys are provided, then n_epochs is set to self.DEFAULT_N_EPOCHS.

Return type

int

Returns

New round number after advancing the round counter.
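
One interaction round with the low-level API might look like the sketch below. It is an illustration under stated assumptions: trainer is any object that behaves like a DAggerTrainer, get_expert_acts is a hypothetical stand-in for a human or scripted demonstrator, and the fixed step budget is a simplification.

```python
from typing import Callable

import numpy as np


def run_interactive_round(
    trainer,
    get_expert_acts: Callable[[np.ndarray], np.ndarray],
    n_steps: int,
) -> int:
    """Collect `n_steps` vectorized steps of expert labels, then update BC.

    Returns the new round number reported by `extend_and_update`.
    """
    collector = trainer.create_trajectory_collector()
    obs = collector.reset()
    for _ in range(n_steps):
        # Always pass the expert's actions; the collector decides (using beta)
        # whether the expert or the imitation policy actually acts.
        obs, _, _, _ = collector.step(get_expert_acts(obs))
    # Trains on the demonstrations from all rounds and advances the counter.
    return trainer.extend_and_update()
```

Since trajectories are only saved at the end of an episode, n_steps needs to be large enough for at least one episode to finish; otherwise extend_and_update finds no fresh demonstrations and raises NeedsDemosException.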

property logger: HierarchicalLogger

Returns logger for this object.

Return type

HierarchicalLogger

property policy: BasePolicy
Return type

BasePolicy

save_trainer()

Create a snapshot of trainer in the scratch/working directory.

The created snapshot can be reloaded with reconstruct_trainer(). In addition to saving one copy of the policy in the trainer snapshot, this method saves a second copy of the policy in its own file. Having a second copy of the policy is convenient because it can be loaded on its own and passed to evaluation routines for other algorithms.

Returns

  • checkpoint_path – a path to one of the created DAggerTrainer checkpoints.

  • policy_path – a path to one of the created DAggerTrainer policies.

class imitation.algorithms.dagger.SimpleDAggerTrainer(*, venv, scratch_dir, expert_policy, rng, expert_trajs=None, **dagger_trainer_kwargs)

Bases: DAggerTrainer

Simpler subclass of DAggerTrainer for training with synthetic feedback.

DEFAULT_N_EPOCHS: int = 4

The default number of BC training epochs in extend_and_update.

__init__(*, venv, scratch_dir, expert_policy, rng, expert_trajs=None, **dagger_trainer_kwargs)

Builds SimpleDAggerTrainer.

Parameters
  • venv (VecEnv) – Vectorized training environment. Note that when the robot action is randomly injected (in accordance with beta_schedule argument), every individual environment will get a robot action simultaneously for that timestep.

  • scratch_dir (Union[str, bytes, PathLike]) – Directory to use to store intermediate training information (e.g. for resuming training).

  • expert_policy (BasePolicy) – The expert policy used to generate synthetic demonstrations.

  • rng (Generator) – Random state to use for the random number generator.

  • expert_trajs (Optional[Sequence[Trajectory]]) – Optional starting dataset that is inserted into the round 0 dataset.

  • dagger_trainer_kwargs – Other keyword arguments passed to the superclass initializer DAggerTrainer.__init__.

Raises

ValueError – The observation or action space does not match between venv and expert_policy.

allow_variable_horizon: bool

If True, allow variable horizon trajectories; otherwise error if detected.

property batch_size: int
Return type

int

create_trajectory_collector()

Create trajectory collector to extend current round’s demonstration set.

Return type

InteractiveTrajectoryCollector

Returns

A collector configured with the appropriate beta, imitator policy, etc. for the current round. Refer to the documentation for InteractiveTrajectoryCollector to see how to use this.

extend_and_update(bc_train_kwargs=None)

Extend internal batch of data and train BC.

Specifically, this method will load new transitions (if necessary), train the model for a while, and advance the round counter. If there are no fresh demonstrations in the demonstration directory for the current round, then this will raise a NeedsDemosException instead of training or advancing the round counter. In that case, the user should call .create_trajectory_collector() and use the returned InteractiveTrajectoryCollector to produce a new set of demonstrations for the current interaction round.

Parameters

bc_train_kwargs (Optional[Mapping[str, Any]]) – Keyword arguments for calling BC.train(). If the log_rollouts_venv key is not provided, then it is set to self.venv by default. If neither of the n_epochs and n_batches keys are provided, then n_epochs is set to self.DEFAULT_N_EPOCHS.

Return type

int

Returns

New round number after advancing the round counter.
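
The defaulting rules for bc_train_kwargs can be restated as a small function. This is a sketch of the documented behavior only, with a hypothetical helper name; the library's internal code may differ.

```python
from typing import Any, Dict, Mapping, Optional

DEFAULT_N_EPOCHS = 4  # mirrors the class attribute documented above


def fill_bc_train_kwargs(
    bc_train_kwargs: Optional[Mapping[str, Any]],
    venv: Any,
) -> Dict[str, Any]:
    """Apply the documented defaults without mutating the caller's mapping."""
    kwargs = dict(bc_train_kwargs) if bc_train_kwargs is not None else {}
    # Rollout logging falls back to the trainer's own environment.
    kwargs.setdefault("log_rollouts_venv", venv)
    # n_epochs is only defaulted when neither duration argument was given.
    if "n_epochs" not in kwargs and "n_batches" not in kwargs:
        kwargs["n_epochs"] = DEFAULT_N_EPOCHS
    return kwargs
```

Passing n_batches therefore suppresses the n_epochs default, so each BC update runs for the requested number of batches.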

property logger: HierarchicalLogger

Returns logger for this object.

Return type

HierarchicalLogger

property policy: BasePolicy
Return type

BasePolicy

save_trainer()

Create a snapshot of trainer in the scratch/working directory.

The created snapshot can be reloaded with reconstruct_trainer(). In addition to saving one copy of the policy in the trainer snapshot, this method saves a second copy of the policy in its own file. Having a second copy of the policy is convenient because it can be loaded on its own and passed to evaluation routines for other algorithms.

Returns

  • checkpoint_path – a path to one of the created DAggerTrainer checkpoints.

  • policy_path – a path to one of the created DAggerTrainer policies.

train(total_timesteps, *, rollout_round_min_episodes=3, rollout_round_min_timesteps=500, bc_train_kwargs=None)

Train the DAgger agent.

The agent is trained in “rounds” where each round consists of a dataset aggregation step followed by a BC update step.

During a dataset aggregation step, self.expert_policy is used to perform rollouts in the environment but there is a 1 - beta chance (beta is determined from the round number and self.beta_schedule) that the DAgger agent’s action is used instead. Regardless of whether the DAgger agent’s action is used during the rollout, the expert action and corresponding observation are always appended to the dataset. The number of environment steps in the dataset aggregation stage is determined by the rollout_round_min* arguments.

During a BC update step, BC.train() is called to update the DAgger agent on all data collected so far.

Parameters
  • total_timesteps (int) – The number of timesteps to train inside the environment. In practice this is a lower bound, because the number of timesteps is rounded up to finish the minimum number of episodes or timesteps in the last DAgger training round, and the environment timesteps are executed in multiples of self.venv.num_envs.

  • rollout_round_min_episodes (int) – The number of episodes that must be completed before a dataset aggregation step ends.

  • rollout_round_min_timesteps (int) – The number of environment timesteps that must be completed before a dataset aggregation step ends. Note that any round will always train for at least self.batch_size timesteps, because otherwise BC could fail to receive any batches.

  • bc_train_kwargs (Optional[dict]) – Keyword arguments for calling BC.train(). If the log_rollouts_venv key is not provided, then it is set to self.venv by default. If neither of the n_epochs and n_batches keys are provided, then n_epochs is set to self.DEFAULT_N_EPOCHS.

Return type

None
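
Why total_timesteps is only a lower bound can be seen with a back-of-the-envelope simulation. The function below is a simplified model, not the library's code: it assumes a single environment, episodes of a fixed length, and rounds that collect whole episodes until both minimums are met.

```python
from typing import List


def simulate_round_sizes(
    total_timesteps: int,
    episode_length: int,
    batch_size: int = 32,
    rollout_round_min_episodes: int = 3,
    rollout_round_min_timesteps: int = 500,
) -> List[int]:
    """Return the number of timesteps collected in each simulated round."""
    # A round never collects fewer timesteps than one BC batch.
    min_steps = max(rollout_round_min_timesteps, batch_size)
    round_sizes: List[int] = []
    while sum(round_sizes) < total_timesteps:
        episodes = 0
        # Keep collecting until both the episode and timestep minimums hold.
        while (
            episodes < rollout_round_min_episodes
            or episodes * episode_length < min_steps
        ):
            episodes += 1
        round_sizes.append(episodes * episode_length)
    return round_sizes
```

For example, with 200-step episodes and the default minimums, each simulated round collects 3 episodes (600 timesteps), so a request for 8,000 timesteps ends after 14 rounds and 8,400 timesteps.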