import random
import os
import logging as log
from typing import Any
import pygame
import numpy as np
import pettingzoo
import gymnasium
from gymnasium import spaces
from pettingzoo import utils
from rbgame.game import components
from rbgame.game.consts import *
from rbgame.agent.base_agent import BaseAgent
pygame.init()
[docs]
class RoboticBoardGame(gymnasium.Env, pettingzoo.AECEnv):
"""
Main class representing the game. The game can be configured with different parameters.

:param colors_map: Color map for board.
:param targets_map: Target map for board.
:param required_mail: Number of mails to win.
:param robot_colors: Colors of robots. One color per player; at least two are required.
:param num_robots_per_player: Number of robots per player.
:param with_battery: Battery is considered or not.
:param random_num_steps: Robot can move random number of steps each turn or not.
:param max_step: Maximum environment step.
:param render_mode: The render mode. It can be :py:data:`None` or :code:`'human'`.
:param log_to_file: Log game process to file or not.
"""
# Environment metadata: supported render modes, environment name,
# parallelizable flag and the frame rate used when rendering.
metadata = {"render_modes": ["human"], "name": "robotic_board_game", "is_parallelizable": False, "render_fps": 20}
def __init__(
    self,
    colors_map: str,
    targets_map: str,
    required_mail: int,
    robot_colors: list[str],
    num_robots_per_player: int = 1,
    with_battery: bool = False,
    random_num_steps: bool = False,
    max_step: int = 500,
    render_mode: str|None = None,
    log_to_file: bool = False,
    ) -> None:
    """
    Create the board, the robots, the mails and the agent bookkeeping.

    :param colors_map: Color map, forwarded to :code:`components.Board`.
    :param targets_map: Target map, forwarded to :code:`components.Board`.
    :param required_mail: Number of mails a player must collect to win.
    :param robot_colors: Colors of the players; at least two are required.
    :param num_robots_per_player: Number of robots created for each color.
    :param with_battery: Battery is considered or not. It also decides the
        per-robot observation size (4 with battery, 3 without).
    :param random_num_steps: If true, the number of steps before the turn
        changes is drawn at random, otherwise it is always 1.
    :param max_step: Number of environment steps after which the episode is truncated.
    :param render_mode: :py:data:`None` or one of :code:`metadata["render_modes"]`.
    :param log_to_file: Log game process to file or not.
    :raises AssertionError: If fewer than two colors are given or the render mode is unknown.
    """
    super().__init__()
    assert len(robot_colors) >= 2
    # Validate the render mode before it is handed to any component.
    assert render_mode is None or render_mode in self.metadata["render_modes"]

    self.game_clock = components.Clock()
    self.robot_sprites: pygame.sprite.Group = pygame.sprite.Group()
    self.mail_sprites: pygame.sprite.Group = pygame.sprite.Group()
    self.board = components.Board(colors_map=colors_map, targets_map=targets_map)
    self.required_mail = required_mail
    self.robot_colors = robot_colors
    self.max_step = max_step
    self.num_robots_per_player = num_robots_per_player
    self.num_robots = num_robots_per_player * len(robot_colors)
    self.__with_battery = with_battery
    self.random_num_steps = random_num_steps
    self.steps_to_change_turn = random.choice(range(1, MAXIMUM_STEP_PER_TURN)) if self.random_num_steps else 1

    # Every robot starts on its own randomly chosen white cell.
    robot_cells_init = random.sample(self.board.white_cells, k=self.num_robots)
    robots: list[components.Robot] = [
        components.Robot(
            robot_cells_init[num_robots_per_player * j + i],
            i + 1,
            robot_color,
            self.mail_sprites,
            self.game_clock,
            with_battery=self.__with_battery,
            render_mode=render_mode,
            log_to_file=log_to_file,
        )
        for j, robot_color in enumerate(robot_colors)
        for i in range(num_robots_per_player)
    ]
    # Robots (and therefore agents) are named "<color><index>".
    self.robots: dict[str, components.Robot] = {robot.color + str(robot.index): robot for robot in robots}

    # generate new mail in green cells
    for green_cell in self.board.green_cells:
        green_cell.generate_mail(self.mail_sprites, render_mode)

    # add all robots to sprites group
    self.robot_sprites.add(list(self.robots.values()))

    # TODO: separate player and robot index in tuple for centralized training
    self.agents = list(self.robots)
    self.possible_agents = self.agents[:]

    self.action_spaces: dict[str, spaces.Discrete] = {a: spaces.Discrete(5) for a in self.agents}
    robot_obs_size = 4 if self.__with_battery else 3
    self.observation_spaces: dict[str, spaces.Dict] = {
        a: spaces.Dict(
            {
                "observation": spaces.Box(
                    low=0, high=1, shape=(robot_obs_size*self.num_robots,), dtype=np.float32
                ),
                "action_mask": spaces.Box(
                    low=0, high=1, shape=(self.action_spaces[a].n,), dtype=np.uint8
                ),
            }
        )
        for a in self.agents
    }

    self.rewards = {a: 0 for a in self.agents}
    self._cumulative_rewards = {agent: 0 for agent in self.agents}
    self.terminations = {a: False for a in self.agents}
    self.truncations = {a: False for a in self.agents}
    self.infos = {a: {} for a in self.agents}
    self._agent_selector = utils.agent_selector(self.agents)
    self.agent_selection = self._agent_selector.reset()
    self.num_steps = 0
    self.winner = None

    self.render_mode = render_mode
    self.log = log_to_file
    self.screen = None

    if self.render_mode == "human":
        # draw a background
        # NOTE(review): the 15x11 cell surface looks tuned for a 9x9 board - confirm.
        self.background = pygame.Surface((15*CELL_SIZE[0], 11*CELL_SIZE[1]))
        self.background.fill((255, 255, 255))

        # draw board
        for i in range(self.board.size):
            for j in range(self.board.size):
                self.board[i, j].draw(self.background)

        # draw axes; one label per board row/column so any board size is covered
        axis_font = pygame.font.SysFont(None, 48)
        images_for_cell_coordinate = [
            axis_font.render(str(i), True, (0, 0, 0))
            for i in range(self.board.size)
        ]
        for i, label in enumerate(images_for_cell_coordinate):
            # column label in the top margin
            self.background.blit(
                label,
                ((i + 1) * CELL_SIZE[0] + (CELL_SIZE[0] - label.get_width()) / 2,
                 (CELL_SIZE[1] - label.get_height()) / 2))
            # row label in the left margin
            self.background.blit(
                label,
                ((CELL_SIZE[0] - label.get_width()) / 2,
                 (i + 1) * CELL_SIZE[1] + (CELL_SIZE[1] - label.get_height()) / 2))

        # TODO: display battery bar under robot image if agent observe pixels
        # left edge and top edge of the battery panel, shared by all rows
        panel_left = 10*CELL_SIZE[0] + (5*CELL_SIZE[0] - (MAXIMUM_ROBOT_BATTERY+2) * CELL_BATTERY_SIZE[0])/2
        panel_top = 5*CELL_SIZE[1]

        # draw battery side identification for each robot
        battery_font = pygame.font.SysFont(None, 24)
        images_for_baterry_bar = [
            battery_font.render(str(i+1), True, (0, 0, 0))
            for i in range(self.num_robots_per_player)
        ]
        for i, robot in enumerate(self.robots.values()):
            rect = pygame.Rect(
                panel_left,
                panel_top + i * CELL_BATTERY_SIZE[1],
                CELL_BATTERY_SIZE[0],
                CELL_BATTERY_SIZE[1])
            pygame.draw.circle(self.background, ROBOT_COLORS[robot.color], rect.center, CELL_BATTERY_SIZE[0] / 2 * 0.8, 0)
            pygame.draw.rect(self.background, (0, 0, 0), rect, 1)
            index_image = images_for_baterry_bar[robot.index - 1]
            self.background.blit(
                index_image,
                (rect.left + (CELL_BATTERY_SIZE[0] - index_image.get_width()) / 2,
                 rect.top + (CELL_BATTERY_SIZE[1] - index_image.get_height()) / 2))

        # draw battery bar: one row per robot, one cell per battery level
        for j in range(self.num_robots):
            for i in range(MAXIMUM_ROBOT_BATTERY + 1):
                rect = pygame.Rect(
                    panel_left + (i+1) * CELL_BATTERY_SIZE[0],
                    panel_top + j * CELL_BATTERY_SIZE[1],
                    CELL_BATTERY_SIZE[0],
                    CELL_BATTERY_SIZE[1])
                pygame.draw.rect(self.background, (0, 0, 0), rect, 1)

        # draw progress bar which show count of collected mails
        parent_dir = os.path.dirname(os.path.dirname(__file__))
        bar_background = pygame.image.load(os.path.join(parent_dir, 'assets', 'images', 'loading_bar_background.png'))
        bar_background = pygame.transform.scale(bar_background, (3*CELL_SIZE[0], CELL_SIZE[0]/2))
        bar_rect = bar_background.get_rect()
        for i in range(len(self.robot_colors)):
            self.background.blit(bar_background, (10*CELL_SIZE[0]+(5*CELL_SIZE[0]-bar_rect.width)/2, CELL_SIZE[1]+i*1.5*bar_rect.height))

        # clock to tuning fps
        self.clock = pygame.time.Clock()
