"""DAgger (https://arxiv.org/pdf/1011.0686.pdf).
Interactively trains a policy by collecting some demonstrations, doing BC, collecting
more demonstrations, doing BC again, etc. Initially the demonstrations are collected by
executing the expert's policy; over time, the visited states are drawn more and more
from the imitator's policy, while the recorded actions are always the expert's.
"""
import abc
import logging
import os
import pathlib
import uuid
from typing import Any, Callable, List, Mapping, Optional, Sequence, Tuple, Union
import numpy as np
import torch as th
from stable_baselines3.common import policies, utils, vec_env
from stable_baselines3.common.vec_env.base_vec_env import VecEnvStepReturn
from torch.utils import data as th_data
from imitation.algorithms import base, bc
from imitation.data import rollout, serialize, types
from imitation.util import logger as imit_logger
from imitation.util import util
class BetaSchedule(abc.ABC):
    """Abstract mapping from a training round number to beta.

    Beta is the probability (equivalently, the fraction of timesteps) with which
    the demonstrator's action, rather than the imitation policy's action, is
    executed in the environment.
    """

    @abc.abstractmethod
    def __call__(self, round_num: int) -> float:
        """Returns beta for the round `round_num`.

        Args:
            round_num: index of the current round. Rounds are assumed to be
                numbered consecutively, starting from 0.

        Returns:
            The probability of executing a demonstrator action. With the remaining
            probability, a robot (imitation policy) action is executed instead.
        """  # noqa: DAR202
class LinearBetaSchedule(BetaSchedule):
    """Linearly-decreasing schedule for beta."""

    def __init__(self, rampdown_rounds: int) -> None:
        """Builds LinearBetaSchedule.

        Args:
            rampdown_rounds: number of rounds over which to anneal beta from 1 to 0.
                Must be positive.

        Raises:
            ValueError: if `rampdown_rounds` is not positive.
        """
        # A value of 0 would divide by zero in `__call__`, and a negative value
        # would pin beta at 1 forever; reject both up front, mirroring the
        # argument validation done by `ExponentialBetaSchedule`.
        if rampdown_rounds <= 0:
            raise ValueError("rampdown_rounds must be a positive number of rounds.")
        self.rampdown_rounds = rampdown_rounds

    def __call__(self, round_num: int) -> float:
        """Computes beta value.

        Args:
            round_num: the current round number.

        Returns:
            beta linearly decreasing from `1.0` to `0.0` between round `0` and
            `self.rampdown_rounds`. After that, it is `0.0`. Always a float.
        """
        assert round_num >= 0
        remaining_fraction = (self.rampdown_rounds - round_num) / self.rampdown_rounds
        # Float bounds so the result is a float even once clamped at 0.
        return min(1.0, max(0.0, remaining_fraction))
class ExponentialBetaSchedule(BetaSchedule):
    """Schedule under which beta decays geometrically with the round number."""

    def __init__(self, decay_probability: float):
        """Builds ExponentialBetaSchedule.

        Args:
            decay_probability: multiplicative factor applied to beta once per round.

        Raises:
            ValueError: if `decay_probability` not within (0, 1].
        """
        within_bounds = 0 < decay_probability <= 1
        if not within_bounds:
            raise ValueError("decay_probability lies outside the range (0, 1].")
        self.decay_probability = decay_probability

    def __call__(self, round_num: int) -> float:
        """Computes beta value.

        Args:
            round_num: the current round number.

        Returns:
            `self.decay_probability` raised to the power `round_num`.
        """
        assert round_num >= 0
        return pow(self.decay_probability, round_num)
