Source code for rbgame.trainer

from __future__ import annotations
from typing import Any, Callable
from collections import Counter
import time
import os

import numpy as np
import matplotlib.pyplot as plt
from tianshou.data import Batch
from tianshou.env import DummyVectorEnv
# from tianshou.utils.torch_utils import (
#     policy_within_training_step, 
#     torch_train_mode,
#     )

from rbgame.agent.rl_agent import RLAgent
from rbgame.game.game import RoboticBoardGame

[docs] class DecentralizedTrainer: """ A decentralized trainer. :param env_args: Arguments for enviroment. :param num_train_envs: Number enviroments for :class:`DummyVectorEnv <tianshou.env.venvs.DummyVectorEnv>` used in train phase. :param num_test_envs: Number enviroments for :class:`DummyVectorEnv <tianshou.env.venvs.DummyVectorEnv>` used in test phase. :param batch_size: Batch size. :param update_freq: After how many steps do a policy update, now only update by steps. :param test_freq: After how many episodes do a test. :param episodes_per_train: Total number episodes of training. :param episodes_per_test: Total number episodes in a test. :param train_fn: A hook called after each :code:`num_train_env` episodes during training. It can be used to perform custom additional operations, with the signature :code:`f(num_collected_episodes: int, num_collected_steps: int) -> None`. :param test_fn: A hook called after each :code:`num_test_env` episodes during testing. It can be used to perform custom additional operations, with the signature :code:`f(num_collected_episodes: int, num_collected_steps: int) -> None`. :param save_best_fn: A hook called when the reward metric get better during training. with the signature :code:`f(episode_to_call: int) -> None`. :param save_last_fn: A hook called when training has finished. :param stop_fn: A hook called after each :code:`num_train_env` episodes during training with the signature :code:`f(reward_to_stop: int, episode_to_stop: int) -> bool`. :param reward_metric: A function with signature :code:`f(rewards: np.ndarray with shape (num_episode, agent_num)) -> a scalar np.ndarray`. We need to return a single scalar to monitor training. This function specifies what is the desired metric, e.g., the reward of agent 1 or the average reward over all agents. """ def __init__( self, env_args: dict[str, Any], num_train_envs: int = 16, num_test_envs: int = 16, batch_size: int = 64, update_freq: int = 100, test_freq: int = 100, episodes_per_train: int = 5000, episodes_per_test: int = 50, train_fn: Callable[[int, int], None]|None = None, test_fn: Callable[[int, int], None]|None = None, save_best_fn: Callable[[int], None]|None = None, save_last_fn: Callable[[], None]|None = None, stop_fn: Callable[[float, int], bool]|None = None, reward_metric: Callable[[np.ndarray], float]|None = None, # agent stores in memory transitions of the other or not shared_memory: bool = True, ) -> None: env_args.update({ 'render_mode': None, 'log_to_file': False, }) def make_env(): return RoboticBoardGame(**env_args) self.train_env = DummyVectorEnv([make_env for _ in range(num_train_envs)]) self.test_env = DummyVectorEnv([make_env for _ in range(num_test_envs)]) self.batch_size = batch_size self.update_freq = update_freq self.test_freq = test_freq self.episodes_per_train = episodes_per_train self.episodes_per_test = episodes_per_test self.train_fn = train_fn self.test_fn = test_fn self.save_best_fn = save_best_fn self.save_last_fn = save_last_fn self.stop_fn = stop_fn if stop_fn else \ lambda reward, episode: False self.reward_metric = reward_metric if reward_metric else \ lambda rewards: rewards.mean() self.shared_memory = shared_memory self.num_agents = self.train_env.get_env_attr('num_agents')[0] self.agent_names = self.train_env.get_env_attr('agents')[0]
[docs] def train( self, agents: list[RLAgent], learning_mask: list|np.ndarray, exploration_mask: list|np.ndarray|None=None, plot: bool=True, ) -> dict[str, Any]: """ Agents play together to learn. :param agents: :py:class:`list` of agents, which participate in game. :param learning_mask: A binary vector to define which agent need to learn. :param exploration_mask: A binary vector to define how agent behaves within training. Whether explore or not for off-policy agent and whether random sample or get mode for on-policy agent. Default to :py:data:`None`, which mean all agents explore during training. :param plot: Plot a graph of metric evolulation and save it. :return: Training statistic. """ # E - number of enviroments # B - collected batch size # R - number of running envs # O - observation-vector size assert any(learning_mask), 'We need at least one learning agent.' assert self.num_agents == len(agents), f'Please provide number of agents is {self.num_agents}' assert self.num_agents == len(learning_mask), f'Please provide learning_mask size is {self.num_agents}' if exploration_mask is None: exploration_mask = np.ones_like(learning_mask, dtype=np.bool_) assert self.num_agents == len(exploration_mask), f'Please provide exploration_mask size is {self.num_agents}' for agent_index, agent in enumerate(agents): if learning_mask[agent_index]: assert agent.memory is not None, f'Learning agent {agent_index} must having a memory.' assert self.num_agents*self.train_env.env_num == agent.memory.buffer_num agents: dict[str, RLAgent] = {k: v for k, v in zip(self.agent_names, agents)} num_envs = self.train_env.env_num num_collected_steps = 0 num_collected_episodes = 0 num_gradient_steps = 0 last_num_collected_steps = num_collected_steps last_num_collected_episodes = num_collected_episodes # lists to record data for plotting episodes = [] rewards = [] start = time.time() while num_collected_episodes < self.episodes_per_train: done_e = np.zeros(num_envs, dtype=np.bool_) obs_e, _ = self.train_env.reset() # train function do some stuffs at beginning of every episode if self.train_fn: self.train_fn(num_collected_episodes, num_collected_steps) while not all(done_e): # ids and current observations of running envs ids_r = np.where(done_e == False)[0] obs_r = obs_e[ids_r] # current agents of these envs current_agents_r = np.array(self.train_env.get_env_attr('agent_selection', id = ids_r)) for agent_index, (name, agent) in enumerate(agents.items()): # indicies and of envs within running envs that have current agent is ```agent``` # we call such env is right env env_inner_indicies_b = np.where(current_agents_r == name)[0] ids_b = ids_r[env_inner_indicies_b] obs_b = obs_r[env_inner_indicies_b] if len(env_inner_indicies_b) == 0: # skip if no env is in ```agent```'s turn continue # policy generate action from right envs obs_b_o = np.array([obs['observation'] for obs in obs_b]) action_mask_b = np.array([obs['action_mask'] for obs in obs_b]) if exploration_mask[agent_index]: # with policy_within_training_step(agent.policy): agent.policy.train() act_b = agent.infer_act(obs_b_o, action_mask_b, exploration_noise=True) agent.policy.eval() else: act_b = agent.infer_act(obs_b_o, action_mask_b, exploration_noise=False) # step in the right envs next_obs_b, rew_b, terminated_b, truncated_b, info_b = self.train_env.step(act_b, ids_b) next_obs_b_o = np.array([obs['observation'] for obs in next_obs_b]) # add transitions to memories of all learning agents, only shared memory now for a_i, a in enumerate(agents.values()): if learning_mask[a_i]: # mofify or reset data in one memory doesn't change data in the other # so we don't need to create copy of data to store in next buffer a.memory.add( Batch( obs=obs_b_o, act=act_b, rew=rew_b, terminated=terminated_b, truncated=truncated_b, obs_next=next_obs_b_o, info=info_b, ), buffer_ids=ids_b*self.num_agents+agent_index, ) num_collected_steps += ids_r.size # policies updating if ((num_collected_steps-last_num_collected_steps) >= self.update_freq): for agent_index, agent in enumerate(agents.values()): if learning_mask[agent_index]: # with policy_within_training_step(agent.policy), torch_train_mode(agent.policy): agent.policy.train() num_bonus_steps = num_collected_steps-last_num_collected_steps num_gradient_steps += agent.policy_update_fn(self.batch_size, num_bonus_steps) agent.policy.eval() last_num_collected_steps=num_collected_steps # observe new observations and dones of all envs last_e = [last() for last in self.train_env.get_env_attr('last')] done_e = np.array([last[2] or last[3] for last in last_e]) obs_e = np.array([last[0] for last in last_e]) num_collected_episodes += num_envs # test if (num_collected_episodes-last_num_collected_episodes) >= self.test_freq: test_stats = self.test(agents.values(), eval_metrics=False) num_steps, reward_metric = test_stats['mean_num_steps'], test_stats['reward'] if len(rewards) > 0 and reward_metric > rewards[-1] and self.save_best_fn: self.save_best_fn(num_collected_episodes) episodes.append(num_collected_episodes) rewards.append(reward_metric) print("===episode {:04d} done with number steps: {:5.1f}, reward: {:+06.2f}===" .format((num_collected_episodes), num_steps, reward_metric)) last_num_collected_episodes = num_collected_episodes # break if reach required reward if self.stop_fn(rewards[-1], num_collected_episodes): break finish = time.time() if self.save_last_fn: self.save_last_fn() if plot: self.plot_stats(episodes, rewards) return { 'reward_metric_stats': rewards, 'num_collected_steps': num_collected_steps, 'num_collected_episodes': num_collected_episodes, 'num_gradient_steps': num_gradient_steps, 'training_time': finish - start, }
@staticmethod def plot_stats(episodes: list[int], rewards: list[float]) -> None: fig, axes = plt.subplots(1, 1, figsize=(6, 4)) axes.plot(np.array(episodes), np.array(rewards), marker='.', color='b', label='reward') axes.set_xlabel("Number of collected episodes") axes.set_ylabel("Reward metric") axes.set_title("Performance of agent through episode.") axes.legend() axes.grid() fig.tight_layout() os.makedirs(os.path.join(os.getcwd(), 'plots'), exist_ok=True) fig.savefig(os.path.join(os.getcwd(), 'plots', 'results.png'), dpi=150, bbox_inches="tight")
[docs] def test( self, agents: list[RLAgent], eval_metrics: bool = False, ) -> dict[str, Any]: """ Test trained agents. :param agents: :py:class:`list` of agents, which participate in game. :param eval_metrics: Evaluate some addition metric of game process. :return: Testing statistic. """ # P - number of episodes # E - number of enviroments # B - collected batch size # R - number of running envs # O - observation-vector size # A - number of agents assert self.num_agents == len(agents), f'Please provide number of agents is {self.num_agents}' agents: dict[str, RLAgent] = {k: v for k, v in zip(self.agent_names, agents)} num_envs = self.test_env.env_num num_collected_steps = 0 num_collected_episodes = 0 num_finished_episodes = 0 rewards_p_a = np.array([]).reshape(0, self.num_agents) if eval_metrics: time_spans = 0 count_wins = Counter() while num_collected_episodes < self.episodes_per_test: rewards_e_a = np.zeros((num_envs, self.num_agents)) done_e = np.zeros(num_envs, dtype=np.bool_) obs_e, _ = self.test_env.reset() if self.test_fn: self.test_fn(num_collected_episodes, num_collected_steps) while not all(done_e): # ids and current observations of running envs ids_r = np.where(done_e == False)[0] obs_r = obs_e[ids_r] # current agents of these envs current_agents_r = np.array(self.test_env.get_env_attr('agent_selection', id = ids_r)) for agent_index, (name, agent) in enumerate(agents.items()): # indicies of envs within running envs that have current agent is ```agent``` # we call such env is right env env_inner_indicies_b = np.where(current_agents_r == name)[0] ids_b = ids_r[env_inner_indicies_b] obs_b = obs_r[env_inner_indicies_b] if len(env_inner_indicies_b) == 0: # skip if no env is in ```agent```'s turn continue # policy generate action from right envs obs_b_o = np.array([obs['observation'] for obs in obs_b]) action_mask_b = np.array([obs['action_mask'] for obs in obs_b]) act_b = agent.infer_act(obs_b_o, action_mask_b, exploration_noise=False) # step in the right envs _, rew_b, _, _, _ = self.test_env.step(act_b, ids_b) rewards_e_a[ids_b, agent_index] += rew_b num_collected_steps += ids_r.size # observe new observations and dones of all envs last_e = [last() for last in self.test_env.get_env_attr('last')] done_e = np.array([last[2] or last[3] for last in last_e]) obs_e = np.array([last[0] for last in last_e]) num_collected_episodes += num_envs rewards_p_a = np.concatenate((rewards_p_a, rewards_e_a), axis=0) if eval_metrics: winners_e = np.array(self.test_env.get_env_attr('winner')) id_finished_envs = np.where(winners_e != None)[0] time_spans += sum([clock.now for clock in self.test_env.get_env_attr('game_clock', id=id_finished_envs)]) num_finished_episodes += id_finished_envs.size count_wins.update(winners_e) reward = self.reward_metric(rewards_p_a) test_stats = { 'reward': reward, 'mean_num_steps': num_collected_steps/num_collected_episodes, 'num_collected_steps': num_collected_steps, 'num_collected_episodes': num_collected_episodes, } if eval_metrics: test_stats.update({'time_spans': time_spans/num_finished_episodes, 'count_wins': dict(count_wins)}) return test_stats