@property
def with_battery(self) -> bool:
    """
    Whether the battery is considered in this game.
    """
    return self.__with_battery

@with_battery.setter
def with_battery(self, with_battery: bool) -> None:
    """
    Switch battery handling on or off.

    The flag is propagated to every robot and the observation spaces are
    rebuilt, because the per-robot observation size depends on it
    (4 with battery, 3 without).

    :param with_battery: New value of the flag.
    """
    self.__with_battery = with_battery
    for robot in self.robots.values():
        robot.with_battery = self.__with_battery

    values_per_robot = 4 if self.__with_battery else 3
    rebuilt_spaces: dict[str, spaces.Dict] = {}
    for name in self.agents:
        observation_box = spaces.Box(
            low=0, high=1, shape=(values_per_robot*self.num_robots,), dtype=np.float32
        )
        mask_box = spaces.Box(
            low=0, high=1, shape=(self.action_spaces[name].n,), dtype=np.uint8
        )
        rebuilt_spaces[name] = spaces.Dict(
            {"observation": observation_box, "action_mask": mask_box}
        )
    self.observation_spaces = rebuilt_spaces
[docs]
def sum_count_mail(self, color: str) -> int:
    """
    Count the mails collected by one player.

    :param color: Color of player.
    :return: Sum of :code:`count_mail` over all robots of that color.
    """
    collected = 0
    for robot in self.robots.values():
        if robot.color == color:
            collected += robot.count_mail
    return collected