def reconstruct_trainer(
    scratch_dir: types.AnyPath,
    venv: vec_env.VecEnv,
    custom_logger: Optional[imit_logger.HierarchicalLogger] = None,
    device: Union[th.device, str] = "auto",
) -> "DAggerTrainer":
    """Reconstruct trainer from the latest snapshot in some working directory.

    Requires vectorized environment and (optionally) a logger, as these objects
    cannot be serialized.

    Warning: the checkpoint is a pickled Python object, and loading it can execute
    arbitrary code. Only load checkpoints from trusted sources.

    Args:
        scratch_dir: path to the working directory created by a previous run of
            this algorithm. The directory should contain a `checkpoint-latest.pt`
            file, as written by `DAggerTrainer.save_trainer()`.
        venv: Vectorized training environment.
        custom_logger: Where to log to; if None (default), creates a new logger.
            The logger is installed on both the trainer and its inner BC trainer.
        device: device on which to load the trainer.

    Returns:
        A deserialized `DAggerTrainer`.
    """
    import inspect

    custom_logger = custom_logger or imit_logger.configure()
    scratch_dir = util.parse_path(scratch_dir)
    checkpoint_path = scratch_dir / "checkpoint-latest.pt"
    load_kwargs = {"map_location": utils.get_device(device)}
    # The checkpoint is a whole pickled trainer object, not a tensor state dict.
    # Recent PyTorch releases default `weights_only=True`, which refuses to
    # unpickle such objects, so opt out explicitly wherever the argument exists
    # (older releases do not accept it at all).
    if "weights_only" in inspect.signature(th.load).parameters:
        load_kwargs["weights_only"] = False
    trainer = th.load(checkpoint_path, **load_kwargs)
    trainer.venv = venv
    # Go through the `logger` property (not `_logger`) so the inner BC trainer's
    # logger is updated too and both stay in sync.
    trainer.logger = custom_logger
    return trainer
def _save_dagger_demo(
    trajectory: types.Trajectory,
    trajectory_index: int,
    save_dir: types.AnyPath,
    rng: np.random.Generator,
    prefix: str = "",
) -> None:
    """Serializes one demonstration trajectory into `save_dir`.

    The file name is `[<prefix>-]dagger-demo-<trajectory_index>-<uuid>.npz`, where
    `<uuid>` is a version-4 UUID built from 16 random bytes drawn from `rng`.

    Args:
        trajectory: the trajectory to save.
        trajectory_index: index embedded in the file name.
        save_dir: directory in which to write the file.
        rng: random generator used to build the unique suffix.
        prefix: optional string prepended (followed by `-`) to the file name.
    """
    directory = util.parse_path(save_dir)
    assert isinstance(trajectory, types.Trajectory)
    name_head = f"{prefix}-" if prefix else ""
    unique_int = int.from_bytes(rng.bytes(16), "big")
    unique_hex = uuid.UUID(int=unique_int, version=4).hex
    npz_path = directory / f"{name_head}dagger-demo-{trajectory_index}-{unique_hex}.npz"
    # Refuse to overwrite an existing demonstration (only possible on a UUID clash).
    assert not npz_path.exists(), (
        f"The following DAgger demonstration path already exists: {npz_path}"
    )
    serialize.save(npz_path, [trajectory])
    logging.info(f"Saved demo at '{npz_path}'")
