diff options
Diffstat (limited to 'diplomacy/server/server_game.py')
-rw-r--r-- | diplomacy/server/server_game.py | 465 |
1 files changed, 465 insertions, 0 deletions
diff --git a/diplomacy/server/server_game.py b/diplomacy/server/server_game.py new file mode 100644 index 0000000..6ea349e --- /dev/null +++ b/diplomacy/server/server_game.py @@ -0,0 +1,465 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette, Steven Bocco +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Server game class. """ +from diplomacy.engine.game import Game +from diplomacy.engine.message import GLOBAL, Message, OBSERVER, OMNISCIENT, SYSTEM +from diplomacy.engine.power import Power +from diplomacy.utils import exceptions, parsing, strings +from diplomacy.utils.game_phase_data import GamePhaseData + +class ServerGame(Game): + """ ServerGame class. Properties: + - omniscient_usernames (only for server games): + set of usernames allowed to be omniscient observers for this game. + - moderator_usernames (only for server games): + set of usernames allowed to be moderators for this game. + - observer (only for server games): + special Power object (diplomacy.Power) used to manage observer tokens. + - omniscient (only for server games): + special Power object (diplomacy.Power) used to manage omniscient tokens. + """ + __slots__ = ['server', 'omniscient_usernames', 'moderator_usernames', 'observer', 'omniscient'] + model = parsing.update_model(Game.model, { + strings.MODERATOR_USERNAMES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=set), ()), + strings.OBSERVER: parsing.OptionalValueType(parsing.JsonableClassType(Power)), + strings.OMNISCIENT: parsing.OptionalValueType(parsing.JsonableClassType(Power)), + strings.OMNISCIENT_USERNAMES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=set), ()), + }) + + def __init__(self, **kwargs): + # Reference to a Server instance. + self.server = None # type: diplomacy.Server + self.omniscient_usernames = None # type: set + self.moderator_usernames = None # type: set + self.observer = None # type: Power + self.omniscient = None # type: Power + + super(ServerGame, self).__init__(**kwargs) + assert self.is_server_game() + + # Initialize special powers. + self.observer = self.observer or Power(self, name=strings.OBSERVER_TYPE) + self.omniscient = self.omniscient or Power(self, name=strings.OMNISCIENT_TYPE) + self.observer.set_controlled(strings.OBSERVER_TYPE) + self.omniscient.set_controlled(strings.OBSERVER_TYPE) + + # Server-only methods. + + def get_related_power_names(self, power_name): + """ Return list of power names controlled by the controlled of given power name. """ + related_power_names = [] + if self.has_power(power_name): + related_power_names = [power_name] + related_power = self.get_power(power_name) + if related_power.is_controlled(): + related_power_names = self.get_controlled_power_names(related_power.get_controller()) + return related_power_names + + def filter_phase_data(self, phase_data, role, is_current): + """ Return a filtered version of given phase data for given gam role. + :param phase_data: GamePhaseData object to filter. + :param role: game role to filter phase data for. + :param is_current: Boolean. Indicate if given phase data is for a current phase (True), or for a pase phase. + :return: a new GamePhaseData object suitable for given game role. + :type phase_data: GamePhaseData + """ + if role == strings.OMNISCIENT_TYPE: + # Nothing to filter. + return phase_data + if role == strings.OBSERVER_TYPE: + # Filter messages. + return GamePhaseData(name=phase_data.name, + state=phase_data.state, + orders=phase_data.orders, + results=phase_data.results, + messages=self.filter_messages(phase_data.messages, role)) + # Filter for power roles. + related_power_names = self.get_related_power_names(role) + # Filter messages. + messages = self.filter_messages(phase_data.messages, related_power_names) + # We filter orders only if phase data is for a current phase. + if is_current: + orders = {power_name: phase_data.orders[power_name] + for power_name in related_power_names + if power_name in phase_data.orders} + else: + orders = phase_data.orders + # results don't need to be filtered: it should be provided empty for current phase, + # and it should be kept for a past phase/ + return GamePhaseData(name=phase_data.name, + state=phase_data.state, + orders=orders, + messages=messages, + results=phase_data.results) + + def game_can_start(self): + """ Return True if server game can start. A game can start if all followings conditions are satisfied: + - Game has not yet started. + - Game can start automatically (no rule START_MASTER). + - Game has expected number of controlled powers. + :return: a boolean + :rtype: bool + """ + return self.is_game_forming and not self.start_master and self.has_expected_controls_count() + + def get_messages(self, game_role, timestamp_from=None, timestamp_to=None): + """ Return a filtered dict of current messages for given output game role. + See method filter_messages() about parameters. + """ + return self.filter_messages(self.messages, game_role, timestamp_from, timestamp_to) + + def get_message_history(self, game_role): + """ Return a filtered dict of whole message history for given game role. """ + return {str(short_phase): self.filter_messages(messages, game_role) + for short_phase, messages in self.message_history.items()} + + def get_user_power_names(self, username): + """ Return list of power names controlled by given user name. """ + return [power.name for power in self.powers.values() if power.is_controlled_by(username)] + + def new_system_message(self, recipient, body): + """ Create a system message (immediately dated) to be sent by server and add it to message history. + To be used only by server game. + :param recipient: recipient description (string). Either: + - a power name. + - 'GLOBAL' (all game tokens) + - 'OBSERVER' (all special tokens [observers and omniscient observers]) + - 'OMNISCIENT' (all omniscient tokens only) + :param body: message body (string). + :return: a new GameMessage object. + :rtype: Message + """ + assert (recipient in {GLOBAL, OBSERVER, OMNISCIENT} + or self.has_power(recipient)) + message = Message(phase=self.current_short_phase, sender=SYSTEM, recipient=recipient, message=body) + # Message timestamp will be generated when adding message. + self.add_message(message) + return message + + def as_power_game(self, power_name): + """ Return a player game data object copy of this game for given power name. """ + for_username = self.get_power(power_name).get_controller() + game = Game.from_dict(self.to_dict()) + game.controlled_powers = self.get_controlled_power_names(for_username) + game.error = [] + game.message_history = self.get_message_history(power_name) + game.messages = self.get_messages(power_name) + game.observer_level = self.get_observer_level(for_username) + game.phase_abbr = game.current_short_phase + related_power_names = self.get_related_power_names(power_name) + for power in game.powers.values(): # type: Power + power.role = power.name + power.tokens.clear() + if power.name not in related_power_names: + power.vote = strings.NEUTRAL + power.orders.clear() + game.role = power_name + return game + + def as_omniscient_game(self, for_username): + """ Return an omniscient game data object copy of this game. """ + game = Game.from_dict(self.to_dict()) + game.controlled_powers = self.get_controlled_power_names(for_username) + game.message_history = self.get_message_history(strings.OMNISCIENT_TYPE) + game.messages = self.get_messages(strings.OMNISCIENT_TYPE) + game.observer_level = self.get_observer_level(for_username) + game.phase_abbr = game.current_short_phase + for power in game.powers.values(): # type: Power + power.role = strings.OMNISCIENT_TYPE + power.tokens.clear() + game.role = strings.OMNISCIENT_TYPE + return game + + def as_observer_game(self, for_username): + """ Return an observer game data object copy of this game. """ + game = Game.from_dict(self.to_dict()) + game.controlled_powers = self.get_controlled_power_names(for_username) + game.error = [] + game.message_history = self.get_message_history(strings.OBSERVER_TYPE) + game.messages = self.get_messages(strings.OBSERVER_TYPE) + game.observer_level = self.get_observer_level(for_username) + game.phase_abbr = game.current_short_phase + for power in game.powers.values(): # type: Power + power.role = strings.OBSERVER_TYPE + power.tokens.clear() + power.vote = strings.NEUTRAL + game.role = strings.OBSERVER_TYPE + return game + + def cast(self, role, for_username): + """ Return a copy of this game for given role (either observer role, omniscient role or a power role). """ + assert strings.role_is_special(role) or self.has_power(role) + if role == strings.OBSERVER_TYPE: + return self.as_observer_game(for_username) + if role == strings.OMNISCIENT_TYPE: + return self.as_omniscient_game(for_username) + return self.as_power_game(role) + + def is_controlled_by(self, power_name, username): + """ (for server game) Return True if given power name is controlled by given username. """ + return self.get_power(power_name).is_controlled_by(username) + + def get_observer_level(self, username): + """ Return the highest observation level allowed for given username. + :param username: name of user to get observation right + :return: either 'master_type', 'omniscient_type', 'observer_type' or None. + """ + if (self.server and self.server.users.has_admin(username)) or self.is_moderator(username): + return strings.MASTER_TYPE + if self.is_omniscient(username): + return strings.OMNISCIENT_TYPE + if not self.no_observations: + return strings.OBSERVER_TYPE + return None + + def get_reception_addresses(self): + """ Generate addresses (couple [power name, token]) of all users implied in this game. """ + for power in self.powers.values(): # type: Power + for token in power.tokens: + yield (power.name, token) + for token in self.observer.tokens: + yield (self.observer.name, token) + for token in self.omniscient.tokens: + yield (self.omniscient.name, token) + + def get_special_addresses(self): + """ Generate addresses (couples [power name, token]) of + omniscient observers and simple observers of this game. """ + for power in (self.omniscient, self.observer): + for token in power.tokens: + yield (power.name, token) + + def get_observer_addresses(self): + """ Generate addresses (couples [power name, token]) of observers of this game. """ + for token in self.observer.tokens: + yield (self.observer.name, token) + + def get_omniscient_addresses(self): + """ Generate addresses (couples [power name, token]) of omniscient observers of this game. """ + for token in self.omniscient.tokens: + yield (self.omniscient.name, token) + + def get_special_token_role(self, token): + """ Return role name (either OBSERVER_TYPE or OMNISCIENT_TYPE) for given special token. """ + if self.has_omniscient_token(token): + return strings.OMNISCIENT_TYPE + if self.has_observer_token(token): + return strings.OBSERVER_TYPE + raise exceptions.DiplomacyException('Unknown special token in game %s' % self.game_id) + + def get_power_addresses(self, power_name): + """ Generate addresses (couples [power name, token]) of user controlling given power name. """ + for token in self.get_power(power_name).tokens: + yield (power_name, token) + + def has_player(self, username): + """ (for server game) Return True if given username controls any map power. """ + return any(power.is_controlled_by(username) for power in self.powers.values()) + + def has_token(self, token): + """ Return True if game has given token (either observer, omniscient or player). """ + return self.omniscient.has_token(token) or self.observer.has_token(token) or any( + power.has_token(token) for power in self.powers.values()) + + def has_observer_token(self, token): + """ Return True if game has given observer token. """ + return self.observer.has_token(token) + + def has_omniscient_token(self, token): + """ Return True if game has given omniscient observer token. """ + return self.omniscient.has_token(token) + + def has_player_token(self, token): + """ Return True if game has given player token. """ + return any(power.has_token(token) for power in self.powers.values()) + + def power_has_token(self, power_name, token): + """ Return True if given power has given player token. + :param power_name: name of power to check. + :param token: token to look for. + :return: a boolean + """ + return self.get_power(power_name).has_token(token) + + def add_omniscient_token(self, token): + """ Set given token as omniscient token. """ + if self.observer.has_token(token): + raise exceptions.ResponseException('Token already registered as observer.') + if self.has_player_token(token): + raise exceptions.ResponseException('Token already registered as player.') + self.omniscient.add_token(token) + + def add_observer_token(self, token): + """ Set given token as observer token. """ + if self.omniscient.has_token(token): + raise exceptions.ResponseException('Token already registered as omniscient.') + if self.has_player_token(token): + raise exceptions.ResponseException('Token already registered as player.') + self.observer.add_token(token) + + def transfer_special_token(self, token): + """ Move given token from a special case to another (observer -> omniscient or omniscient -> observer). """ + if self.has_observer_token(token): + self.remove_observer_token(token) + self.add_omniscient_token(token) + elif self.has_omniscient_token(token): + self.remove_omniscient_token(token) + self.add_observer_token(token) + + def control(self, power_name, username, token): + """ Control given power name with given username via given token. """ + if self.observer.has_token(token): + raise exceptions.ResponseException('Token already registered as observer.') + if self.omniscient.has_token(token): + raise exceptions.ResponseException('Token already registered as omniscient.') + power = self.get_power(power_name) # type: Power + if power.is_controlled() and not power.is_controlled_by(username): + raise exceptions.ResponseException('Power already controlled by another user.') + power.set_controlled(username) + power.add_token(token) + + def remove_observer_token(self, token): + """ Remove given observer token. """ + self.observer.remove_tokens([token]) + + def remove_omniscient_token(self, token): + """ Remove given omniscient token. """ + self.omniscient.remove_tokens([token]) + + def remove_special_token(self, special_name, token): + """ Remove given token from given special power name (either __OBSERVER__ or __OMNISCIENT__). """ + if special_name == self.observer.name: + self.remove_observer_token(token) + else: + assert special_name == self.omniscient.name + self.remove_omniscient_token(token) + + def remove_all_tokens(self): + """ Remove all connected tokens from this game. """ + self.observer.tokens.clear() + self.omniscient.tokens.clear() + for power in self.powers.values(): + power.tokens.clear() + + def remove_token(self, token): + """ Remove token from this game. """ + for power in self.powers.values(): # type: Power + power.remove_tokens([token]) + for special_power in (self.observer, self.omniscient): + special_power.remove_tokens([token]) + + def is_moderator(self, username): + """ Return True if given username is a moderator of this game. """ + return username in self.moderator_usernames + + def is_omniscient(self, username): + """ Return True if given username is allowed to be an omniscient observer of this game. """ + return username in self.omniscient_usernames + + def promote_moderator(self, username): + """ Allow given username to be a moderator of this game. """ + self.moderator_usernames.add(username) + + def promote_omniscient(self, username): + """ Allow given username to be an omniscient observer of this game. """ + self.omniscient_usernames.add(username) + + def demote_moderator(self, username): + """ Remove given username from allowed moderators. """ + if username in self.moderator_usernames: + self.moderator_usernames.remove(username) + + def demote_omniscient(self, username): + """ Remove given username from allowed omniscient observers. """ + if username in self.omniscient_usernames: + self.omniscient_usernames.remove(username) + + def filter_usernames(self, filter_function): + """ Remove each omniscient username, moderator username and player controller that does not match given + filter function (if filter_function(username) is False). + :param filter_function: a callable receiving a username and returning a boolean. + :return: an integer, either: + * 0: nothing changed. + * -1: something changed, but no player controllers removed. + * 1: something changed, and some player controllers were removed. + So, if 1 is returned, there are new dummy powers in the game (some notifications may need to be sent). + """ + n_kicked_players = 0 + n_kicked_omniscients = len(self.omniscient_usernames) + n_kicked_moderators = len(self.moderator_usernames) + self.omniscient_usernames = set(username for username in self.omniscient_usernames if filter_function(username)) + self.moderator_usernames = set(username for username in self.moderator_usernames if filter_function(username)) + for power in self.powers.values(): + if power.is_controlled() and not filter_function(power.get_controller()): + power.set_controlled(None) + n_kicked_players += 1 + n_kicked_omniscients -= len(self.omniscient_usernames) + n_kicked_moderators -= len(self.moderator_usernames) + if n_kicked_players: + return 1 + if n_kicked_moderators or n_kicked_omniscients: + return -1 + return 0 + + def filter_tokens(self, filter_function): + """ Remove from this game any token not matching given filter function (if filter_function(token) is False).""" + self.observer.remove_tokens([token for token in self.observer.tokens if not filter_function(token)]) + self.omniscient.remove_tokens([token for token in self.omniscient.tokens if not filter_function(token)]) + for power in self.powers.values(): # type: Power + power.remove_tokens([token for token in power.tokens if not filter_function(token)]) + + def process(self): + """ Process current game phase and move forward to next phase. + :return: a triple containing: + - previous game state (before the processing) + - current game state (after processing and game updates) + - A dictionary mapping kicked power names to tokens previously associated to these powers. + Useful to notify kicked users as they will be not registered in game anymore. + If game was not active, triple is (None, None, None). + If game kicked powers, only kicked powers dict is returned: (None, None, kicked powers). + If game was correctly processed, only states are returned: (prev, curr, None). + """ + if not self.is_game_active: + return None, None, None + # Kick powers if necessary. + all_orderable_locations = self.get_orderable_locations() + kicked_powers = {} + for power in self.powers.values(): + if (power.is_controlled() + and not power.order_is_set + and not self.civil_disorder + and all_orderable_locations[power.name]): + # This controlled power has not submitted orders, we have not rule CIVIL_DISORDER, + # and this power WAS allowed to submit orders for this phase. + # We kick such power. + kicked_powers[power.name] = set(power.tokens) + power.set_controlled(None) + + if kicked_powers: + # Some powers were kicked from an active game before processing. + # This game must be stopped and cannot be processed. We return info about kicked powers. + self.set_status(strings.FORMING) + return None, None, kicked_powers + + # Process game and retrieve previous state. + previous_phase_data = super(ServerGame, self).process() + if self.count_controlled_powers() < self.get_expected_controls_count(): + # There is no more enough controlled powers, we should stop game. + self.set_status(strings.FORMING) + + # Return process results: previous phase data, current phase data, and None for no kicked powers. + return previous_phase_data, self.get_phase_data(), None |