[docs]
def observe(self, agent: str) -> dict[str, np.ndarray]:
    """
    Build the observation of one agent.

    :param agent: Agent that need to observe.
    :return: Observation of this agent.
        It is a :py:class:`dict` with two keys: :code:`'observation'` and :code:`'action_mask'`.
        Value of :code:`'observation'` key is the :py:attr:`observation <rbgame.game.components.Robot.observation>`
        vectors of all robots concatenated. :py:attr:`Observation <rbgame.game.components.Robot.observation>` of robot
        that is controlled by :code:`agent` is placed in the first place.
        Value of :code:`'action_mask'` key is the mask of the robot controlled by :code:`agent`:
        a binary vector where each element represents whether the action is legal or not.
    """
    own_robot = self.robots[agent]
    # Iterate over all robots rather than the live agent list: the live list
    # can shrink, which used to change the vector length and made np.hstack
    # fail on an empty sequence when a single agent was left.
    ordered = [own_robot.observation]
    ordered.extend(
        robot.observation for name, robot in self.robots.items() if name != agent
    )
    return {'observation': np.hstack(ordered), 'action_mask': own_robot.mask}
[docs]
def observation_space(self, agent: str) -> spaces.Dict:
    """
    Look up the observation space of one agent.

    :param agent: Agent whose observation space is requested.
    :return: Entry of :code:`self.observation_spaces` for :code:`agent`.
    """
    agent_space = self.observation_spaces[agent]
    return agent_space