class InteractiveTrajectoryCollector(vec_env.VecEnvWrapper):
    """DAgger VecEnvWrapper for querying and saving expert actions.

    Each call to `.step(actions)` takes the expert's actions and records them,
    but forwards an expert action to the wrapped VecEnv only with probability
    `self.beta`, decided independently for each sub-environment. With probability
    `1 - self.beta` a "robot" action (i.e. an action from the imitation policy) is
    forwarded instead.

    Demonstrations are saved as `TrajectoryWithRew` to `self.save_dir` at the end
    of every episode.
    """

    traj_accum: Optional[rollout.TrajectoryAccumulator]
    _last_obs: Optional[np.ndarray]
    _last_user_actions: Optional[np.ndarray]

    def __init__(
        self,
        venv: vec_env.VecEnv,
        get_robot_acts: Callable[[np.ndarray], np.ndarray],
        beta: float,
        save_dir: types.AnyPath,
        rng: np.random.Generator,
    ) -> None:
        """Builds InteractiveTrajectoryCollector.

        Args:
            venv: vectorized environment to sample trajectories from.
            get_robot_acts: maps a batch of observations to a batch of robot
                actions that can be substituted for the human actions.
            beta: fraction of the time to use the action given to `.step()`
                instead of the robot action. The choice between the two is
                randomized independently for each individual `Env` at every
                timestep.
            save_dir: directory to save collected trajectories in.
            rng: random state for random number generation.
        """
        super().__init__(venv)
        self.get_robot_acts = get_robot_acts
        assert 0 <= beta <= 1
        self.beta = beta
        self.traj_accum = None
        self.save_dir = save_dir
        self._last_obs = None
        self._done_before = True
        self._is_reset = False
        self._last_user_actions = None
        self.rng = rng

    def seed(self, seed: Optional[int] = None) -> List[Optional[int]]:
        """Reseeds both the DAgger random number generator and the wrapped VecEnv.

        The DAgger generator, together with `self.beta`, decides whether the
        expert or the robot action is forwarded to the wrapped VecEnv.

        Args:
            seed: The random seed. May be None for completely random seeding.

        Returns:
            The per-environment seeds reported by the wrapped VecEnv. Every entry
            may be None if the environments report nothing when seeded.
        """
        self.rng = np.random.default_rng(seed=seed)
        env_seeds = self.venv.seed(seed)
        return list(env_seeds)

    def reset(self) -> np.ndarray:
        """Resets the environment and starts fresh trajectory accumulation.

        Returns:
            obs: first observation of a new trajectory.
        """
        accumulator = rollout.TrajectoryAccumulator()
        self.traj_accum = accumulator
        first_obs = self.venv.reset()
        assert isinstance(first_obs, np.ndarray)
        for env_idx, env_obs in enumerate(first_obs):
            accumulator.add_step({"obs": env_obs}, key=env_idx)
        self._last_obs = first_obs
        self._is_reset = True
        self._last_user_actions = None
        return first_obs

    def step_async(self, actions: np.ndarray) -> None:
        """Steps with a `1 - beta` chance of using `self.get_robot_acts` instead.

        DAgger needs to be able to inject imitation policy actions randomly at
        some subset of time steps. For each sub-environment, this method keeps
        the corresponding entry of `actions` with probability `self.beta`, and
        otherwise substitutes the "robot" (i.e. imitation policy) action produced
        by `self.get_robot_acts`.

        At the end of every episode, a `TrajectoryWithRew` is saved to
        `self.save_dir`, where every saved action is the expert action,
        regardless of whether the robot action was used during that timestep.

        Args:
            actions: the _intended_ demonstrator/expert actions for the current
                state. Each is executed with probability `self.beta`. Otherwise,
                a "robot" (typically a BC policy) action is sampled via
                `self.get_robot_acts` and executed instead.
        """
        assert self._is_reset, "call .reset() before .step()"
        assert self._last_obs is not None

        # Copy so that substituting robot actions never mutates the caller's array.
        executed_acts = np.array(actions)
        # True where the robot action replaces the expert action
        # (probability 1 - beta per environment).
        use_robot = self.rng.uniform(0, 1, size=(self.num_envs,)) > self.beta
        if use_robot.any():
            executed_acts[use_robot] = self.get_robot_acts(self._last_obs[use_robot])

        # The expert actions are what get recorded, not the executed ones.
        self._last_user_actions = actions
        self.venv.step_async(executed_acts)

    def step_wait(self) -> VecEnvStepReturn:
        """Returns observation, reward, etc after previous `step_async()` call.

        Records the transition and saves every trajectory that finished on this
        step as a demonstration.

        Returns:
            Observation, reward, dones (is terminal?) and info dict.
        """
        next_obs, rews, dones, infos = self.venv.step_wait()
        assert isinstance(next_obs, np.ndarray)
        assert self.traj_accum is not None
        assert self._last_user_actions is not None
        self._last_obs = next_obs
        finished_trajs = self.traj_accum.add_steps_and_auto_finish(
            obs=next_obs,
            acts=self._last_user_actions,
            rews=rews,
            infos=infos,
            dones=dones,
        )
        for batch_idx, finished in enumerate(finished_trajs):
            _save_dagger_demo(finished, batch_idx, self.save_dir, self.rng)
        return next_obs, rews, dones, infos
class NeedsDemosException(Exception):
    """Raised when the current round has no demonstrations yet and some must be collected."""
