Source code for imitation.algorithms.bc

"""Behavioural Cloning (BC).

Trains a policy by applying supervised learning to a fixed dataset of (observation,
action) pairs generated by some expert demonstrator.
"""

import dataclasses
import itertools
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    Optional,
    Tuple,
    Type,
    Union,
)

import gymnasium as gym
import numpy as np
import torch as th
import tqdm
from stable_baselines3.common import policies, torch_layers, utils, vec_env

from imitation.algorithms import base as algo_base
from imitation.data import rollout, types
from imitation.policies import base as policy_base
from imitation.util import logger as imit_logger
from imitation.util import util



@dataclasses.dataclass(frozen=True)
class BatchIteratorWithEpochEndCallback:
    """Loops through batches from a batch loader and calls a callback after every epoch.

    Will throw an exception when an epoch contains no batches.
    """

    batch_loader: Iterable[types.TransitionMapping]
    n_epochs: Optional[int]
    n_batches: Optional[int]
    on_epoch_end: Optional[Callable[[int], None]]

    def __post_init__(self) -> None:
        epochs_and_batches_specified = (
            self.n_epochs is not None and self.n_batches is not None
        )
        neither_epochs_nor_batches_specified = (
            self.n_epochs is None and self.n_batches is None
        )
        if epochs_and_batches_specified or neither_epochs_nor_batches_specified:
            raise ValueError(
                "Must provide exactly one of `n_epochs` and `n_batches` arguments.",
            )

    def __iter__(self) -> Iterator[types.TransitionMapping]:
        def batch_iterator() -> Iterator[types.TransitionMapping]:
            # Note: the islice here ensures we do not exceed self.n_epochs
            for epoch_num in itertools.islice(itertools.count(), self.n_epochs):
                some_batch_was_yielded = False
                for batch in self.batch_loader:
                    yield batch
                    some_batch_was_yielded = True

                if not some_batch_was_yielded:
                    raise AssertionError(
                        f"Data loader returned no data during epoch "
                        f"{epoch_num} -- did it reset correctly?",
                    )
                if self.on_epoch_end is not None:
                    self.on_epoch_end(epoch_num)

        # Note: the islice here ensures we do not exceed self.n_batches
        return itertools.islice(batch_iterator(), self.n_batches)

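# A minimal usage sketch for `BatchIteratorWithEpochEndCallback`, assuming an
# in-memory list of toy transition batches stands in for the loader normally
# produced by `algo_base.make_data_loader`; the arrays and callback below are
# placeholders for illustration only.
def _example_epoch_end_callback() -> None:
    toy_batches = [
        {"obs": np.zeros((2, 4)), "acts": np.zeros((2,), dtype=int)},
        {"obs": np.ones((2, 4)), "acts": np.ones((2,), dtype=int)},
    ]
    batch_iterator = BatchIteratorWithEpochEndCallback(
        batch_loader=toy_batches,
        n_epochs=3,  # exactly one of `n_epochs` / `n_batches` may be set
        n_batches=None,
        on_epoch_end=lambda epoch_num: print(f"finished epoch {epoch_num}"),
    )
    for batch in batch_iterator:  # yields 2 batches per epoch, 3 epochs total
        assert "obs" in batch and "acts" in batch
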

@dataclasses.dataclass(frozen=True)
class BCTrainingMetrics:
    """Container for the different components of behavior cloning loss."""

    neglogp: th.Tensor
    entropy: Optional[th.Tensor]
    ent_loss: th.Tensor  # set to 0 if entropy is None
    prob_true_act: th.Tensor
    l2_norm: th.Tensor
    l2_loss: th.Tensor
    loss: th.Tensor


@dataclasses.dataclass(frozen=True)
class BehaviorCloningLossCalculator:
    """Functor to compute the loss used in Behavior Cloning."""

    ent_weight: float
    l2_weight: float

    def __call__(
        self,
        policy: policies.ActorCriticPolicy,
        obs: Union[
            types.AnyTensor,
            types.DictObs,
            Dict[str, np.ndarray],
            Dict[str, th.Tensor],
        ],
        acts: Union[th.Tensor, np.ndarray],
    ) -> BCTrainingMetrics:
        """Calculate the supervised learning loss used to train the behavioral clone.

        Args:
            policy: The actor-critic policy whose loss is being computed.
            obs: The observations seen by the expert.
            acts: The actions taken by the expert.

        Returns:
            A BCTrainingMetrics object with the loss and all the components it
            consists of.
        """
        tensor_obs = types.map_maybe_dict(
            util.safe_to_tensor,
            types.maybe_unwrap_dictobs(obs),
        )
        acts = util.safe_to_tensor(acts)

        # policy.evaluate_actions's type signatures are incorrect.
        # See https://github.com/DLR-RM/stable-baselines3/issues/1679
        (_, log_prob, entropy) = policy.evaluate_actions(
            tensor_obs,  # type: ignore[arg-type]
            acts,
        )
        prob_true_act = th.exp(log_prob).mean()
        log_prob = log_prob.mean()
        entropy = entropy.mean() if entropy is not None else None

        l2_norms = [th.sum(th.square(w)) for w in policy.parameters()]
        l2_norm = sum(l2_norms) / 2  # divide by 2 to cancel with gradient of square
        # sum of list defaults to float(0) if len == 0.
        assert isinstance(l2_norm, th.Tensor)

        ent_loss = -self.ent_weight * (entropy if entropy is not None else th.zeros(1))
        neglogp = -log_prob
        l2_loss = self.l2_weight * l2_norm
        loss = neglogp + ent_loss + l2_loss

        return BCTrainingMetrics(
            neglogp=neglogp,
            entropy=entropy,
            ent_loss=ent_loss,
            prob_true_act=prob_true_act,
            l2_norm=l2_norm,
            l2_loss=l2_loss,
            loss=loss,
        )

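# The loss returned above decomposes as
#   loss = neglogp + ent_loss + l2_loss
#        = -E[log pi(a|s)] - ent_weight * H(pi(.|s)) + l2_weight * ||theta||^2 / 2.
# A minimal usage sketch, assuming `policy` plus expert `obs` / `acts` arrays are
# already available (e.g. taken from a `BC` instance and one batch of its data
# loader); the weights below are illustrative:
def _example_loss_calculation(
    policy: policies.ActorCriticPolicy,
    obs: np.ndarray,
    acts: np.ndarray,
) -> th.Tensor:
    loss_calculator = BehaviorCloningLossCalculator(ent_weight=1e-3, l2_weight=0.0)
    metrics = loss_calculator(policy, obs, acts)
    # Individual components are exposed for logging or inspection.
    assert th.isclose(
        metrics.loss,
        metrics.neglogp + metrics.ent_loss + metrics.l2_loss,
    )
    return metrics.loss
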

def enumerate_batches(
    batch_it: Iterable[types.TransitionMapping],
) -> Iterable[Tuple[Tuple[int, int, int], types.TransitionMapping]]:
    """Prepends batch stats before the batches of a batch iterator."""
    num_samples_so_far = 0
    for num_batches, batch in enumerate(batch_it):
        batch_size = len(batch["obs"])
        num_samples_so_far += batch_size
        yield (num_batches, batch_size, num_samples_so_far), batch

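# A usage sketch for `enumerate_batches`: it wraps any batch iterator and prepends
# (batch index, batch size, cumulative sample count) to each batch. The toy batches
# below are placeholders.
def _example_enumerate_batches() -> None:
    toy_batches = [{"obs": np.zeros((3, 4)), "acts": np.zeros((3,), dtype=int)}] * 2
    for (batch_num, batch_size, num_samples_so_far), batch in enumerate_batches(
        toy_batches,
    ):
        print(batch_num, batch_size, num_samples_so_far)  # (0, 3, 3) then (1, 3, 6)
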

@dataclasses.dataclass(frozen=True)
class RolloutStatsComputer:
    """Computes statistics about rollouts.

    Args:
        venv: The vectorized environment in which to compute the rollouts.
        n_episodes: The number of episodes to base the statistics on.
    """

    venv: Optional[vec_env.VecEnv]
    n_episodes: int

    # TODO(shwang): Maybe instead use a callback that can be shared between
    #   all algorithms' `.train()` for generating rollout stats.
    #   EvalCallback could be a good fit:
    #   https://stable-baselines3.readthedocs.io/en/master/guide/callbacks.html#evalcallback
    def __call__(
        self,
        policy: policies.ActorCriticPolicy,
        rng: np.random.Generator,
    ) -> Mapping[str, float]:
        if self.venv is not None and self.n_episodes > 0:
            trajs = rollout.generate_trajectories(
                policy,
                self.venv,
                rollout.make_min_episodes(self.n_episodes),
                rng=rng,
            )
            return rollout.rollout_stats(trajs)
        else:
            return dict()

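# A usage sketch for `RolloutStatsComputer`, assuming a vectorized environment
# `venv` with spaces matching the policy is already available; the seed is
# illustrative.
def _example_rollout_stats(
    venv: vec_env.VecEnv,
    policy: policies.ActorCriticPolicy,
) -> Mapping[str, float]:
    compute_rollout_stats = RolloutStatsComputer(venv, n_episodes=5)
    stats = compute_rollout_stats(policy, rng=np.random.default_rng(0))
    return stats  # e.g. stats["return_mean"], stats["len_mean"]
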

class BCLogger:
    """Utility class to help logging information relevant to Behavior Cloning."""

    def __init__(self, logger: imit_logger.HierarchicalLogger):
        """Create new BC logger.

        Args:
            logger: The logger to feed all the information to.
        """
        self._logger = logger
        self._tensorboard_step = 0
        self._current_epoch = 0

    def reset_tensorboard_steps(self):
        self._tensorboard_step = 0

    def log_epoch(self, epoch_number):
        self._current_epoch = epoch_number

    def log_batch(
        self,
        batch_num: int,
        batch_size: int,
        num_samples_so_far: int,
        training_metrics: BCTrainingMetrics,
        rollout_stats: Mapping[str, float],
    ):
        self._logger.record("batch_size", batch_size)
        self._logger.record("bc/epoch", self._current_epoch)
        self._logger.record("bc/batch", batch_num)
        self._logger.record("bc/samples_so_far", num_samples_so_far)
        for k, v in training_metrics.__dict__.items():
            self._logger.record(f"bc/{k}", float(v) if v is not None else None)

        for k, v in rollout_stats.items():
            if "return" in k and "monitor" not in k:
                self._logger.record("rollout/" + k, v)
        self._logger.dump(self._tensorboard_step)
        self._tensorboard_step += 1

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_logger"]
        return state


def reconstruct_policy(
    policy_path: str,
    device: Union[th.device, str] = "auto",
) -> policies.ActorCriticPolicy:
    """Reconstruct a saved policy.

    Args:
        policy_path: path where `.save_policy()` has been run.
        device: device on which to load the policy.

    Returns:
        policy: policy with reloaded weights.
    """
    policy = th.load(policy_path, map_location=utils.get_device(device))
    assert isinstance(policy, policies.ActorCriticPolicy)
    return policy

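# A save/reload sketch pairing `reconstruct_policy` with the `.save_policy()`
# method referenced in the docstring above; assumes a trained `bc_trainer` with
# such a method and a writable path, and the filename is illustrative.
def _example_save_and_reload(bc_trainer: "BC") -> policies.ActorCriticPolicy:
    bc_trainer.save_policy("bc_policy.pt")
    return reconstruct_policy("bc_policy.pt", device="cpu")
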

class BC(algo_base.DemonstrationAlgorithm):
    """Behavioral cloning (BC).

    Recovers a policy via supervised learning from observation-action pairs.
    """

    def __init__(
        self,
        *,
        observation_space: gym.Space,
        action_space: gym.Space,
        rng: np.random.Generator,
        policy: Optional[policies.ActorCriticPolicy] = None,
        demonstrations: Optional[algo_base.AnyTransitions] = None,
        batch_size: int = 32,
        minibatch_size: Optional[int] = None,
        optimizer_cls: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Mapping[str, Any]] = None,
        ent_weight: float = 1e-3,
        l2_weight: float = 0.0,
        device: Union[str, th.device] = "auto",
        custom_logger: Optional[imit_logger.HierarchicalLogger] = None,
    ):
        """Builds BC.

        Args:
            observation_space: the observation space of the environment.
            action_space: the action space of the environment.
            rng: the random state to use for the random number generator.
            policy: a Stable Baselines3 policy; if unspecified, defaults to
                `FeedForward32Policy`.
            demonstrations: Demonstrations from an expert (optional). Transitions
                expressed directly as a `types.TransitionsMinimal` object, a sequence
                of trajectories, or an iterable of transition batches (mappings from
                keywords to arrays containing observations, etc).
            batch_size: The number of samples in each batch of expert data.
            minibatch_size: size of minibatch to calculate gradients over. The
                gradients are accumulated until `batch_size` examples are processed
                before making an optimization step. This is useful in GPU training to
                reduce memory usage, since fewer examples are loaded into memory at
                once, facilitating training with larger batch sizes, but is generally
                slower. Must be a factor of `batch_size`. Optional, defaults to
                `batch_size`.
            optimizer_cls: optimiser to use for supervised training.
            optimizer_kwargs: keyword arguments, excluding learning rate and
                weight decay, for optimiser construction.
            ent_weight: scaling applied to the policy's entropy regularization.
            l2_weight: scaling applied to the policy's L2 regularization.
            device: name/identity of device to place policy on.
            custom_logger: Where to log to; if None (default), creates a new logger.

        Raises:
            ValueError: If `weight_decay` is specified in `optimizer_kwargs` (use the
                parameter `l2_weight` instead), or if the batch size is not a multiple
                of the minibatch size.
        """
        self._demo_data_loader: Optional[Iterable[types.TransitionMapping]] = None
        self.batch_size = batch_size
        self.minibatch_size = minibatch_size or batch_size
        if self.batch_size % self.minibatch_size != 0:  # pragma: no cover
            raise ValueError("Batch size must be a multiple of minibatch size.")
        super().__init__(
            demonstrations=demonstrations,
            custom_logger=custom_logger,
        )
        self._bc_logger = BCLogger(self.logger)

        self.action_space = action_space
        self.observation_space = observation_space
        self.rng = rng

        if policy is None:
            extractor = (
                torch_layers.CombinedExtractor
                if isinstance(observation_space, gym.spaces.Dict)
                else torch_layers.FlattenExtractor
            )
            policy = policy_base.FeedForward32Policy(
                observation_space=observation_space,
                action_space=action_space,
                # Set lr_schedule to max value to force error if policy.optimizer
                # is used by mistake (should use self.optimizer instead).
                lr_schedule=lambda _: th.finfo(th.float32).max,
                features_extractor_class=extractor,
            )
        self._policy = policy.to(utils.get_device(device))
        # TODO(adam): make policy mandatory and delete observation/action space params?
        assert self.policy.observation_space == self.observation_space
        assert self.policy.action_space == self.action_space

        if optimizer_kwargs:
            if "weight_decay" in optimizer_kwargs:  # pragma: no cover
                raise ValueError("Use the parameter l2_weight instead of weight_decay.")
        optimizer_kwargs = optimizer_kwargs or {}
        self.optimizer = optimizer_cls(
            self.policy.parameters(),
            **optimizer_kwargs,
        )

        self.loss_calculator = BehaviorCloningLossCalculator(ent_weight, l2_weight)

    @property
    def policy(self) -> policies.ActorCriticPolicy:
        return self._policy

    def set_demonstrations(self, demonstrations: algo_base.AnyTransitions) -> None:
        self._demo_data_loader = algo_base.make_data_loader(
            demonstrations,
            self.minibatch_size,
        )

    def train(
        self,
        *,
        n_epochs: Optional[int] = None,
        n_batches: Optional[int] = None,
        on_epoch_end: Optional[Callable[[], None]] = None,
        on_batch_end: Optional[Callable[[], None]] = None,
        log_interval: int = 500,
        log_rollouts_venv: Optional[vec_env.VecEnv] = None,
        log_rollouts_n_episodes: int = 5,
        progress_bar: bool = True,
        reset_tensorboard: bool = False,
    ):
        """Train with supervised learning for some number of epochs.

        Here an 'epoch' is just a complete pass through the expert data loader,
        as set by `self.set_demonstrations()`. Note that when you specify
        `n_batches` smaller than the number of batches in an epoch, the
        `on_epoch_end` callback will never be called.

        Args:
            n_epochs: Number of complete passes made through expert data before
                ending training. Provide exactly one of `n_epochs` and `n_batches`.
            n_batches: Number of batches loaded from dataset before ending training.
                Provide exactly one of `n_epochs` and `n_batches`.
            on_epoch_end: Optional callback with no parameters to run at the end of
                each epoch.
            on_batch_end: Optional callback with no parameters to run at the end of
                each batch.
            log_interval: Log stats after every log_interval batches.
            log_rollouts_venv: If not None, then this VecEnv (whose observation and
                action spaces must match `self.observation_space` and
                `self.action_space`) is used to generate rollout stats, including
                average return and average episode length. If None, then no rollouts
                are generated.
            log_rollouts_n_episodes: Number of rollouts to generate when calculating
                rollout stats. Non-positive number disables rollouts.
            progress_bar: If True, then show a progress bar during training.
            reset_tensorboard: If True, then start plotting to Tensorboard from x=0
                even if `.train()` logged to Tensorboard previously. Has no practical
                effect if `.train()` is being called for the first time.
        """
        if reset_tensorboard:
            self._bc_logger.reset_tensorboard_steps()
        self._bc_logger.log_epoch(0)
        compute_rollout_stats = RolloutStatsComputer(
            log_rollouts_venv,
            log_rollouts_n_episodes,
        )

        def _on_epoch_end(epoch_number: int):
            if tqdm_progress_bar is not None:
                total_num_epochs_str = f"of {n_epochs}" if n_epochs is not None else ""
                tqdm_progress_bar.display(
                    f"Epoch {epoch_number} {total_num_epochs_str}",
                    pos=1,
                )
            self._bc_logger.log_epoch(epoch_number + 1)
            if on_epoch_end is not None:
                on_epoch_end()

        mini_per_batch = self.batch_size // self.minibatch_size
        n_minibatches = n_batches * mini_per_batch if n_batches is not None else None

        assert self._demo_data_loader is not None
        demonstration_batches = BatchIteratorWithEpochEndCallback(
            self._demo_data_loader,
            n_epochs,
            n_minibatches,
            _on_epoch_end,
        )
        batches_with_stats = enumerate_batches(demonstration_batches)
        tqdm_progress_bar: Optional[tqdm.tqdm] = None

        if progress_bar:
            batches_with_stats = tqdm.tqdm(
                batches_with_stats,
                unit="batch",
                total=n_minibatches,
            )
            tqdm_progress_bar = batches_with_stats

        def process_batch():
            self.optimizer.step()
            self.optimizer.zero_grad()
            if batch_num % log_interval == 0:
                rollout_stats = compute_rollout_stats(self.policy, self.rng)
                self._bc_logger.log_batch(
                    batch_num,
                    minibatch_size,
                    num_samples_so_far,
                    training_metrics,
                    rollout_stats,
                )

            if on_batch_end is not None:
                on_batch_end()

        self.optimizer.zero_grad()
        for (
            batch_num,
            minibatch_size,
            num_samples_so_far,
        ), batch in batches_with_stats:
            obs_tensor: Union[th.Tensor, Dict[str, th.Tensor]]
            # unwraps the observation if it's a dictobs and converts arrays to tensors
            obs_tensor = types.map_maybe_dict(
                lambda x: util.safe_to_tensor(x, device=self.policy.device),
                types.maybe_unwrap_dictobs(batch["obs"]),
            )
            acts = util.safe_to_tensor(batch["acts"], device=self.policy.device)
            training_metrics = self.loss_calculator(self.policy, obs_tensor, acts)

            # Renormalise the loss to be averaged over the whole
            # batch size instead of the minibatch size.
            # If there is an incomplete batch, its gradients will be
            # smaller, which may be helpful for stability.
            loss = training_metrics.loss * minibatch_size / self.batch_size
            loss.backward()

            batch_num = batch_num * self.minibatch_size // self.batch_size
            if num_samples_so_far % self.batch_size == 0:
                process_batch()
        if num_samples_so_far % self.batch_size != 0:
            # if there remains an incomplete batch
            batch_num += 1
            process_batch()