[docs]
def action_space(self, agent: str) -> spaces.Discrete:
    """
    Look up the action space of one agent.

    :param agent: Agent whose action space is requested.
    :return: Entry of :code:`self.action_spaces` for :code:`agent`.
    """
    agent_space = self.action_spaces[agent]
    return agent_space
[docs]
def reset(self, seed: int|None = None, options: Any|None=None) -> tuple[dict[str, np.ndarray], dict[str, Any]]:
    """
    Bring the environment back to a fresh initial state.

    :param seed: Seed passed to :py:func:`random.seed`. With a fixed value the
        environment is reset to the same initial state every time.
    :param options: Unused.
    :return: Observation and info of the agent selected to act first.
    """
    random.seed(seed)

    # per-agent bookkeeping starts from scratch
    self.agents = self.possible_agents[:]
    self.rewards = dict.fromkeys(self.agents, 0)
    self._cumulative_rewards = dict.fromkeys(self.agents, 0)
    self.terminations = dict.fromkeys(self.agents, False)
    self.truncations = dict.fromkeys(self.agents, False)
    self.infos = {name: {} for name in self.agents}

    self.game_clock.reset()
    self.board.reset()

    # put every robot on a freshly sampled white cell
    start_cells = random.sample(self.board.white_cells, k=self.num_robots)
    for robot, start_cell in zip(self.robots.values(), start_cells):
        robot.reset(start_cell)

    # drop the old mails and let every green cell generate a new one
    self.mail_sprites.empty()
    for green_cell in self.board.green_cells:
        green_cell.generate_mail(self.mail_sprites, self.render_mode)

    if self.random_num_steps:
        self.steps_to_change_turn = random.choice(range(1, MAXIMUM_STEP_PER_TURN))
    else:
        self.steps_to_change_turn = 1

    self._agent_selector.reinit(self.agents)
    self.agent_selection = self._agent_selector.reset()
    self.num_steps = 0
    self.winner = None

    if self.render_mode == "human":
        self.render()

    first_agent = self.agent_selection
    return self.observe(first_agent), self.infos[first_agent]
[docs]
def step(self, action: int|None) -> tuple[dict[str, np.ndarray], float, bool, bool, dict[str, Any]]:
    """
    Advance the environment by one action of the currently selected agent.

    :param action: Action from agent.
    :return: Next observation of the acting agent, its reward, termination flag,
        truncation flag and an info dict whose :code:`'transition_belongs_agent'`
        entry is the index of the acting agent in :code:`self.agents`.
        Termination is set for everybody once a player has collected the required mails,
        truncation once the number of steps reaches :code:`max_step`.
        If the selected agent is already terminated or truncated, the call is
        delegated to :code:`self._was_dead_step`.
    """
    current_agent = self.agent_selection
    if self.terminations[current_agent] or self.truncations[current_agent]:
        return self._was_dead_step(action)

    # TODO: is this caculation worth keeping? we can simply return reward
    self._cumulative_rewards = dict.fromkeys(self.agents, 0)
    self.rewards = dict.fromkeys(self.agents, 0)

    # r(s, a, s') and s'(s, a)
    acting_robot = self.robots[current_agent]
    is_moved, reward = acting_robot.step(action)
    self.rewards[current_agent] = reward
    self._accumulate_rewards()

    # After a move, robots standing on blue cells are charged.
    # The acting robot is skipped: it handles its own charging in its step method.
    if is_moved and self.with_battery:
        for blue_cell in self.board.blue_cells:
            parked_robot = blue_cell.robot
            if parked_robot and parked_robot is not acting_robot:
                parked_robot.charge()

    self.num_steps += 1

    if self.sum_count_mail(acting_robot.color) == self.required_mail:
        self.terminations = dict.fromkeys(self.agents, True)
        self.winner = acting_robot.color
        if self.log:
            log.info(f'At t={self.game_clock.now:04} Player {self.winner} win')

    step_limit_reached = self.num_steps >= self.max_step
    self.truncations = dict.fromkeys(self.agents, step_limit_reached)

    if self.render_mode == "human":
        # slide the sprite towards its next rectangle frame by frame for smooth movement
        for frame in range(1, FRAME_PER_STEP+1):
            remaining = tuple(
                target - current
                for target, current in zip(acting_robot.next_rect.topleft, acting_robot.rect.topleft)
            )
            acting_robot.rect.topleft = tuple(
                current + frame/FRAME_PER_STEP*delta
                for current, delta in zip(acting_robot.rect.topleft, remaining)
            )
            if acting_robot.mail:
                # a carried mail moves together with the robot
                acting_robot.mail.rect.topleft = acting_robot.rect.topleft
            self.render()

    self.steps_to_change_turn -= 1
    if self.steps_to_change_turn == 0:
        # turn is over: select the next agent and draw the length of its turn
        self.agent_selection = self._agent_selector.next()
        if self.render_mode == "human":
            self.render()
        if self.random_num_steps:
            self.steps_to_change_turn = random.choice(range(1, MAXIMUM_STEP_PER_TURN))
        else:
            self.steps_to_change_turn = 1
        # the transition belongs to the agent that was selected before the change
        reporting_agent = self.previous_agent
    else:
        # same agent keeps acting, the transition belongs to it
        reporting_agent = self.agent_selection

    return (
        self.observe(reporting_agent),
        self._cumulative_rewards[reporting_agent],
        self.terminations[self.agent_selection],
        self.truncations[self.agent_selection],
        {'transition_belongs_agent': self.agents.index(reporting_agent)},
    )