class DAggerTrainer(base.BaseImitationAlgorithm):
    """DAgger training class with low-level API suitable for interactive human feedback.

    In essence, this is just BC with some helpers for incrementally
    resuming training and interpolating between demonstrator/learnt policies.
    Interaction proceeds in "rounds" in which the demonstrator first provides a
    fresh set of demonstrations, and then an underlying `BC` is invoked to
    fine-tune the policy on the entire set of demonstrations collected in all
    rounds so far. Demonstrations and policy/trainer checkpoints are stored in a
    directory with the following structure::

       scratch-dir-name/
           checkpoint-000.pt
           checkpoint-001.pt
           ...
           checkpoint-XYZ.pt
           checkpoint-latest.pt
           policy-000.pt
           policy-001.pt
           ...
           policy-XYZ.pt
           policy-latest.pt
           demos/
               round-000/
                   [<prefix>-]dagger-demo-<index>-<uuid>.npz
                   ...
               round-001/
                   ...
               ...
               round-XYZ/
                   ...

    Checkpoint and policy files are written by `save_trainer()`, numbered by the
    round in which it was called (rounds in which it was not called have no
    file). Demonstration files are written via `_save_dagger_demo`, e.g. by the
    collector returned from `create_trajectory_collector()`; `<index>` is the
    trajectory's index within the batch being saved and `<uuid>` is a random hex
    string that keeps file names unique.
    """

    _all_demos: List[types.Trajectory]

    DEFAULT_N_EPOCHS: int = 4
    """The default number of BC training epochs in `extend_and_update`."""

    def __init__(
        self,
        *,
        venv: vec_env.VecEnv,
        scratch_dir: types.AnyPath,
        rng: np.random.Generator,
        beta_schedule: Optional[Callable[[int], float]] = None,
        bc_trainer: bc.BC,
        custom_logger: Optional[imit_logger.HierarchicalLogger] = None,
    ):
        """Builds DAggerTrainer.

        Args:
            venv: Vectorized training environment.
            scratch_dir: Directory to use to store intermediate training
                information (e.g. for resuming training).
            rng: random state for random number generation.
            beta_schedule: Provides a value of `beta` (the probability of taking
                expert action in any given state) at each round of training. If
                `None`, then `LinearBetaSchedule(15)` is used instead, i.e. beta
                decreases linearly from 1 to 0 over the first 15 rounds.
            bc_trainer: A `BC` instance used to train the underlying policy.
            custom_logger: Where to log to; if None (default), creates a new logger.
        """
        super().__init__(custom_logger=custom_logger)

        if beta_schedule is None:
            beta_schedule = LinearBetaSchedule(15)
        self.beta_schedule = beta_schedule
        self.scratch_dir = util.parse_path(scratch_dir)
        self.venv = venv
        self.round_num = 0
        # Highest round whose demos are already in `self._all_demos` (-1: none yet).
        self._last_loaded_round = -1
        self._all_demos = []
        self.rng = rng

        utils.check_for_correct_spaces(
            self.venv,
            bc_trainer.observation_space,
            bc_trainer.action_space,
        )
        self.bc_trainer = bc_trainer
        self.bc_trainer.logger = self.logger

    def __getstate__(self):
        """Return state excluding non-pickleable objects."""
        d = dict(self.__dict__)
        del d["venv"]
        del d["_logger"]
        return d

    @property
    def logger(self) -> imit_logger.HierarchicalLogger:
        """Returns logger for this object."""
        return super().logger

    @logger.setter
    def logger(self, value: imit_logger.HierarchicalLogger) -> None:
        # DAgger and inner-BC logger should stay in sync
        self._logger = value
        self.bc_trainer.logger = value

    @property
    def policy(self) -> policies.BasePolicy:
        """The imitation policy being trained (owned by the inner `BC` trainer)."""
        return self.bc_trainer.policy

    @property
    def batch_size(self) -> int:
        """Minibatch size used by the inner `BC` trainer."""
        return self.bc_trainer.batch_size

    def _load_all_demos(self) -> Tuple[types.Transitions, List[int]]:
        """Loads demos of rounds not yet loaded and flattens all demos so far.

        Returns:
            A tuple of the transitions from every demo loaded so far (across all
            rounds) and the number of newly loaded demo files for each round.
        """
        num_demos_by_round = []
        for round_num in range(self._last_loaded_round + 1, self.round_num + 1):
            round_dir = self._demo_dir_path_for_round(round_num)
            demo_paths = self._get_demo_paths(round_dir)
            # Each demo file holds a single trajectory.
            self._all_demos.extend(serialize.load(p)[0] for p in demo_paths)
            num_demos_by_round.append(len(demo_paths))
        logging.info(f"Loaded {len(self._all_demos)} total")
        demo_transitions = rollout.flatten_trajectories(self._all_demos)
        return demo_transitions, num_demos_by_round

    def _get_demo_paths(self, round_dir: pathlib.Path) -> List[pathlib.Path]:
        """Returns the `.npz` demo paths in `round_dir`, sorted by filename."""
        # listdir returns filenames in an arbitrary order that depends on the
        # file system implementation:
        # https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic
        # To ensure the order is consistent across file systems,
        # we sort by the filename.
        filenames = sorted(os.listdir(round_dir))
        return [round_dir / f for f in filenames if f.endswith(".npz")]

    def _demo_dir_path_for_round(self, round_num: Optional[int] = None) -> pathlib.Path:
        """Returns the demo directory of `round_num` (default: the current round)."""
        if round_num is None:
            round_num = self.round_num
        return self.scratch_dir / "demos" / f"round-{round_num:03d}"

    def _try_load_demos(self) -> None:
        """Load the dataset for this round into self.bc_trainer as a DataLoader.

        Raises:
            NeedsDemosException: if the current round has no demos on disk.
            ValueError: if all demos together contain fewer transitions than a
                single batch.
        """
        demo_dir = self._demo_dir_path_for_round()
        demo_paths = self._get_demo_paths(demo_dir) if demo_dir.is_dir() else []
        if len(demo_paths) == 0:
            raise NeedsDemosException(
                f"No demos found for round {self.round_num} in dir '{demo_dir}'. "
                f"Maybe you need to collect some demos? See "
                f".create_trajectory_collector()",
            )

        if self._last_loaded_round < self.round_num:
            transitions, num_demos = self._load_all_demos()
            logging.info(
                f"Loaded {sum(num_demos)} new demos from {len(num_demos)} rounds",
            )
            # `drop_last=True` below would otherwise yield zero batches.
            if len(transitions) < self.batch_size:
                raise ValueError(
                    "Not enough transitions to form a single batch: "
                    f"self.batch_size={self.batch_size} > "
                    f"len(transitions)={len(transitions)}",
                )
            data_loader = th_data.DataLoader(
                transitions,
                self.batch_size,
                drop_last=True,
                shuffle=True,
                collate_fn=types.transitions_collate_fn,
            )
            self.bc_trainer.set_demonstrations(data_loader)
            self._last_loaded_round = self.round_num

    def extend_and_update(
        self,
        bc_train_kwargs: Optional[Mapping[str, Any]] = None,
    ) -> int:
        """Extend internal batch of data and train BC.

        Specifically, this method will load new transitions (if necessary), train
        the model for a while, and advance the round counter. If there are no fresh
        demonstrations in the demonstration directory for the current round, then
        this will raise a `NeedsDemosException` instead of training or advancing
        the round counter. In that case, the user should call
        `.create_trajectory_collector()` and use the returned
        `InteractiveTrajectoryCollector` to produce a new set of demonstrations for
        the current interaction round.

        Arguments:
            bc_train_kwargs: Keyword arguments for calling `BC.train()`. If
                the `log_rollouts_venv` key is not provided, then it is set to
                `self.venv` by default. If neither of the `n_epochs` and `n_batches`
                keys are provided, then `n_epochs` is set to `self.DEFAULT_N_EPOCHS`.

        Returns:
            New round number after advancing the round counter.
        """
        # Copy so the defaults below never mutate the caller's mapping.
        if bc_train_kwargs is None:
            bc_train_kwargs = {}
        else:
            bc_train_kwargs = dict(bc_train_kwargs)

        user_keys = bc_train_kwargs.keys()
        if "log_rollouts_venv" not in user_keys:
            bc_train_kwargs["log_rollouts_venv"] = self.venv

        if "n_epochs" not in user_keys and "n_batches" not in user_keys:
            bc_train_kwargs["n_epochs"] = self.DEFAULT_N_EPOCHS

        logging.info("Loading demonstrations")
        self._try_load_demos()
        logging.info(f"Training at round {self.round_num}")
        self.bc_trainer.train(**bc_train_kwargs)
        self.round_num += 1
        logging.info(f"New round number is {self.round_num}")
        return self.round_num

    def create_trajectory_collector(self) -> InteractiveTrajectoryCollector:
        """Create trajectory collector to extend current round's demonstration set.

        Returns:
            A collector configured with the appropriate beta, imitator policy, etc.
            for the current round. Refer to the documentation for
            `InteractiveTrajectoryCollector` to see how to use this.
        """
        save_dir = self._demo_dir_path_for_round()
        beta = self.beta_schedule(self.round_num)
        collector = InteractiveTrajectoryCollector(
            venv=self.venv,
            # The argument is a batch of observations; `predict` returns a tuple
            # whose first element is the corresponding batch of actions.
            get_robot_acts=lambda obs: self.bc_trainer.policy.predict(obs)[0],
            beta=beta,
            save_dir=save_dir,
            rng=self.rng,
        )
        return collector

    def save_trainer(self) -> Tuple[pathlib.Path, pathlib.Path]:
        """Create a snapshot of trainer in the scratch/working directory.

        The created snapshot can be reloaded with `reconstruct_trainer()`.
        In addition to saving one copy of the policy in the trainer snapshot, this
        method saves a second copy of the policy in its own file. Having a second copy
        of the policy is convenient because it can be loaded on its own and passed to
        evaluation routines for other algorithms.

        Returns:
            checkpoint_path: the round-numbered `DAggerTrainer` checkpoint path.
            policy_path: the round-numbered policy path.
        """
        self.scratch_dir.mkdir(parents=True, exist_ok=True)

        # save full trainer checkpoints
        checkpoint_paths = [
            self.scratch_dir / f"checkpoint-{self.round_num:03d}.pt",
            self.scratch_dir / "checkpoint-latest.pt",
        ]
        for checkpoint_path in checkpoint_paths:
            th.save(self, checkpoint_path)

        # save policies separately for convenience
        policy_paths = [
            self.scratch_dir / f"policy-{self.round_num:03d}.pt",
            self.scratch_dir / "policy-latest.pt",
        ]
        for policy_path in policy_paths:
            util.save_policy(self.policy, policy_path)

        return checkpoint_paths[0], policy_paths[0]
