diff options
Diffstat (limited to 'diplomacy/server/request_manager_utils.py')
-rw-r--r-- | diplomacy/server/request_manager_utils.py | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/diplomacy/server/request_manager_utils.py b/diplomacy/server/request_manager_utils.py new file mode 100644 index 0000000..9ea8264 --- /dev/null +++ b/diplomacy/server/request_manager_utils.py @@ -0,0 +1,267 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette, Steven Bocco +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Utility classes and functions used for request management. + Put here to avoid having file request_managers.py with too many lines. +""" +from collections.__init__ import namedtuple + +from diplomacy.communication import notifications +from diplomacy.server.notifier import Notifier + +from diplomacy.utils import strings, exceptions + +class SynchronizedData(namedtuple('SynchronizedData', ('timestamp', 'order', 'type', 'data'))): + """ Small class used to store and sort data to synchronize for a game. Properties: + - timestamp (int): timestamp of related data to synchronize. + - order (int): rank of data to synchronize. + - type (str): type name of data to synchronize. Possible values: + - 'message': data is a game message. Order is 0. + - 'state_history': data is a game state for history. Order is 1. + - 'state': data is current game state. Order is 2. + - data: proper data to synchronize. + Synchronized data are sorted using timestamp then order, meaning that: + - data are synchronized from former to later timestamps + - for a same timestamp, messages are synchronized first, then states for history, then current state. + """ + +class GameRequestLevel(): + """ Describe a game level retrieved from a game request. Used by some game requests managers + to determine user rights in a game. Possible game levels: power, observer, omniscient and master. + """ + __slots__ = ['game', 'power_name', '__action_level'] + + def __init__(self, game, action_level, power_name): + """ Initialize a game request level. + :param game: related game data + :param action_level: action level, either: + - 'power' + - 'observer' + - 'omniscient' + - 'master' + :param power_name: (optional) power name specified in game request. Required if level is 'power'. + :type game: diplomacy.server.server_game.ServerGame + :type action_level: str + :type power_name: str + """ + assert action_level in {'power', 'observer', 'omniscient', 'master'} + self.game = game + self.power_name = power_name # type: str + self.__action_level = action_level # type: str + + def is_power(self): + """ Return True if game level is power. """ + return self.__action_level == 'power' + + def is_observer(self): + """ Return True if game level is observer. """ + return self.__action_level == 'observer' + + def is_omniscient(self): + """ Return True if game level is omniscient. """ + return self.__action_level == 'omniscient' + + def is_master(self): + """ Return True if game level is master. """ + return self.__action_level == 'master' + + @classmethod + def power_level(cls, game, power_name): + """ Create and return a game power level with given game data and power name. """ + return cls(game, 'power', power_name) + + @classmethod + def observer_level(cls, game, power_name): + """ Create and return a game observer level with given game data and power name. """ + return cls(game, 'observer', power_name) + + @classmethod + def omniscient_level(cls, game, power_name): + """ Create and return a game omniscient level with given game data and power name. """ + return cls(game, 'omniscient', power_name) + + @classmethod + def master_level(cls, game, power_name): + """ Create and return a game master level with given game data and power name. """ + return cls(game, 'master', power_name) + +def verify_request(server, request, connection_handler, + omniscient_role=True, observer_role=True, power_role=True, require_power=False, require_master=True): + """ Verify request token, and game role and rights if request is a game request. + Ignore connection requests (e.g. SignIn), as such requests don't have any token. + Verifying token: + - check if server knows request token + - check if request token is still valid. + - Update token lifetime. See method Server.assert_token() for more details. + Verifying game role and rights: + - check if server knows request game ID. + - check if request token is allowed to have request game role in associated game ID. + If request is a game request, return a GameRequestLevel containing: + - the server game object + - the level of rights (power, observer or master) allowed for request sender. + - the power name associated to request (if present), representing which power is queried by given request. + See class GameRequestLevel for more details. + :param server: server which receives the request + :param request: request received by server + :param connection_handler: connection handler which receives the request + :param omniscient_role: (for game requests) Indicate if omniscient role is accepted for this request. + :param observer_role: (for game requests) Indicate if observer role is accepted for this request. + :param power_role: (for game requests) Indicate if power role is accepted for this request. + :param require_power: (for game requests) Indicate if a power name is required for this request. + If true, either game role must be power role, or request must have a non-null `power_name` request. + :param require_master: (for game requests) Indicate if an omniscient must be a master. + If true and if request role is omniscient, then request token must be a master token for related game. + :return: a GameRequestLevel object for game requests, else None. + :rtype: diplomacy.server.request_manager_utils.GameRequestLevel + :type server: diplomacy.Server + :type request: requests._AbstractRequest | requests._AbstractGameRequest + :type connection_handler: diplomacy.server.connection_handler.ConnectionHandler + """ + + # A request may be a connection request, a channel request or a game request. + # For connection request, field level is None. + # For channel request, field level is CHANNEL. Channel request has a `token` field. + # For game request, field level is GAME. Game request is a channel request with supplementary fields + # `game_role` and `game_id`. + + # No permissions to check for connection requests (e.g. SignIn). + if not request.level: + return None + + # Check token for channel and game requests. + server.assert_token(request.token, connection_handler) + + # No more permissions to check for non-game requests. + if request.level != strings.GAME: + return None + + # Check and get game. + server_game = server.get_game(request.game_id) + + power_name = getattr(request, 'power_name', None) + + if strings.role_is_special(request.game_role): + + if request.game_role == strings.OMNISCIENT_TYPE: + + # Check if omniscient role is accepted (for this call). + if not omniscient_role: + raise exceptions.ResponseException( + 'Omniscient role disallowed for request %s' % request.name) + + # Check if request token is known as omniscient token by related game. + if not server_game.has_omniscient_token(request.token): + raise exceptions.GameTokenException() + + # Check if request token is a master token (if required for this call) + # and create game request level. + token_is_master = server.token_is_master(request.token, server_game) + if require_master and not token_is_master: + raise exceptions.GameMasterTokenException() + if token_is_master: + level = GameRequestLevel.master_level(server_game, power_name) + else: + level = GameRequestLevel.omniscient_level(server_game, power_name) + + else: + # Check if observer role is accepted (for this call). + if not observer_role: + raise exceptions.ResponseException( + 'Observer role disallowed for request %s' % request.game_role) + + # Check if request token is known as observer token by related game. + if not server_game.has_observer_token(request.token): + raise exceptions.GameTokenException() + + # Create game request level object. + level = GameRequestLevel.observer_level(server_game, power_name) + + # Check if we have a valid power name if power name is required (for this call) or given. + if power_name is None: + if require_power: + raise exceptions.MapPowerException(None) + elif not server_game.has_power(power_name): + raise exceptions.MapPowerException(power_name) + + else: + # Check if power role is accepted (for this call). + if not power_role: + raise exceptions.ResponseException('Power role disallowed for request %s' % request.name) + + # Get power name to check: either given power name if defined, else game role. + if power_name is None: + power_name = request.game_role + + # Check if given power name is valid. + if not server_game.has_power(power_name): + raise exceptions.MapPowerException(power_name) + + # Check if request sender is allowed to query given power name. + # We don't care anymore if sender token is currently associated to this power, + # as long as sender is registered as the controller of this power. + if not server_game.is_controlled_by(power_name, server.users.get_name(request.token)): + raise exceptions.ResponseException('User %s does not currently control power %s' + % (server.users.get_name(request.token), power_name)) + + # Create game request level. + level = GameRequestLevel.power_level(server_game, power_name) + + return level + +def transfer_special_tokens(server_game, server, username, grade_update, from_observation=True): + """ Transfer tokens of given username from an observation role to the opposite in given server game, + and notify all user tokens about observation role update with given grade update. + This method is used in request manager on_set_grade(). + :param server_game: server game in which tokens roles must be changed. + :param server: server from which notifications will be sent. + :param username: name of user whom tokens will be transferred. Only user tokens registered in + server games as observer tokens or omniscient tokens will be updated. + :param grade_update: type of upgrading. Possibles values in strings.ALL_GRADE_UPDATES (PROMOTE or DEMOTE). + :param from_observation: indicate transfer direction. + If True, we expect to transfer role from observer to omniscient. + If False, we expect to transfer role from omniscient to observer. + :type server_game: diplomacy.server.server_game.ServerGame + :type server: diplomacy.Server + """ + if from_observation: + old_role = strings.OBSERVER_TYPE + new_role = strings.OMNISCIENT_TYPE + token_filter = server_game.has_observer_token + else: + old_role = strings.OMNISCIENT_TYPE + new_role = strings.OBSERVER_TYPE + token_filter = server_game.has_omniscient_token + + connected_user_tokens = [user_token for user_token in server.users.get_tokens(username) if token_filter(user_token)] + + if connected_user_tokens: + + # Update observer level for each connected user token. + for user_token in connected_user_tokens: + server_game.transfer_special_token(user_token) + + addresses = [(old_role, user_token) for user_token in connected_user_tokens] + Notifier(server).notify_game_addresses( + server_game.game_id, addresses, notifications.OmniscientUpdated, + grade_update=grade_update, game=server_game.cast(new_role, username, server.users.has_admin(username))) + +def assert_game_not_finished(server_game): + """ Check if given game is not yet completed or canceled, otherwise raise a GameFinishedException. + :param server_game: server game to check + :type server_game: diplomacy.server.server_game.ServerGame + """ + if server_game.is_game_completed or server_game.is_game_canceled: + raise exceptions.GameFinishedException() |