@property
def previous_agent(self):
    """
    Agent that stands right before the selected one in :code:`self.agents`,
    wrapping around to the last agent when the first one is selected.
    """
    position = self.agents.index(self.agent_selection)
    # position - 1 is -1 for the first agent, which indexes the last element
    return self.agents[position - 1]
[docs]
def render(self) -> None:
    """
    Draw the current game state. Only works if environment render mode is :code:`'human'`.

    Without a render mode only a warning is emitted.

    :raises ValueError: If the render mode is neither :py:data:`None` nor :code:`'human'`.
    """
    if self.render_mode == "human":
        self._render_gui()
        return
    if self.render_mode is not None:
        raise ValueError(
            f"{self.render_mode} is not a valid render mode. Available modes are: {self.metadata['render_modes']}"
        )
    gymnasium.logger.warn(
        "You are calling render method without specifying any render mode."
    )
def _render_gui(self) -> None:
    """
    Draw one frame: background, robots, mails, battery markers, a frame around
    the acting robot and the mail progress bars. The window is created on the
    first call and the frame rate is limited to :code:`metadata["render_fps"]`.
    """
    if self.screen is None:
        window_size = self.background.get_size()
        self.screen = pygame.display.set_mode(window_size)
        pygame.display.set_caption('Robotics Board Game')
    screen = self.screen

    screen.blit(self.background, (0, 0))
    self.robot_sprites.draw(screen)
    self.mail_sprites.draw(screen)

    # battery panel: one row per robot, the marker column follows robot.battery
    panel_left = 10*CELL_SIZE[0] + (5*CELL_SIZE[0] - (MAXIMUM_ROBOT_BATTERY+2)*CELL_BATTERY_SIZE[0])/2
    marker_radius = CELL_BATTERY_SIZE[0] / 2 * 0.8
    for row, robot in enumerate(self.robots.values()):
        marker_center = (
            panel_left + (robot.battery + 1.5) * CELL_BATTERY_SIZE[0],
            5*CELL_SIZE[1] + (row + 0.5) * CELL_BATTERY_SIZE[1],
        )
        pygame.draw.circle(screen, ROBOT_COLORS[robot.color], marker_center, marker_radius, 0)

    # highlight the robot whose turn it is
    acting_robot = self.robots[self.agent_selection]
    pygame.draw.rect(screen, ROBOT_COLORS[acting_robot.color], acting_robot.rect, 3)

    # progress bars: filled width is proportional to collected / required mails
    for row, color in enumerate(self.robot_colors):
        filled_bar = (
            11*CELL_SIZE[0]+3,
            CELL_SIZE[1]+row*1.5*CELL_SIZE[1]/2+3,
            (3*CELL_SIZE[0]-6)*self.sum_count_mail(color)/self.required_mail,
            CELL_SIZE[1]/2-6,
        )
        pygame.draw.rect(screen, ROBOT_COLORS[color], filled_bar)

    self.clock.tick(self.metadata["render_fps"])
    pygame.display.update()