class SimpleDAggerTrainer(DAggerTrainer):
    """Simpler subclass of DAggerTrainer for training with synthetic feedback."""

    def __init__(
        self,
        *,
        venv: vec_env.VecEnv,
        scratch_dir: types.AnyPath,
        expert_policy: policies.BasePolicy,
        rng: np.random.Generator,
        expert_trajs: Optional[Sequence[types.Trajectory]] = None,
        **dagger_trainer_kwargs,
    ):
        """Builds SimpleDAggerTrainer.

        Args:
            venv: Vectorized training environment. Whether the robot action is
                injected (in accordance with the `beta_schedule` argument) is
                decided independently for each individual environment at every
                timestep.
            scratch_dir: Directory to use to store intermediate training
                information (e.g. for resuming training).
            expert_policy: The expert policy used to generate synthetic demonstrations.
            rng: Random state to use for the random number generator.
            expert_trajs: Optional starting dataset that is inserted into the round 0
                dataset.
            dagger_trainer_kwargs: Other keyword arguments passed to the
                superclass initializer `DAggerTrainer.__init__`.

        Raises:
            ValueError: The observation or action space does not match between
                `venv` and `expert_policy`.
        """
        super().__init__(
            venv=venv,
            scratch_dir=scratch_dir,
            rng=rng,
            **dagger_trainer_kwargs,
        )
        self.expert_policy = expert_policy
        if expert_policy.observation_space != self.venv.observation_space:
            raise ValueError(
                "Mismatched observation space between expert_policy and venv",
            )
        if expert_policy.action_space != self.venv.action_space:
            raise ValueError("Mismatched action space between expert_policy and venv")

        # TODO(shwang):
        #   Might welcome Transitions and DataLoaders as sources of expert data
        #   in the future too, but this will require some refactoring, so for
        #   now we just have `expert_trajs`.
        if expert_trajs is not None:
            # Save each initial expert trajectory into the "round 0" demonstration
            # data directory.
            for traj_index, traj in enumerate(expert_trajs):
                _save_dagger_demo(
                    traj,
                    traj_index,
                    self._demo_dir_path_for_round(),
                    self.rng,
                    prefix="initial_data",
                )

    def train(
        self,
        total_timesteps: int,
        *,
        rollout_round_min_episodes: int = 3,
        rollout_round_min_timesteps: int = 500,
        bc_train_kwargs: Optional[dict] = None,
    ) -> None:
        """Train the DAgger agent.

        The agent is trained in "rounds" where each round consists of a dataset
        aggregation step followed by BC update step.

        During a dataset aggregation step, `self.expert_policy` is used to perform
        rollouts in the environment but there is a `1 - beta` chance (beta is
        determined from the round number and `self.beta_schedule`) that the DAgger
        agent's action is used instead. Regardless of whether the DAgger agent's action
        is used during the rollout, the expert action and corresponding observation are
        always appended to the dataset. The number of environment steps in the
        dataset aggregation stage is determined by the `rollout_round_min*` arguments.

        During a BC update step, `BC.train()` is called to update the DAgger agent on
        all data collected so far.

        Args:
            total_timesteps: The number of timesteps to train inside the environment.
                In practice this is a lower bound, because the number of timesteps is
                rounded up to finish the minimum number of episodes or timesteps in the
                last DAgger training round, and the environment timesteps are executed
                in multiples of `self.venv.num_envs`.
            rollout_round_min_episodes: The number of episodes that must be
                completed before a dataset aggregation step ends.
            rollout_round_min_timesteps: The number of environment timesteps that must
                be completed before a dataset aggregation step ends. In addition,
                every round always collects at least `self.batch_size` timesteps,
                because otherwise BC could fail to receive any batches.
            bc_train_kwargs: Keyword arguments for calling `BC.train()`. If
                the `log_rollouts_venv` key is not provided, then it is set to
                `self.venv` by default. If neither of the `n_epochs` and `n_batches`
                keys are provided, then `n_epochs` is set to `self.DEFAULT_N_EPOCHS`.
        """
        total_timestep_count = 0

        while total_timestep_count < total_timesteps:
            collector = self.create_trajectory_collector()
            round_timestep_count = 0

            sample_until = rollout.make_sample_until(
                min_timesteps=max(rollout_round_min_timesteps, self.batch_size),
                min_episodes=rollout_round_min_episodes,
            )

            trajectories = rollout.generate_trajectories(
                policy=self.expert_policy,
                venv=collector,
                sample_until=sample_until,
                deterministic_policy=True,
                rng=collector.rng,
            )

            for traj in trajectories:
                self._logger.record_mean(
                    "dagger/mean_episode_reward",
                    np.sum(traj.rews),
                )
                round_timestep_count += len(traj)
                total_timestep_count += len(traj)

            round_episode_count = len(trajectories)

            self._logger.record("dagger/total_timesteps", total_timestep_count)
            # Log the trainer's own round counter rather than a local one, so the
            # value stays correct when training resumes or `train` is called again.
            self._logger.record("dagger/round_num", self.round_num)
            self._logger.record("dagger/round_episode_count", round_episode_count)
            self._logger.record("dagger/round_timestep_count", round_timestep_count)

            # `logger.dump` is called inside BC.train within the following fn call;
            # it also advances `self.round_num`.
            self.extend_and_update(bc_train_kwargs)