Implemented the ability to train rewards in preference comparison against multiple policies #529

Closed
wants to merge 31 commits into master from policy_ensemble

Changes from 28 commits

Commits (31)
a7eda89
implemented MixtureOfTrajectoryGenerators
levmckinney Aug 9, 2022
1a9ab5d
added add_prefix method to HierarchicalLogger
levmckinney Aug 17, 2022
d442baa
moved logger.accumulate_means into AgentTrainer
levmckinney Aug 17, 2022
003bbcd
added option for multiple agents to train_preference_comparison
levmckinney Aug 17, 2022
f6a3d44
reduced the length of the prefix used in mixture of generators for lo…
levmckinney Aug 18, 2022
33110aa
Merge branch 'master' into policy_ensemble
levmckinney Aug 18, 2022
8c5433d
improved logic for whether to try and checkpoint policy
levmckinney Aug 18, 2022
83f8b8b
fixed final save checkpoint call
levmckinney Aug 18, 2022
803ffde
fixed log key when using prefix on windows
levmckinney Aug 18, 2022
21e11fe
clarified doc string and added runtime error
levmckinney Aug 22, 2022
5c9fc1e
responded to reviewers comments
levmckinney Aug 22, 2022
e478ec3
fixed logic bug
levmckinney Aug 23, 2022
75c0b80
fixed test
levmckinney Aug 23, 2022
73ace03
added pragma: no cover to test case which needs it
levmckinney Aug 23, 2022
b0b3b93
added doctest to logger explaining behaviour
levmckinney Aug 23, 2022
dc60be8
Merge branch 'master' into policy_ensemble
levmckinney Aug 24, 2022
2958d08
added option to split training steps among the agents and made it def…
levmckinney Aug 24, 2022
303b032
Add type annotations to hierarchical logger
Rocamonde Sep 3, 2022
230774c
Move "is single agent" to explicit bool definition
Rocamonde Sep 3, 2022
d44cc85
Raise error when too few steps to partition.
Rocamonde Sep 3, 2022
07fa830
Formatter
Rocamonde Sep 3, 2022
dd72656
Fix "else" that was accidentally removed.
Rocamonde Sep 3, 2022
709cb42
Roll back change
Rocamonde Sep 3, 2022
06084ad
Added description to tests
levmckinney Sep 5, 2022
a2b790f
Unnested with statements
levmckinney Sep 5, 2022
510abd8
removed duplicate sentence in documentation
levmckinney Sep 5, 2022
232b56e
Apply suggestions from code review
levmckinney Sep 5, 2022
09670d3
Merge branch 'master' into policy_ensemble
levmckinney Sep 6, 2022
a5ab914
Merge branch 'master' into policy_ensemble
levmckinney Sep 13, 2022
408e248
fixed comment in doctest
levmckinney Sep 13, 2022
4df8551
Update src/imitation/algorithms/preference_comparisons.py
Rocamonde Sep 14, 2022
2 changes: 1 addition & 1 deletion setup.cfg
@@ -32,7 +32,7 @@ filterwarnings =
ignore:Using or importing the ABCs from 'collections':DeprecationWarning:(google|pkg_resources)
ignore:Parameters to load are deprecated:Warning:gym
ignore:The binary mode of fromstring is deprecated:DeprecationWarning:gym

addopts = --doctest-modules
markers =
expensive: mark a test as expensive (deselect with '-m "not expensive"')

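The new addopts = --doctest-modules line makes pytest also collect and run doctests embedded in module and function docstrings (the PR adds such a doctest to the logger). A minimal illustration of what that option picks up, using a hypothetical module and function name:

# example_module.py -- hypothetical file; with addopts = --doctest-modules,
# running `pytest` executes the >>> example below as a test.
def join_log_keys(prefix: str, key: str) -> str:
    """Join a logging prefix and a raw key.

    >>> join_log_keys("gen_0", "agent/reward")
    'gen_0/agent/reward'
    """
    return f"{prefix}/{key}"
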
123 changes: 109 additions & 14 deletions src/imitation/algorithms/preference_comparisons.py
@@ -3,6 +3,8 @@
Trains a reward model and optionally a policy based on preferences
between trajectory fragments.
"""
from __future__ import generators

import abc
import math
import pickle
@@ -210,18 +212,19 @@ def train(self, steps: int, **kwargs) -> None:
RuntimeError: Transitions left in `self.buffering_wrapper`; call
`self.sample` first to clear them.
"""
n_transitions = self.buffering_wrapper.n_transitions
if n_transitions:
raise RuntimeError(
f"There are {n_transitions} transitions left in the buffer. "
"Call AgentTrainer.sample() first to clear them.",
with self.logger.accumulate_means("agent"):
n_transitions = self.buffering_wrapper.n_transitions
if n_transitions:
raise RuntimeError(
f"There are {n_transitions} transitions left in the buffer. "
"Call AgentTrainer.sample() first to clear them.",
)
self.algorithm.learn(
total_timesteps=steps,
reset_num_timesteps=False,
callback=self.log_callback,
**kwargs,
)
self.algorithm.learn(
total_timesteps=steps,
reset_num_timesteps=False,
callback=self.log_callback,
**kwargs,
)

def sample(self, steps: int) -> Sequence[types.TrajectoryWithRew]:
agent_trajs, _ = self.buffering_wrapper.pop_finished_trajectories()
@@ -299,6 +302,99 @@ def logger(self, value: imit_logger.HierarchicalLogger):
self.algorithm.set_logger(self.logger)


class MixtureOfTrajectoryGenerators(TrajectoryGenerator):
"""A collection of trajectory generators merged together."""

members: Sequence[TrajectoryGenerator]

def __init__(
self,
members: Sequence[TrajectoryGenerator],
custom_logger: Optional[imit_logger.HierarchicalLogger] = None,
share_training_steps: bool = True,
):
"""Create a mixture of trajectory generators.

Args:
members: Individual trajectory generators that will make up the ensemble.
custom_logger: Custom logger passed to super class.
share_training_steps: If True, training steps are split equally among the
trajectory generators. If False, each trajectory generator trains
for the full number of steps. Defaults to True.

Raises:
ValueError: if fewer than two members are provided.
"""
if len(members) < 2:
raise ValueError(
"MixtureOfTrajectoryGenerators requires at least two member!",
)
self.members = tuple(members)
super().__init__(custom_logger=custom_logger)
self.share_training_steps = share_training_steps

def _partition(self, steps: int) -> Sequence[int]:
"""Partition steps into len(self.members) close to equal parts."""
n = len(self.members)
# Approximately evenly partition work.
d = steps // n
if d == 0:
raise ValueError(
f"Cannot partition only {steps} steps among {n} members!",
)
r = steps % n
partition = [d] * n
for i in range(r):
partition[i] += 1
return partition

def sample(self, steps: int) -> Sequence[TrajectoryWithRew]:
"""Sample a batch of trajectories splitting evenly amongst the mixture members.

Args:
steps: All trajectories taken together should
have at least this many steps.

Returns:
A list of sampled trajectories with rewards (which should
be the environment rewards, not ones from a reward model).
"""
trajectories = []
for s, generator in zip(self._partition(steps), self.members):
trajectories.extend(generator.sample(s))
return trajectories

def train(self, steps: int, **kwargs):
"""Train each trajectory generator.

If self.share_training_steps is set to True, training steps are split equally
among the trajectory generators. Otherwise, each trajectory generator trains
for the full number of steps.

Args:
steps: number of environment steps to train for.
**kwargs: additional keyword arguments passed along to members.
"""
if self.share_training_steps:
steps_to_train = self._partition(steps)
else:
steps_to_train = [steps] * len(self.members)  # every member gets the full budget

for i, (generator, s) in enumerate(zip(self.members, steps_to_train)):
with self.logger.add_prefix(f"gen_{i}"):
generator.train(s, **kwargs)

@property
def logger(self) -> imit_logger.HierarchicalLogger:
return self._logger

@logger.setter
def logger(self, value: imit_logger.HierarchicalLogger):
self._logger = value
for generator in self.members:
generator.logger = value


def _get_trajectories(
trajectories: Sequence[TrajectoryWithRew],
steps: int,
@@ -1523,9 +1619,8 @@ def train(
# at the end of training (where the reward model is presumably best)
if i == self.num_iterations - 1:
num_steps += extra_timesteps
with self.logger.accumulate_means("agent"):
self.logger.log(f"Training agent for {num_steps} timesteps")
self.trajectory_generator.train(steps=num_steps)
self.logger.log(f"Training agent for {num_steps} timesteps")
self.trajectory_generator.train(steps=num_steps)

self.logger.dump(self._iteration)

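Since the diff above only shows the new class in isolation, here is a minimal standalone sketch of the routing behaviour it implements. It deliberately mimics the scheme with a hypothetical DummyGenerator rather than importing imitation, so it is an illustration under those assumptions, not the library API:

from typing import List, Sequence


class DummyGenerator:
    """Stand-in for a TrajectoryGenerator; records the step counts it receives."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.trained: List[int] = []

    def sample(self, steps: int) -> Sequence[str]:
        return [f"{self.name}-trajectories({steps} steps)"]

    def train(self, steps: int) -> None:
        self.trained.append(steps)


def partition(steps: int, n: int) -> List[int]:
    # Same scheme as MixtureOfTrajectoryGenerators._partition: near-equal split,
    # with the remainder spread over the first members.
    d, r = divmod(steps, n)
    if d == 0:
        raise ValueError(f"Cannot partition only {steps} steps among {n} members!")
    return [d + 1] * r + [d] * (n - r)


members = [DummyGenerator(f"gen_{i}") for i in range(3)]

# sample(): each member contributes a near-equal share of the requested steps.
trajectories = []
for s, gen in zip(partition(10, len(members)), members):
    trajectories.extend(gen.sample(s))
print(trajectories)
# ['gen_0-trajectories(4 steps)', 'gen_1-trajectories(3 steps)', 'gen_2-trajectories(3 steps)']

# train() with share_training_steps=True splits the budget the same way; in the
# real class each member's call is wrapped in logger.add_prefix(f"gen_{i}").
for s, gen in zip(partition(10, len(members)), members):
    gen.train(s)
print([gen.trained for gen in members])  # [[4], [3], [3]]
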
2 changes: 2 additions & 0 deletions src/imitation/scripts/config/train_preference_comparisons.py
@@ -43,6 +43,7 @@ def train_defaults():
save_preferences = False # save preference dataset at the end?
agent_path = None # path to a (partially) trained agent to load at the beginning
# type of PreferenceGatherer to use
num_agents = 1 # The number of agents to train the reward against.
gatherer_cls = preference_comparisons.SyntheticGatherer
# arguments passed on to the PreferenceGatherer specified by gatherer_cls
gatherer_kwargs = {}
@@ -59,6 +60,7 @@ def train_defaults():

checkpoint_interval = 0 # Num epochs between saving (<0 disables, =0 final only)
query_schedule = "hyperbolic"
share_training_steps_among_agents = True


@train_preference_comparisons_ex.named_config
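
A hedged sketch of driving the two new config options from Python via Sacred. The import path and the assumption that the default ingredients supply a usable environment come from the repository layout shown in this diff, and are not verified here:

# Hypothetical driver script; the config keys match the defaults added above.
from imitation.scripts.train_preference_comparisons import (
    train_preference_comparisons_ex,
)

run = train_preference_comparisons_ex.run(
    config_updates=dict(
        num_agents=4,  # train the reward model against four policies
        share_training_steps_among_agents=True,  # split the step budget four ways
    ),
)
print(run.result)  # mapping of results returned by the experiment
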
64 changes: 47 additions & 17 deletions src/imitation/scripts/train_preference_comparisons.py
@@ -69,6 +69,7 @@ def train_preference_comparisons(
trajectory_generator_kwargs: Mapping[str, Any],
save_preferences: bool,
agent_path: Optional[str],
num_agents: int,
preference_model_kwargs: Mapping[str, Any],
reward_trainer_kwargs: Mapping[str, Any],
gatherer_cls: Type[preference_comparisons.PreferenceGatherer],
@@ -80,6 +81,7 @@
allow_variable_horizon: bool,
checkpoint_interval: int,
query_schedule: Union[str, type_aliases.Schedule],
share_training_steps_among_agents: bool,
) -> Mapping[str, Any]:
"""Train a reward model using preference comparisons.

@@ -113,6 +115,7 @@
save_preferences: if True, store the final dataset of preferences to disk.
agent_path: if given, initialize the agent using this stored policy
rather than randomly.
num_agents: number of agents to train the reward model against.
preference_model_kwargs: passed to PreferenceModel
reward_trainer_kwargs: passed to BasicRewardTrainer or EnsembleRewardTrainer
gatherer_cls: type of PreferenceGatherer to use (defaults to SyntheticGatherer)
@@ -140,6 +143,9 @@
be allocated to each iteration. "hyperbolic" and "inverse_quadratic"
apportion fewer queries to later iterations when the policy is assumed
to be better and more stable.
share_training_steps_among_agents: If True (default), when training with
num_agents > 1, training steps are split equally among the agents. If
False, all agents train for the full number of steps.

Review thread on this parameter:

Member:
This is fine, even though you could just do integer division of num_steps // num_agents manually as a user.

I might not have bothered to add this feature for two reasons: I don't see that it's clearly more readable to pass this flag than it is to do the above, and I don't see why a user wishing to specify the total number of steps would want the total to be exactly num_steps instead of just a multiple of num_agents.

However, now that you've done it I don't oppose having it, as it doesn't preclude users from still splitting it manually. You might want to add input validation to make sure that each agent has at least one step of training inside MixtureOfTrajectoryGenerators._partition (e.g. raise if steps < n).

Collaborator (author):
I made this the default behavior for the following reason. Often in RL research papers we want to have a specific budget of environment interactions. In this context, I felt having the default behavior respect that budget would make the most sense. However, I do agree, this could be handled at the level of the scripts and reduce the complexity of the business logic a bit.

Member:
That makes sense, thanks for your clarification. I just figured that for a case of e.g. 7 agents, 2113 steps instead of 2114 (a multiple of 7) would not really make a difference, but maybe it does to some users.

Collaborator (author):
> I just figured that for a case of e.g. 7 agents, 2113 steps instead of 2114 (a multiple of 7) would not really make a difference, but maybe it does to some users.

I don't think this would make a difference to most users. I'm mainly concerned with what the default behavior should be. Furthermore, I believe it should be to split training steps equally amongst the agents. Thus, num_steps records the actual budget of environment interactions across all agents.

We could add this at the script level. However, I suspect that downstream users of the core API will also just end up implementing something similar to num_steps // num_agents themselves.

Member:
Having the default behavior preserve the total training timestep budget seems good to me. Integer division // vs something more clever that handles remainders seems unimportant -- we usually have hundreds of thousands of timesteps and single-digit numbers of agents.

Returns:
Rollout statistics from trained policy.
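
To make the budget bookkeeping in the thread above concrete, a tiny sketch using the hypothetical numbers from that discussion (7 agents, 2113 requested steps):

# Total environment interactions under the two modes of
# share_training_steps_among_agents (numbers taken from the review thread).
steps, num_agents = 2113, 7
shared_total = steps  # True (default): ~302 steps per agent, 2113 in total
independent_total = steps * num_agents  # False: every agent trains for all 2113 steps
print(shared_total, independent_total)  # 2113 14791
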
@@ -155,31 +161,55 @@
reward_net.predict_processed,
update_stats=False,
)
if agent_path is None:
agent = rl_common.make_rl_algo(venv, relabel_reward_fn=relabel_reward_fn)
else:
agent = rl_common.load_rl_algo_from_path(
agent_path=agent_path,
venv=venv,
relabel_reward_fn=relabel_reward_fn,
)

if trajectory_path is None:
# Setting the logger here is not necessary (PreferenceComparisons takes care
# of it automatically) but it avoids creating unnecessary loggers.
trajectory_generator = preference_comparisons.AgentTrainer(
if num_agents < 1 or not isinstance(num_agents, int):
raise ValueError("num_agents must be a positive integer!")

def make_agent_trainer(seed: Optional[int] = None):
if agent_path is None:
agent = rl_common.make_rl_algo(
venv,
relabel_reward_fn=relabel_reward_fn,
)
else:
agent = rl_common.load_rl_algo_from_path(
agent_path=agent_path,
venv=venv,
relabel_reward_fn=relabel_reward_fn,
)

Member (review comment on load_rl_algo_from_path):
There'll be hilariously little diversity between agents in this case, but not much we can do there. (Support loading different agents I guess? But that seems overkill for what's a rare use case.)

# Setting the logger here is not really necessary (PreferenceComparisons
# takes care of that automatically) but it avoids creating unnecessary
# loggers
return preference_comparisons.AgentTrainer(
algorithm=agent,
reward_fn=reward_net,
venv=venv,
exploration_frac=exploration_frac,
seed=_seed,
seed=_seed if seed is None else seed,
custom_logger=custom_logger,
**trajectory_generator_kwargs,
)

if trajectory_path is None and num_agents == 1:
single_agent = True
trajectory_generator = make_agent_trainer()
# Stable Baselines will automatically occupy GPU 0 if it is available.
# Let's use the same device as the SB3 agent for the reward model.
reward_net = reward_net.to(trajectory_generator.algorithm.device)
elif trajectory_path is None and num_agents > 1:
single_agent = False
members = [make_agent_trainer(_seed + i) for i in range(num_agents)]
trajectory_generator = preference_comparisons.MixtureOfTrajectoryGenerators(
members=members,
custom_logger=custom_logger,
share_training_steps=share_training_steps_among_agents,
)
# Again using the same device as the SB3 agent
reward_net = reward_net.to(members[0].algorithm.device)
else:
single_agent = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not single agent in the sense that it's... zero agent? This is a bit counterintuitive.


if exploration_frac > 0:
raise ValueError(
"exploration_frac can't be set when a trajectory dataset is used",
@@ -251,7 +281,7 @@ def save_callback(iteration_num):
"checkpoints",
f"{iteration_num:04d}",
),
allow_save_policy=bool(trajectory_path is None),
allow_save_policy=single_agent,
)

Member (review comment on allow_save_policy):
We can't save checkpoints if there's multiple agents? That's a little sad.

results = main_trainer.train(
@@ -261,9 +291,9 @@ def save_callback(iteration_num):
)

# Storing and evaluating policy only useful if we generated trajectory data
if bool(trajectory_path is None):
if trajectory_path is None and single_agent:
results = dict(results)
results["rollout"] = train.eval_policy(agent, venv)
results["rollout"] = train.eval_policy(trajectory_generator.algorithm, venv)

if save_preferences:
main_trainer.dataset.save(os.path.join(log_dir, "preferences.pkl"))
@@ -273,7 +303,7 @@ def save_callback(iteration_num):
save_checkpoint(
trainer=main_trainer,
save_path=os.path.join(log_dir, "checkpoints", "final"),
allow_save_policy=bool(trajectory_path is None),
allow_save_policy=single_agent,
)

return results