[docs]
def close(self) -> None:
    """
    Close the environment.

    Releases the pygame window if one has been opened by rendering. The
    environment stays usable: the next render call creates a new window.
    """
    if getattr(self, "screen", None) is None:
        # nothing was ever rendered, so there is nothing to release
        return
    pygame.display.quit()
    self.screen = None
def watch(self) -> None:
    """
    Render the current state once and keep the window open until the user closes it.

    Without the :code:`'human'` render mode there is no window that could be
    closed, so the method returns right after the render call.
    """
    self.render()
    if self.render_mode != "human":
        return
    # Block on the event queue instead of polling it, so waiting does not
    # keep a CPU core busy.
    while pygame.event.wait().type != pygame.QUIT:
        pass
[docs]
def run(self, agents: list[BaseAgent]) -> tuple[str | None, int]:
    """
    Animate game process between agents. User can control robots by keyboard.

    Keyboard controls: space - do nothing, up - go ahead, down - go back,
    left/right - turn, :code:`r` - reset the game. Movement keys only act on
    robots whose agent is :py:data:`None` and are ignored once the game is over.

    :param agents: Agents to act, one per robot, in the order of :code:`self.agents`.
        If an entry is :py:data:`None`, action of that robot is provided from keyboard.
    :return: The winner (:py:data:`None` if nobody has won) and the game time.
    :raises ValueError: If some robot is keyboard-controlled but there is no render mode.
    """
    # possible_agents is the full roster; self.agents is rebuilt from it by reset()
    assert len(agents) == len(self.possible_agents)
    if any(agent is None for agent in agents) and self.render_mode is None:
        raise ValueError("Person-player can't play without rendering animation")

    self.reset()
    if self.log:
        log.info(f'At t={self.game_clock.now:04} game starts with {self.num_robots_per_player} number robots per player and {len(self.robot_colors)} players')
        for robot in self.robots.values():
            log.info(f'At t={self.game_clock.now:04} {COLOR2STR[robot.color]:>5} robot {robot.index} in position [{robot.pos.x},{robot.pos.y}]')

    controllers: dict[str, BaseAgent] = dict(zip(self.agents, agents))

    # key -> (index of the action in the robot mask, action passed to step)
    key_bindings = {
        pygame.K_SPACE: (0, components.Action.DO_NOTHING),
        pygame.K_UP: (1, components.Action.GO_AHEAD),
        pygame.K_DOWN: (2, components.Action.GO_BACK),
        pygame.K_LEFT: (3, components.Action.TURN_LEFT),
        pygame.K_RIGHT: (4, components.Action.TURN_RIGHT),
    }

    def game_over() -> bool:
        selected = self.agent_selection
        return self.terminations[selected] or self.truncations[selected]

    running = True
    while running and not game_over():
        controller = controllers[self.agent_selection]
        if controller is not None:
            obs = self.observe(self.agent_selection)
            self.step(controller.get_action(obs))
        elif self.render_mode == "human":
            # waiting for a key press: limit the polling rate instead of spinning
            self.clock.tick(self.metadata["render_fps"])

        # Human behaviors
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
                break
            if event.type != pygame.KEYDOWN:
                continue
            if event.key == pygame.K_r:
                self.reset()
                continue
            if event.key not in key_bindings:
                continue
            # Movement keys must neither step a finished game nor drive a
            # robot that is controlled by an agent object.
            if game_over() or controllers[self.agent_selection] is not None:
                continue
            mask_index, action = key_bindings[event.key]
            mask = self.robots[self.agent_selection].mask
            # an all-zero mask means no restriction is applied
            if not any(mask) or mask[mask_index]:
                self.step(action)

    return self.winner, self.game_clock.now