diff options
Diffstat (limited to 'diplomacy/engine')
-rw-r--r-- | diplomacy/engine/__init__.py | 16 | ||||
-rw-r--r-- | diplomacy/engine/game.py | 4289 | ||||
-rw-r--r-- | diplomacy/engine/map.py | 1361 | ||||
-rw-r--r-- | diplomacy/engine/message.py | 115 | ||||
-rw-r--r-- | diplomacy/engine/power.py | 392 | ||||
-rw-r--r-- | diplomacy/engine/renderer.py | 789 |
6 files changed, 6962 insertions, 0 deletions
diff --git a/diplomacy/engine/__init__.py b/diplomacy/engine/__init__.py new file mode 100644 index 0000000..4f2769f --- /dev/null +++ b/diplomacy/engine/__init__.py @@ -0,0 +1,16 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== diff --git a/diplomacy/engine/game.py b/diplomacy/engine/game.py new file mode 100644 index 0000000..73b2ff9 --- /dev/null +++ b/diplomacy/engine/game.py @@ -0,0 +1,4289 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +# -*- coding: utf-8 -*- +""" Game + - Contains the game engine +""" +# pylint: disable=too-many-lines +import os +import sys +import time +import uuid +import random +from copy import deepcopy + +import numpy as np + +from diplomacy import settings +import diplomacy.utils.errors as err +from diplomacy.engine.map import Map +from diplomacy.engine.message import Message, GLOBAL +from diplomacy.engine.power import Power +from diplomacy.engine.renderer import Renderer +from diplomacy.utils import PriorityDict, common, exceptions, parsing, strings +from diplomacy.utils.jsonable import Jsonable +from diplomacy.utils.sorted_dict import SortedDict +from diplomacy.utils.constants import OrderSettings +from diplomacy.utils.game_phase_data import GamePhaseData, MESSAGES_TYPE + +# Constants +UNDETERMINED, POWER, UNIT, LOCATION, COAST, ORDER, MOVE_SEP, OTHER = 0, 1, 2, 3, 4, 5, 6, 7 + +class Game(Jsonable): + """ + - combat - Dictionary of dictionaries containing the strength of every attack on a location (including units + who don't count toward dislodgment) + - Format: {loc: attack_strength: [ ['src loc', [support loc] ]} + e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } } + MUN is holding, being attack without support from RUH and being attacked with support from SIL + (S from BOH) + - command - Contains the list of finalized orders to be processed (same format as orders, but without .order) + e.g. {'A PAR': '- A MAR'} + - controlled_powers: for client games only. List of powers currently controlled by associated client user. + - convoy_paths - Contains the list of remaining convoys path for each convoyed unit to reach their destination + Note: This is used to see if there are still active convoy paths remaining. + Note: This also include the start and ending location + e.g. {'A PAR': [ ['PAR', 'ION','NAO', 'MAR], ['PAR', 'ION', 'MAR'] ], ... } + - convoy_paths_possible - Contains the list of possible convoy paths given the current fleet locations or None + e.g. [(START_LOC, {Fleets Req}, {possible dest}), ...] + - convoy_paths_dest - Contains a dictionary of possible paths to reach destination from start or None + e.g. {start_loc: {dest_loc_1: [{fleets}, {fleets}, {fleets}], dest_loc_2: [{fleets, fleets}]} + - deadline: integer: game deadline in seconds. + - dislodged - Contains a dictionary of dislodged units (and the site that dislodged them') + e.g. { 'A PAR': 'MAR' } + - error - Contains a list of errors that the game generated + e.g. ['NO MASTER SPECIFIED'] + - game_id: String that contains the current game's ID + e.g. '123456' + - lost - Contains a dictionary of centers that have been lost during the term + e.g. {'PAR': 'FRANCE'} to indicate that PAR was lost by France (previous owner) + - map: Contains a reference to the current map (Map instance) + e.g. map = Map('standard') + - map_name: Contains a reference to the name of the map that was loaded + e.g. map_name = 'standard' + - messages (only for non-observer games): history of messages exchanged inside this game. + Sorted dict mapping message timestamps to message objects (instances of diplomacy.Message). + Format: {message.time_sent => message} + - message_history: history of messages through all played phases. + Sorted dict mapping a short phase name to a message dict + (with same format as field `message` describe above). + Format: {short phase name => {message.time_sent => message}} + Wrapped in a sorted dict at runtime, see method __init__(). + - meta_rules - Contains the rules that have been processed as directives + e.g. ['NO_PRESS'] + - n_controls: integer: exact number of controlled powers allowed for this game. + If game start mode is not START_MASTER, then game starts as soon as this number of powers + are controlled. + - no_rules - Contains the list of rules that have been disabled (prefixed with '!') + e.g ['NO_PRESS'] + - note - A note to display on the rendering + e.g. 'Winner: FRANCE' + - observer_level: for client games only. Highest observation level allowed for associated client user. + Either "master_type", "omniscient_type" or "observer_type". + - orders - Contains the list of current orders (not yet processed) + e.g. {'A PAR': '- A MAR'} + - ordered_units - Contains a dictionary of the units ordered by each power in the last phase + e.g. {'FRANCE': ['A PAR', 'A MAR'], 'ENGLAND': ... } + - order_history - Contains the history of orders from each player from the beginning of the game. + Sorted dict mapping a short phase name to a dictionary of orders + (powers names as keys, powers orders as values). + Format: {short phase name => {power name => [orders]}} + Wrapped in a sorted dict at runtime, see method __init__(). + - outcome - Contains the game outcome + e.g. [lastPhase, victor1, victor2, victor3] + - phase: String that contains a long representation of the current phase + e.g. 'SPRING 1901 MOVEMENT' + - phase_type: Indicates the current phase type + (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjustment, '-' for non-playing phase) + e.g. 'M' + - popped - Contains a list of all retreaters who didn't make it + e.g. ['A PAR', 'A MAR'] + - powers - Contains a dictionary mapping power names to power instances in the game + e.g. {'FRANCE': FrancePower, 'ENGLAND': EnglishPower, ...} + - registration_password: ** hashed ** version of password to be sent by a player to join this game. + - renderer - Contains the object in charge of rendering the map + e.g. Renderer() + - result - Contains the result of the action for each unit. + In Movement Phase, result can be 'no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted' + e.g. { 'A PAR': ['cut', 'void'] } + In Retreats phase, result can be 'bounce', 'disband', 'void' + e.g. { 'A PAR': ['cut', 'void'] } + In Adjustments phase, result can be 'void' or '' + e.g. { 'A PAR': ['', 'void'] } # e.g. to indicate a successful build, and a void build. + - result_history - Contains the history of orders results for all played phases. + Sorted dict mapping a short phase name to a dictionary of order results for this phase. + Dictionary of order results maps a unit to a list of results. See field result for more details. + Format: {short phase name => {unit => [results]}} + Wrapped in a sorted dict at runtime, see method __init__(). + - role: game type (observer, omniscient, player or server game). + Either a power name (for player game) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES. + - rules: Contains a list of active rules + e.g. ['NO_PRESS', ...] + - state_history: history of previous game states (returned by method get_state()) for this game. + Sorted dict mapping a short phase name to a game state. + Each game state is associated to a timestamp generated when state is created by method get_state(). + State timestamp then represents the "end" time of the state, ie. time when this state was saved and + archived in state history. + Format: {short phase name => state} + Wrapped in a sorted dict at runtime, see method __init__(). + - status: game status (forming, active, paused, completed or canceled). + Possible values in diplomacy.utils.strings.ALL_GAME_STATUSES. + - supports - Contains a dictionary of support for each unit + - Format: { 'unit': [nb_of_support, [list of supporting units]] } + e.g. { 'A PAR': [2, ['A MAR']] } + 2 support, but the Marseille support does NOT count toward dislodgment + - timestamp_created: timestamp in microseconds when game object was created on server side. + - victory - Indicates the number of SUPPLY [default] centers one power must control to win the game + - Format: [reqFirstYear, reqSecondYear, ..., reqAllFurtherYears] + e.g. [10,10,18] for 10 the 1st year, 10 the 2nd year, 18 year 3+ + - win - Indicates the minimum number of centers required to win + e.g. 3 + - zobrist_hash - Contains the zobrist hash representing the current state of this game + e.g. 12545212418541325 + """ + # pylint: disable=too-many-instance-attributes + __slots__ = ['victory', 'no_rules', 'meta_rules', 'phase', 'note', 'map', 'powers', 'outcome', 'error', 'popped', + 'messages', 'order_history', 'orders', 'ordered_units', 'phase_type', 'win', 'combat', 'command', + 'result', 'supports', 'dislodged', 'lost', 'convoy_paths', 'convoy_paths_possible', + 'convoy_paths_dest', 'zobrist_hash', 'renderer', 'game_id', 'map_name', 'role', 'rules', + 'message_history', 'state_history', 'result_history', 'status', 'timestamp_created', 'n_controls', + 'deadline', 'registration_password', 'observer_level', 'controlled_powers', '_phase_wrapper_type', + 'phase_abbr'] + zobrist_tables = {} + rule_cache = () + model = { + strings.CONTROLLED_POWERS: parsing.OptionalValueType(parsing.SequenceType(str)), + strings.DEADLINE: parsing.DefaultValueType(int, 300), + strings.ERROR: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.GAME_ID: parsing.OptionalValueType(str), + strings.MAP_NAME: parsing.DefaultValueType(str, 'standard'), + strings.MESSAGE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, MESSAGES_TYPE), {}), + strings.MESSAGES: parsing.DefaultValueType(MESSAGES_TYPE, []), + strings.META_RULES: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.N_CONTROLS: parsing.OptionalValueType(int), + strings.NO_RULES: parsing.DefaultValueType(parsing.SequenceType(str, set), []), + strings.NOTE: parsing.DefaultValueType(str, ''), + strings.OBSERVER_LEVEL: parsing.OptionalValueType( + parsing.EnumerationType((strings.MASTER_TYPE, strings.OMNISCIENT_TYPE, strings.OBSERVER_TYPE))), + strings.ORDER_HISTORY: parsing.DefaultValueType( + parsing.DictType(str, parsing.DictType(str, parsing.SequenceType(str))), {}), + strings.OUTCOME: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.PHASE: parsing.DefaultValueType(str, ''), + strings.PHASE_ABBR: parsing.DefaultValueType(str, ''), + strings.POWERS: parsing.DefaultValueType(parsing.DictType(str, parsing.JsonableClassType(Power)), {}), + strings.REGISTRATION_PASSWORD: parsing.OptionalValueType(str), + strings.RESULT_HISTORY: parsing.DefaultValueType(parsing.DictType(str, parsing.DictType( + str, parsing.SequenceType(parsing.EnumerationType( + ['no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted', 'disband', ''])))), {}), + strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE), + strings.RULES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=list), ()), + strings.STATE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, dict), {}), + strings.STATUS: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_GAME_STATUSES), strings.FORMING), + strings.TIMESTAMP_CREATED: parsing.OptionalValueType(int), + strings.VICTORY: parsing.DefaultValueType(parsing.SequenceType(int), []), + strings.WIN: parsing.DefaultValueType(int, 0), + strings.ZOBRIST_HASH: parsing.DefaultValueType(parsing.StringableType(np.int64), '0'), + } + + def __init__(self, game_id=None, **kwargs): + """ Constructor """ + self.victory = None + self.no_rules = set() + self.meta_rules = [] + self.phase, self.note = '', '' + self.map = None # type: Map + self.powers = {} + self.outcome, self.error, self.popped = [], [], [] + self.orders, self.ordered_units = {}, {} + self.phase_type = None + self.win = None + self.combat, self.command, self.result = {}, {}, {} + self.supports, self.dislodged, self.lost = {}, {}, {} + self.convoy_paths, self.convoy_paths_possible, self.convoy_paths_dest = {}, None, None + self.zobrist_hash = 0 + self.renderer = None + self.game_id = None # type: str + self.map_name = None # type: str + self.messages = None # type: SortedDict + self.role = None # type: str + self.rules = [] + self.state_history, self.order_history, self.result_history, self.message_history = {}, {}, {}, {} + self.status = None # type: str + self.timestamp_created = None # type: int + self.n_controls = None + self.deadline = 0 + self.registration_password = None + self.observer_level = None + self.controlled_powers = None + + # Remove rules from kwargs (if present), as we want to add them manually using self.add_rule(). + rules = kwargs.pop(strings.RULES, None) + + # Update rules with game ID. + kwargs[strings.GAME_ID] = game_id + + # Initialize game with kwargs. + super(Game, self).__init__(**kwargs) + + # Check settings. + if self.registration_password is not None and self.registration_password == '': + raise exceptions.DiplomacyException('Registration password must be None or non-empty string.') + if self.n_controls is not None and self.n_controls < 0: + raise exceptions.NaturalIntegerException('n_controls must be a natural integer.') + if self.deadline < 0: + raise exceptions.NaturalIntegerException('Deadline must be a natural integer.') + # Check rules. + if rules is None: + rules = ['SOLITAIRE', 'NO_PRESS', 'IGNORE_ERRORS', 'POWER_CHOICE'] + # Set game rules. + for rule in rules: + self.add_rule(rule) + # Check settings about rule NO_DEADLINE. + if 'NO_DEADLINE' in self.rules: + self.deadline = 0 + # Check settings about rule SOLITAIRE. + if 'SOLITAIRE' in self.rules: + self.n_controls = 0 + elif self.n_controls == 0: + # If number of allowed players is 0, the game can only be solitaire. + self.add_rule('SOLITAIRE') + + # Check timestamp_created. + if self.timestamp_created is None: + self.timestamp_created = common.timestamp_microseconds() + + # Check game ID. + if self.game_id is None: + self.game_id = '%s/%s' % (self.timestamp_created, uuid.uuid4()) + + # Validating status + self._validate_status(reinit_powers=(self.timestamp_created is None)) + + if self.powers: + # Game loaded with powers. + # Associate loaded powers with this game. + for power in self.powers.values(): + power.game = self + else: + # Begin game. + self._begin() + + # Game loaded. + + # Check map powers. + assert all(self.has_power(power_name) for power_name in self.map.powers) + + # Check role and consistency between all power roles and game role. + if self.has_power(self.role): + # It's a power game. Each power must be a player power. + assert all(power.role == power.name for power in self.powers.values()) + else: + # We should have a non-power game and each power must have same role as game role. + assert self.role in strings.ALL_ROLE_TYPES + assert all(power.role == self.role for power in self.powers.values()) + + # Wrap history fields into runtime sorted dictionaries. + # This is necessary to sort history fields by phase name. + + self._phase_wrapper_type = common.str_cmp_class(self.map.compare_phases) + + self.order_history = SortedDict(self._phase_wrapper_type, dict, + {self._phase_wrapper_type(key): value + for key, value in self.order_history.items()}) + self.message_history = SortedDict(self._phase_wrapper_type, SortedDict, + {self._phase_wrapper_type(key): value + for key, value in self.message_history.items()}) + self.state_history = SortedDict(self._phase_wrapper_type, dict, + {self._phase_wrapper_type(key): value + for key, value in self.state_history.items()}) + self.result_history = SortedDict(self._phase_wrapper_type, dict, + {self._phase_wrapper_type(key): value + for key, value in self.result_history.items()}) + + def __str__(self): + """ Returns a string representation of the game instance """ + show_map = self.map + show_result = self.outcome + + text = '' + text += 'GAME %s%s%s' % (self.game_id, '\nPHASE ', self.phase) + text += '\nMAP %s' % self.map_name if show_map else '' + text += '\nRESULT %s' % ' '.join(self.outcome) if show_result else '' + text += '\nRULE '.join([''] + [rule for rule in self.rules if rule not in self.meta_rules]) + text += '\nRULE !'.join([''] + [no_rule for no_rule in self.no_rules]) + return text + + def __deepcopy__(self, memo): + """ Fast deep copy implementation """ + cls = self.__class__ + result = cls.__new__(cls) + + # Deep copying + for key in self._slots: + if key in ['map', 'renderer', 'powers']: + continue + setattr(result, key, deepcopy(getattr(self, key))) + setattr(result, 'map', self.map) + setattr(result, 'powers', {}) + for power in self.powers.values(): + result.powers[power.name] = deepcopy(power) + setattr(result.powers[power.name], 'game', result) + return result + + # ==================================================================== + # Public Interface + # ==================================================================== + + @property + def _slots(self): + """ Return an iterable of all attributes of this object. + Should be used in place of "self.__slots__" to be sure to retrieve all + attribute names from a derived class (including parent slots). + """ + return (name for cls in type(self).__mro__ for name in getattr(cls, '__slots__', ())) + + @property + def power(self): + """ (only for player games) Return client power associated to this game. + :return: a Power object. + :rtype: Power + """ + return self.powers[self.role] if self.is_player_game() else None + + @property + def is_game_done(self): + """ Returns a boolean flag that indicates if the game is done """ + return self.phase == 'COMPLETED' + + is_game_forming = property(lambda self: self.status == strings.FORMING) + is_game_active = property(lambda self: self.status == strings.ACTIVE) + is_game_paused = property(lambda self: self.status == strings.PAUSED) + is_game_canceled = property(lambda self: self.status == strings.CANCELED) + is_game_completed = property(lambda self: self.status == strings.COMPLETED) + current_short_phase = property(lambda self: self.map.phase_abbr(self.phase, self.phase)) + + civil_disorder = property(lambda self: 'CIVIL_DISORDER' in self.rules) + multiple_powers_per_player = property(lambda self: 'MULTIPLE_POWERS_PER_PLAYER' in self.rules) + no_observations = property(lambda self: 'NO_OBSERVATIONS' in self.rules) + no_press = property(lambda self: 'NO_PRESS' in self.rules) + power_choice = property(lambda self: 'POWER_CHOICE' in self.rules) + public_press = property(lambda self: 'PUBLIC_PRESS' in self.rules) + real_time = property(lambda self: 'REAL_TIME' in self.rules) + start_master = property(lambda self: 'START_MASTER' in self.rules) + solitaire = property(lambda self: 'SOLITAIRE' in self.rules) + + # ============================================================== + # Application/network methods (mainly used for connected games). + # ============================================================== + + def is_player_game(self): + """ Return True if this game is a player game. """ + return self.has_power(self.role) + + def is_observer_game(self): + """ Return True if this game is an observer game. """ + return self.role == strings.OBSERVER_TYPE + + def is_omniscient_game(self): + """ Return True if this game is an omniscient game. """ + return self.role == strings.OMNISCIENT_TYPE + + def is_server_game(self): + """ Return True if this game is a server game. """ + return self.role == strings.SERVER_TYPE + + def is_valid_password(self, registration_password): + """ Return True if given plain password matches registration password. """ + if self.registration_password is None: + return registration_password is None + if registration_password is None: + return False + return common.is_valid_password(registration_password, self.registration_password) + + def is_controlled(self, power_name): + """ Return True if given power name is currently controlled. """ + return self.get_power(power_name).is_controlled() + + def is_dummy(self, power_name): + """ Return True if given power name is not currently controlled. """ + return not self.is_controlled(power_name) + + def does_not_wait(self): + """ Return True if the game does not wait anything to process its current phase. + The game is not waiting is all **controlled** powers have defined orders and wait flag set to False. + If it's a solitaire game (with no controlled powers), all (dummy, not eliminated) powers must have defined + orders and wait flag set to False. By default, wait flag for a dummy power is True. + Note that an empty orders set is considered as a defined order as long as it was + explicitly set by the power controller. + """ + if any(power.is_controlled() for power in self.powers.values()): + return all(power.does_not_wait() for power in self.powers.values() if power.is_controlled()) + return all(power.is_eliminated() or power.does_not_wait() for power in self.powers.values()) + + def has_power(self, power_name): + """ Return True if this game has given power name. """ + return power_name in self.map.powers + + def has_expected_controls_count(self): + """ Return True if game has expected number of map powers to be controlled. + If True, the game can start (if not yet started). + """ + return self.count_controlled_powers() == self.get_expected_controls_count() + + def count_controlled_powers(self): + """ Return the number of controlled map powers. """ + return sum(1 for power_name in self.get_map_power_names() if self.is_controlled(power_name)) + + def get_controlled_power_names(self, username): + """ Return the list of power names currently controlled by given user name. """ + return [power.name for power in self.powers.values() if power.is_controlled_by(username)] + + def get_expected_controls_count(self): + """ Return the number of map powers expected to be controlled in this game. + This number is either specified in settings or the number of map powers. + """ + expected_count = self.n_controls + if expected_count is None: + expected_count = len(self.powers) + return expected_count + + def get_map_power_names(self): + """ Return sequence of map power names. """ + return self.powers.keys() + + def get_dummy_power_names(self): + """ Return sequence of dummy power objects. """ + return set(power_name for power_name in self.get_map_power_names() if self.is_dummy(power_name)) + + def get_controllers(self): + """ Return a dictionary mapping each power name to its current controller name.""" + return {power.name: power.get_controller() for power in self.powers.values()} + + def get_controllers_timestamps(self): + """ Return a dictionary mapping each power name to its controller timestamp. """ + return {power.name: power.get_controller_timestamp() for power in self.powers.values()} + + def get_random_power_name(self): + """ Return a random power name from remaining dummy power names. + Raise an exception if there are no dummy power names. + """ + playable_power_names = list(self.get_dummy_power_names()) + if not playable_power_names: + raise exceptions.RandomPowerException(1, len(playable_power_names)) + playable_power_names.sort() + return playable_power_names[random.randint(0, len(playable_power_names) - 1)] + + def get_latest_timestamp(self): + """ Return timestamp of latest data saved into this game (either current state, archived state or message). + :return: a timestamp + :rtype: int + """ + timestamp = self.timestamp_created + if self.state_history: + timestamp = max(self.state_history.last_value()['timestamp'], timestamp) + if self.messages: + timestamp = max(self.messages.last_key(), timestamp) + return timestamp + + @classmethod + def filter_messages(cls, messages, game_role, timestamp_from=None, timestamp_to=None): + """ Filter given messages based on given game role between given timestamps (bounds included). + See method diplomacy.utils.SortedDict.sub() about bound rules. + :param messages: a sorted dictionary of messages to filter. + :param game_role: game role requiring messages. Either a special power name + (PowerName.OBSERVER or PowerName.OMNISCIENT), a power name, or a list of power names. + :param timestamp_from: lower timestamp (included) for required messages. + :param timestamp_to: upper timestamp (included) for required messages. + :return: a dict of corresponding messages (empty if no corresponding messages found), + mapping messages timestamps to messages. + :type messages: diplomacy.utils.sorted_dict.SortedDict + """ + + # Observer can see global messages and system messages sent to observers. + if isinstance(game_role, str) and game_role == strings.OBSERVER_TYPE: + return {message.time_sent: message + for message in messages.sub(timestamp_from, timestamp_to) + if message.is_global() or message.for_observer()} + + # Omniscient observer can see all messages. + if isinstance(game_role, str) and game_role == strings.OMNISCIENT_TYPE: + return {message.time_sent: message + for message in messages.sub(timestamp_from, timestamp_to)} + + # Power can see global messages and all messages she sent or received. + if isinstance(game_role, str): + game_role = [game_role] + elif not isinstance(game_role, list): + game_role = list(game_role) + return {message.time_sent: message + for message in messages.sub(timestamp_from, timestamp_to) + if message.is_global() or message.recipient in game_role or message.sender in game_role} + + def get_phase_history(self, from_phase=None, to_phase=None, game_role=None): + """ Return a list of game phase data from game history between given phases (bounds included). + Each GamePhaseData object contains game state, messages, orders and order results for a phase. + :param from_phase: either: + - a string: phase name + - an integer: index of phase in game history + - None (default): lowest phase stored in game history + :param to_phase: either: + - a string: phase name + - an integer: index of phase in game history + - None (default): latest phase stored in game history + :param game_role (optional): role of game for which phase history is retrieved. + If none, messages in game history will not be filtered. + """ + if isinstance(from_phase, int): + from_phase = self.state_history.key_from_index(from_phase) + elif isinstance(from_phase, str): + from_phase = self._phase_wrapper_type(from_phase) + if isinstance(to_phase, int): + to_phase = self.state_history.key_from_index(to_phase) + elif isinstance(to_phase, str): + to_phase = self._phase_wrapper_type(to_phase) + phases = self.state_history.sub_keys(from_phase, to_phase) + states = self.state_history.sub(from_phase, to_phase) + orders = self.order_history.sub(from_phase, to_phase) + messages = self.message_history.sub(from_phase, to_phase) + results = self.result_history.sub(from_phase, to_phase) + if game_role: + messages = [self.filter_messages(msg_dict, game_role) for msg_dict in messages] + assert len(phases) == len(states) == len(orders) == len(messages) == len(results), ( + len(phases), len(states), len(orders), len(messages), len(results)) + return [GamePhaseData(name=str(phases[i]), + state=states[i], + orders=orders[i], + messages=messages[i], + results=results[i]) + for i in range(len(phases))] + + def get_phase_from_history(self, short_phase_name, game_role=None): + """ Return a game phase data corresponding to given phase from phase history. """ + return self.get_phase_history(short_phase_name, short_phase_name, game_role)[0] + + def phase_history_from_timestamp(self, timestamp): + """ Return list of game phase data from game history for which state timestamp >= given timestamp. """ + earliest_phase = '' + for state in self.state_history.reversed_values(): + if state['timestamp'] < timestamp: + break + earliest_phase = state['name'] + return self.get_phase_history(from_phase=earliest_phase) if earliest_phase else [] + + def extend_phase_history(self, game_phase_data): + """ Add data from a game phase to game history. + :param game_phase_data: a GamePhaseData object. + :type game_phase_data: GamePhaseData + """ + phase = self._phase_wrapper_type(game_phase_data.name) + assert phase not in self.state_history + assert phase not in self.message_history + assert phase not in self.order_history + assert phase not in self.result_history + self.state_history.put(phase, game_phase_data.state) + self.message_history.put(phase, game_phase_data.messages) + self.order_history.put(phase, game_phase_data.orders) + self.result_history.put(phase, game_phase_data.results) + + def set_status(self, status): + """ Set game status with given status (should be in diplomacy.utils.strings.ALL_GAME_STATUSES). """ + assert status in strings.ALL_GAME_STATUSES + self.status = status + + def draw(self, winners=None): + """ Force a draw for this game, set status as COMPLETED and finish the game. + :param winners: (optional) either None (all powers remaining to map are considered winners) or a sequence + of required power names to be considered as winners. + :return: a couple (previous state, current state) with game state before the draw and game state after + the draw. + """ + if winners is None: + # Draw with all powers which still have units in map. + winners = [power.name for power in self.powers.values() if power.units] + + # No orders will be processed when drawing, so clear current orders. + self.clear_orders() + + # Collect data about current phase before drawing. + previous_phase = self._phase_wrapper_type(self.current_short_phase) + previous_orders = self.get_orders() + previous_messages = self.messages.copy() + previous_state = self.get_state() + + # Finish the game. + self._finish(winners) + + # Then clear game and save previous phase. + self.clear_vote() + self.clear_orders() + self.messages.clear() + self.order_history.put(previous_phase, previous_orders) + self.message_history.put(previous_phase, previous_messages) + self.state_history.put(previous_phase, previous_state) + self.result_history.put(previous_phase, {}) + + # There are no expected results for orders, as there are no orders processed. + + previous_phase_data = GamePhaseData(name=str(previous_phase), + state=previous_state, + orders=previous_orders, + messages=previous_messages, + results={}) + current_phase_data = GamePhaseData(name=self.current_short_phase, + state=self.get_state(), + orders={}, + messages={}, + results={}) + + return previous_phase_data, current_phase_data + + def set_controlled(self, power_name, username): + """ Control power with given username (may be None to set dummy power). + See method diplomacy.Power#set_controlled. + """ + self.get_power(power_name).set_controlled(username) + + def update_dummy_powers(self, dummy_power_names): + """ Force all power associated to given dummy power names to be uncontrolled. + :param dummy_power_names: Sequence of required dummy power names. + """ + for dummy_power_name in dummy_power_names: + if self.has_power(dummy_power_name): + self.set_controlled(dummy_power_name, None) + + def update_powers_controllers(self, powers_controllers, timestamps): + """ Update powers controllers. + :param powers_controllers: a dictionary mapping a power name to a controller name. + :param timestamps: a dictionary mapping a power name to timestamp when related controller + (in powers_controllers) was associated to power. + :type powers_controllers: dict + """ + for power_name, controller in powers_controllers.items(): + self.get_power(power_name).update_controller(controller, timestamps[power_name]) + + def new_power_message(self, recipient, body): + """ Create a undated (without timestamp) power message to be sent from a power to another via server. + Server will answer with timestamp, and message will be updated + and added to local game messages. + :param recipient: recipient power name (string). + :param body: message body (string). + :return: a new GameMessage object. + :rtype: GameMessage + """ + assert self.is_player_game() + if not self.has_power(recipient): + raise exceptions.MapPowerException(recipient) + return Message(phase=self.current_short_phase, sender=self.role, recipient=recipient, message=body) + + def new_global_message(self, body): + """ Create an undated (without timestamp) global message to be sent from a power via server. + Server will answer with timestamp, and message will be updated and added to local game messages. + :param body: message body (string). + :return: a new GameMessage object. + :rtype: Message + """ + assert self.is_player_game() + return Message(phase=self.current_short_phase, sender=self.role, recipient=GLOBAL, message=body) + + def add_message(self, message): + """ Add message to current game data. + Only a server game can add a message with no timestamp: + game will auto-generate a timestamp for the message. + :param message: a GameMessage object to add. + :return: message timestamp. + :rtype: int + """ + assert isinstance(message, Message) + if self.is_player_game(): + assert message.is_global() or self.power.name in (message.sender, message.recipient) + + if message.time_sent is None: + # This instance must be a server game. + # Message should be a new message matching current game phase. + # There should not be any more recent message in message history (as we are adding a new message). + # We must generate a timestamp for this message. + assert self.is_server_game() + if message.phase != self.current_short_phase: + raise exceptions.GamePhaseException(self.current_short_phase, message.phase) + assert not self.messages or common.timestamp_microseconds() >= self.messages.last_key() + time.sleep(1e-6) + message.time_sent = common.timestamp_microseconds() + + self.messages.put(message.time_sent, message) + return message.time_sent + + # Vote methods. For server and omniscient games only. + # Observer game should not see votes. + # Power game should know only vote of related power (votes for all other power should be 'neutral' in a power game). + + def has_draw_vote(self): + """ Return True if all controlled non-eliminated powers have voted YES to draw game at current phase. """ + assert self.is_server_game() or self.is_omniscient_game() + return all( + power.vote == strings.YES + for power in self.powers.values() + if not power.is_eliminated() + ) + + def count_voted(self): + """ Return the count of controlled powers who already voted for a draw for current phase. """ + assert self.is_server_game() or self.is_omniscient_game() + return sum(1 for power in self.powers.values() + if not power.is_eliminated() and power.vote != strings.NEUTRAL) + + def clear_vote(self): + """ Clear current vote. """ + for power in self.powers.values(): + power.vote = strings.NEUTRAL + + # ============== + # Basic methods. + # ============== + def get_units(self, power_name=None): + """ Retrieves the list of units for a power or for all powers + :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers + :return: A list of units (e.g. ['A PAR', 'A MAR']) if a power name is provided + or a dictionary of powers with their units if None is provided (e.g. {'FRANCE': [...], ...} + + Note: Dislodged units will appear with a leading asterisk (e.g. '*A PAR') + """ + if power_name is not None: + power_name = power_name.upper() + power = self.get_power(power_name) + if power_name is not None: + return power.units[:] + ['*{}'.format(unit) for unit in power.retreats] + if power_name is None: + units = {} + for power in self.powers.values(): + units[power.name] = self.get_units(power.name) + return units + return [] + + def get_centers(self, power_name=None): + """ Retrieves the list of owned supply centers for a power or for all powers + :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers + :return: A list of supply centers (e.g. ['PAR', 'MAR']) if a power name is provided + or a dictionary of powers with their supply centers if None is provided + (e.g. {'FRANCE': [...], ...} + """ + if power_name is not None: + power_name = power_name.upper() + power = self.get_power(power_name) + if power_name is not None: + return power.centers[:] + if power_name is None: + centers = {} + for power in self.powers.values(): + centers[power.name] = self.get_centers(power.name) + return centers + return [] + + def get_orders(self, power_name=None): + """ Retrieves the orders submitted by a specific power, or by all powers + :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers + :return: A list of orders (e.g. ['A PAR H', 'A MAR - BUR']) if a power name is provided + or a dictionary of powers with their orders if None is provided + (e.g. {'FRANCE': ['A PAR H', 'A MAR - BUR', ...], ...} + """ + if power_name is not None: + power_name = power_name.upper() + power = self.get_power(power_name) + + # Getting orders for a particular power + # Skipping VOID and WAIVE orders in Adjustment/Retreats phase + if power_name is not None: + if self.get_current_phase()[-1] == 'M': + if 'NO_CHECK' in self.rules: + power_orders = [power.orders[order] for order in power.orders if power.orders[order]] + else: + power_orders = ['{} {}'.format(unit, unit_order) for unit, unit_order in power.orders.items()] + else: + power_orders = [order for order in power.adjust + if order and order != 'WAIVE' and order.split()[0] != 'VOID'] + return power_orders + + # Recursively calling itself to get all powers + elif power_name is None: + orders = {} + for power in self.powers.values(): + orders[power.name] = self.get_orders(power.name) + return orders + return [] + + def get_orderable_locations(self, power_name=None): + """ Find the location requiring an order for a power (or for all powers) + :param power_name: Optionally, the name of the power (e.g. 'FRANCE') or None for all powers + :return: A list of orderable locations (e.g. ['PAR', 'MAR']) if a power name is provided + or a dictionary of powers with their orderable locations if None is not provided + (e.g. {'FRANCE': [...], ...} + """ + if power_name is not None: + power_name = power_name.upper() + power = self.get_power(power_name) + + # Single power + if power_name is not None: + current_phase_type = self.get_current_phase()[-1] + + # Adjustment + if current_phase_type == 'A': + build_count = len(power.centers) - len(power.units) + + # Building - All unoccupied homes + if build_count > 0: + orderable_locs = self._build_sites(power) + + # Nothing can be built. + elif build_count == 0: + orderable_locs = [] + + # Disbanding - All units location + else: + orderable_locs = [unit[2:5] for unit in power.units] + + # Retreating + elif current_phase_type == 'R': + orderable_locs = [unit[2:5] for unit in power.retreats] + + # Movement + else: + orderable_locs = [unit[2:5] for unit in power.units] + + # Returning and sorting for deterministic output + return sorted(orderable_locs) + + # All powers + else: + return {power.name: self.get_orderable_locations(power.name) for power in self.powers.values()} + + def get_order_status(self, power_name=None, unit=None): + """ Returns a list or a dict representing the order status ('', 'no convoy', 'bounce', 'void', 'cut', + 'dislodged', 'disrupted') for orders submitted in the last phase + :param power_name: Optional. If provided (e.g. 'FRANCE') will only return the order status of that + power's orders + :param unit: Optional. If provided (e.g. 'A PAR') will only return that specific unit order status. + :param phase_type: Optional. Returns the results of a specific phase type (e.g. 'M', 'R', or 'A') + :return: If unit is provided a list (e.g. [] or ['void', 'dislodged']) + If power is provided a dict (e.g. {'A PAR': ['void'], 'A MAR': []}) + Otherwise a 2-level dict (e.g. {'FRANCE: {'A PAR': ['void'], 'A MAR': []}, 'ENGLAND': {}, ... } + """ + # Specific location, returning string + if unit is not None: + result_dict = self.result_history.last_value() if self.result_history else {} + return result_dict[unit][:] if unit in result_dict else [] + + # Specific power, returning dictionary + if power_name is not None: + power_name = power_name.upper() + if power_name is not None: + order_status = {} + if self.state_history: + state_history = self.state_history.last_value() + for ordered_unit in state_history['units'][power_name]: + ordered_unit = ordered_unit.replace('*', '') + order_status[ordered_unit] = self.get_order_status(power_name, ordered_unit) + return order_status + + # All powers + if power_name is None: + order_status = {} + for power in self.powers.values(): + order_status[power.name] = self.get_order_status(power.name) + return order_status + return {} + + def get_power(self, power_name): + """ Retrieves a power instance from given power name. + :param power_name: name of power instance to retrieve. Power name must be as given + in map file. + :return: the power instance, or None if power name is not found. + :rtype: Power + """ + return self.powers.get(power_name, None) + + def set_units(self, power_name, units, reset=False): + """ Sets units directly on the map + :param power_name: The name of the power who will own the units (e.g. 'FRANCE') + :param units: An unit (e.g. 'A PAR') or a list of units (e.g. ['A PAR', 'A MAR']) to set + Note units starting with a '*' will be set as dislodged + :param reset: Boolean. If, clear all units of the power before setting them + :return: Nothing + """ + power_name = power_name.upper() + if not isinstance(units, list): + units = [units] + if power_name not in self.powers: + return + + # Clearing old units if reset is true + if reset and power_name in self.powers: + self.powers[power_name].clear_units() + + regular_units = [unit for unit in units if unit[0] != '*'] + dislodged_units = [unit[1:] for unit in units if unit[0] == '*'] + influence = [unit[2:5] for unit in regular_units + dislodged_units] + + # Removing units that are already there + for power in self.powers.values(): + for unit in regular_units: + unit_loc = unit[2:5] + for unit_to_remove in [p_unit for p_unit in power.units if p_unit[2:5] == unit_loc]: + self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:]) + power.units.remove(unit_to_remove) + for unit in dislodged_units: + unit_loc = unit[2:5] + for unit_to_remove in [p_unit for p_unit in power.retreats if p_unit[2:5] == unit_loc]: + self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:], is_dislodged=True) + del power.retreats[unit_to_remove] + for loc in influence: + if loc in power.influence: + power.influence.remove(loc) + + # Retrieving the target power + power = self.get_power(power_name) + + # Re-adding normal units to the new power + for unit in regular_units: + word = unit.upper().split() + if len(word) != 2: + continue + unit_type, unit_loc = word + if unit_type in ('A', 'F') \ + and unit_loc in [loc.upper() for loc in self.map.locs] \ + and self.map.is_valid_unit(unit): + if power: + self.update_hash(power_name, unit_type=unit_type, loc=unit_loc) + power.units.append(unit) + power.influence.append(unit[2:5]) + else: + self.error += [err.MAP_INVALID_UNIT % unit] + + # Re-adding dislodged units to the new power + for unit in dislodged_units: + word = unit.upper().split() + if len(word) != 2: + continue + unit_type, unit_loc = word + if unit_type in ('A', 'F') and unit_loc in [loc.upper() for loc in self.map.locs]: + abuts = [abut.upper() for abut in self.map.abut_list(unit_loc, incl_no_coast=True) + if self._abuts(unit_type, unit_loc, '-', abut.upper())] + if power: + self.update_hash(power_name, unit_type=unit_type, loc=unit_loc, is_dislodged=True) + power.retreats[unit] = abuts + + # Clearing cache + self.clear_cache() + + def set_centers(self, power_name, centers, reset=False): + """ Transfers supply centers ownership + :param power_name: The name of the power who will control the supply centers (e.g. 'FRANCE') + :param centers: A loc (e.g. 'PAR') or a list of locations (e.g. ['PAR', 'MAR']) to transfer + :param reset: Boolean. If, removes ownership of all power's SC before transferring ownership of the new SC + :return: Nothing + """ + power_name = power_name.upper() + if not isinstance(centers, list): + centers = [centers] + if power_name not in self.powers: + return + + # Clearing old centers if reset is true + if reset and power_name in self.powers: + self.powers[power_name].clear_centers() + + # Removing centers that are already controlled by another power + for power in self.powers.values(): + for center in centers: + if center in power.centers: + self.update_hash(power.name, loc=center, is_center=True) + power.centers.remove(center) + + # Transferring center to power_name + power = self.get_power(power_name) + if power: + for center in centers: + if center in self.map.scs: + self.update_hash(power_name, loc=center, is_center=True) + power.centers += [center] + + # Clearing cache + self.clear_cache() + + def set_orders(self, power_name, orders, expand=True, replace=True): + """ Sets the current orders for a power + :param power_name: The name of the power (e.g. 'FRANCE') + :param orders: The list of orders (e.g. ['A MAR - PAR', 'A PAR - BER', ...]) + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: Nothing + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + power_name = power_name.upper() + + if not self.has_power(power_name): + return + + if self.is_player_game() and self.role != power_name: + return + + power = self.get_power(power_name) + if power: + if not isinstance(orders, list): + orders = [orders] + + # Remove any empty string from orders. + orders = [order for order in orders if order] + + # Setting orders depending on phase type + if self.phase_type == 'R': + self._update_retreat_orders(power, orders, expand=expand, replace=replace) + elif self.phase_type == 'A': + self._update_adjust_orders(power, orders, expand=expand, replace=replace) + else: + self._update_orders(power, orders, expand=expand, replace=replace) + power.order_is_set = (OrderSettings.ORDER_SET + if self.get_orders(power.name) + else OrderSettings.ORDER_SET_EMPTY) + + def set_wait(self, power_name, wait): + """ Set wait flag for a power. + :param power_name: name of power to set wait flag. + :param wait: wait flag (boolean). + """ + power_name = power_name.upper() + + if not self.has_power(power_name): + return + + power = self.get_power(power_name.upper()) # type: Power + power.wait = wait + + def clear_units(self, power_name=None): + """ Clear the power's units + :param power_name: Optional. The name of the power whose units will be cleared (e.g. 'FRANCE'), + otherwise all units on the map will be cleared + :return: Nothing + """ + for power in self.powers.values(): + if power_name is None or power.name == power_name: + power.clear_units() + self.clear_cache() + + def clear_centers(self, power_name=None): + """ Removes ownership of supply centers + :param power_name: Optional. The name of the power whose centers will be cleared (e.g. 'FRANCE'), + otherwise all centers on the map will lose ownership. + :return: Nothing + """ + for power in self.powers.values(): + if power_name is None or power.name == power_name: + power.clear_centers() + self.clear_cache() + + def clear_orders(self, power_name=None): + """ Clears the power's orders + :param power_name: Optional. The name of the power to clear (e.g. 'FRANCE') or will clear orders for + all powers if None. + :return: Nothing + """ + if power_name is not None: + power = self.get_power(power_name.upper()) + power.clear_orders() + else: + for power in self.powers.values(): + power.clear_orders() + + def clear_cache(self): + """ Clears all caches """ + self.convoy_paths_possible, self.convoy_paths_dest = None, None + + def get_current_phase(self): + """ Returns the current phase (format 'S1901M' or 'FORMING' or 'COMPLETED' """ + return self._phase_abbr() + + def set_current_phase(self, new_phase): + """ Changes the phase to the specified new phase (e.g. 'S1901M') """ + if new_phase in ('FORMING', 'COMPLETED'): + self.phase = new_phase + self.phase_type = None + else: + self.phase = self.map.phase_long(new_phase) + self.phase_type = self.phase.split()[-1][0] + + def render(self, incl_orders=True, incl_abbrev=False, output_format='svg'): + """ Renders the current game and returns its image representation + :param incl_orders: Optional. Flag to indicate we also want to render orders. + :param incl_abbrev: Optional. Flag to indicate we also want to display the provinces abbreviations. + :param output_format: The desired output format. + :return: The rendered image in the specified format. + """ + if not self.renderer: + self.renderer = Renderer(self) + return self.renderer.render(incl_orders=incl_orders, incl_abbrev=incl_abbrev, output_format=output_format) + + def add_rule(self, rule): + """ Adds a rule to the current rule list + :param rule: Name of rule to add (e.g. 'NO_PRESS') + :return: Nothing + """ + if not self.__class__.rule_cache: + self._load_rules() + valid_rules = {valid_rule for valid_rule in self.__class__.rule_cache[0]} + + if rule not in valid_rules or rule in self.no_rules: + return + + forbidden_rules = self.__class__.rule_cache[0].get(rule, {}).get('!', []) + rules_to_add = self.__class__.rule_cache[0].get(rule, {}).get('+', []) + rules_to_remove = self.__class__.rule_cache[0].get(rule, {}).get('-', []) + + # Making sure we don't already have a forbidden rule + for forbidden in forbidden_rules: + if forbidden in self.rules: + self.error += [err.GAME_FORBIDDEN_RULE % (forbidden, rule)] + return + if forbidden not in self.no_rules: + self.no_rules.add(forbidden) + + # Adding rules + for rule_to_add in rules_to_add: + if rule_to_add not in self.rules: + self.rules.append(rule_to_add) + + # Removing rules + for rule_to_remove in rules_to_remove: + if rule_to_remove in self.rules: + self.rules.remove(rule_to_remove) + + # Adding main rule + if rule not in self.rules: + self.rules.append(rule) + + def remove_rule(self, rule): + """ Removes a rule from the current rule list + :param rule: Name of rule to remove (e.g. 'NO_PRESS') + :return: Nothing + """ + if rule in self.rules: + self.rules.remove(rule) + + def load_map(self, reinit_powers=True): + """ Load a map and process directives + :param reinit_powers: Boolean. If true, empty powers dict. + :return: Nothing, but stores the map in self.map + """ + # Create a map, and check for errors + self.map = Map(self.map_name) + if self.map_name != self.map.name: + raise RuntimeError('Invalid Map loaded. Expected %s - Got %s' % (self.map_name, self.map.name)) + + # Adding map rules + for rule in self.map.rules: + self.add_rule(rule) + + # Build Zobrist tables + self._build_hash_table() + + self.error += self.map.error + + # Sets the current phase to the long version + if self.phase and ' ' not in self.phase and self.phase not in ('FORMING', 'COMPLETED'): + self.phase = self.map.phase_long(self.phase) + + # Have the Game process all lines in the map file that were in DIRECTIVES clauses (this includes any RULE lines) + # Do this for all directives given without a variant and for those specific for this Game's variant. + if self.phase == 'FORMING': + return + + # Resetting powers + if reinit_powers: + self.powers = {} + + def process(self): + """ Processes the current phase of the game. + :return: game phase data with data before processing. + """ + previous_phase = self._phase_wrapper_type(self.current_short_phase) + previous_orders = self.get_orders() + previous_messages = self.messages.copy() + previous_state = self.get_state() + + if self.error: + if 'IGNORE_ERRORS' not in self.rules: + print('The following errors were encountered and were cleared before processing.') + for error in self.error: + print('-- %s' % error) + print('-' * 32) + self.error = [] + self._process() + + # result_history should have been updated with orders results for processed (previous) phase. + + self.clear_vote() + self.clear_orders() + self.messages.clear() + self.order_history.put(previous_phase, previous_orders) + self.message_history.put(previous_phase, previous_messages) + self.state_history.put(previous_phase, previous_state) + return GamePhaseData(name=str(previous_phase), + state=previous_state, + orders=previous_orders, + messages=previous_messages, + results=self.result_history[previous_phase]) + + def rebuild_hash(self): + """ Completely recalculate the Zobrist hash + :return: The updated hash value + """ + self.zobrist_hash = 0 + if self.map is None: + return 0 + + # Recalculating for each power + for power in self.powers.values(): + for unit in power.units: + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:]) + for dis_unit in power.retreats: + self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True) + for center in power.centers: + self.update_hash(power.name, loc=center, is_center=True) + for home in power.homes: + self.update_hash(power.name, loc=home, is_home=True) + + # Clearing cache + self.clear_cache() + + # Returning the new hash + return self.get_hash() + + def get_hash(self): + """ Returns the zobrist hash for the current game """ + # Needs to be a string, otherwise json.dumps overflows + return str(self.zobrist_hash) + + def update_hash(self, power, unit_type='', loc='', is_dislodged=False, is_center=False, is_home=False): + """ Updates the zobrist hash for the current game + :param power: The name of the power owning the unit, supply center or home + :param unit_type: Contains the unit type of the unit being added or remove from the board ('A' or 'F') + :param loc: Contains the location of the unit, supply center, of home being added or remove + :param is_dislodged: Indicates that the unit being added/removed is dislodged + :param is_center: Indicates that the location being added/removed is a supply center + :param is_home: Indicates that the location being added/removed is a home + :return: Nothing + """ + if self.map is None: + return + zobrist = self.__class__.zobrist_tables[self.map_name] + loc = loc[:3].upper() if is_center or is_home else loc.upper() + power = power.upper() + + power_ix = zobrist['map_powers'].index(power) + loc_ix = zobrist['map_locs'].index(loc) + unit_type_ix = ['A', 'F'].index(unit_type) if unit_type in ['A', 'F'] else -1 + + # Dislodged + if is_dislodged: + self.zobrist_hash ^= zobrist['dis_unit_type'][unit_type_ix, loc_ix] + self.zobrist_hash ^= zobrist['dis_units'][power_ix, loc_ix] + + # Supply Center + elif is_center: + self.zobrist_hash ^= zobrist['centers'][power_ix, loc_ix] + + # Home + elif is_home: + self.zobrist_hash ^= zobrist['homes'][power_ix, loc_ix] + + # Regular unit + else: + self.zobrist_hash ^= zobrist['unit_type'][unit_type_ix, loc_ix] + self.zobrist_hash ^= zobrist['units'][power_ix, loc_ix] + + def get_phase_data(self): + """ Return a GamePhaseData object representing current game. """ + # Associate each power name to power orders, or None if order ist not set for the power. + # This is done to make distinction between voluntary empty orders ([]) and unset orders (None). + current_orders = {power.name: (self.get_orders(power.name) if power.order_is_set else None) + for power in self.powers.values()} + # Game does not have results for current orders (until orders are processed and game phase is updated). + return GamePhaseData(name=self.current_short_phase, + state=self.get_state(), + orders=current_orders, + messages=self.messages.copy(), + results={}) + + def set_phase_data(self, phase_data, clear_history=True): + """ Set game from phase data. + :param phase_data: either a GamePhaseData or a list of GamePhaseData. + If phase_data is a GamePhaseData, it will be treated as a list of GamePhaseData with 1 element. + Last phase data in given list will be used to set current game internal state. + Previous phase data in given list will replace current game history. + :param clear_history: Indicate if we must clear game history fields before update. + """ + if not phase_data: + return + if isinstance(phase_data, GamePhaseData): + phase_data = [phase_data] + elif not isinstance(phase_data, list): + phase_data = list(phase_data) + + if clear_history: + self._clear_history() + + for game_phase_data in phase_data[:-1]: # type: GamePhaseData + self.extend_phase_history(game_phase_data) + + current_phase_data = phase_data[-1] # type: GamePhaseData + self.set_state(current_phase_data.state, clear_history=False) + for power_name, power_orders in current_phase_data.orders.items(): + if power_orders is not None: + self.set_orders(power_name, power_orders) + self.messages = current_phase_data.messages.copy() + # We ignore 'results' for current phase data. + + def get_state(self): + """ Gets the internal saved state of the game. + This state is intended to represent current game view + (powers states, orders results for previous phase, and few more info). + See field message_history to get messages from previous phases. + See field order_history to get orders from previous phases. + To get a complete state of all data in this game object, consider using method Game.to_dict(). + + :param make_copy: Boolean. If true, a deep copy of the game state is returned, otherwise the attributes are + returned directly. + :return: The internal saved state (dict) of the game + """ + state = {} + state['timestamp'] = common.timestamp_microseconds() + state['zobrist_hash'] = self.get_hash() + state['note'] = self.note + state['name'] = self._phase_abbr() + state['units'] = {} + state['centers'] = {} + state['homes'] = {} + state['influence'] = {} + state['civil_disorder'] = {} + state['builds'] = {} + + # Setting powers data: units, centers, homes, influence and civil disorder. + for power in self.powers.values(): + state['units'][power.name] = list(power.units) + ['*{}'.format(d) for d in power.retreats] + state['centers'][power.name] = list(power.centers) + state['homes'][power.name] = list(power.homes) + state['influence'][power.name] = list(power.influence) + state['civil_disorder'][power.name] = power.civil_disorder + # Setting build + state['builds'][power.name] = {} + if self.phase_type != 'A': + state['builds'][power.name]['count'] = 0 + else: + state['builds'][power.name]['count'] = len(power.centers) - len(power.units) + state['builds'][power.name]['homes'] = [] + if state['builds'][power.name].get('count', 0) > 0: + build_sites = self._build_sites(power) + state['builds'][power.name]['count'] = min(len(build_sites), state['builds'][power.name]['count']) + state['builds'][power.name]['homes'] = build_sites + + # Returning state + return state + + def set_state(self, state, clear_history=True): + """ Sets the game from a saved internal state + :param state: The saved state (dict) + :param clear_history: Boolean. If true, all game histories are cleared. + :return: Nothing + """ + if clear_history: + self._clear_history() + + if 'map' in state and self.map.name != state['map']: + raise RuntimeError('Inconsistent state map (state: %s, game: %s)' % (state['map'], self.map.name)) + if 'rules' in state: + self.rules = [] + for rule in state['rules']: + self.add_rule(rule) + + if 'note' in state: + self.note = state['note'] + if 'name' in state and state['name']: + self.set_current_phase(state['name']) + if 'units' in state: + for power_name, units in state['units'].items(): + self.set_units(power_name, units, reset=True) + if 'centers' in state: + for power_name, centers in state['centers'].items(): + self.set_centers(power_name, centers, reset=True) + for power in self.powers.values(): + if 'homes' in state and power.name in state['homes']: + power.homes = list(state['homes'][power.name]) + else: + power.homes = list(self.map.homes[power.name]) + if 'influence' in state: + for power_name, influence in state['influence'].items(): + power = self.get_power(power_name) + power.influence = deepcopy(influence) + if 'civil_disorder' in state: + for power_name, civil_disorder in state['civil_disorder'].items(): + power = self.get_power(power_name) + power.civil_disorder = civil_disorder + + # Rebuilding hash and returning + self.rebuild_hash() + self._build_list_possible_convoys() + + def get_all_possible_orders(self, loc=None): + """ Computes a list of all possible orders for a unit in a given location + :param loc: Optional. The location where to get a list of orders (must include coasts) + If not provided, returns a list of all possible orders for all locations + :return: A list of orders for the unit, if there is a unit at location, or a list of possible + orders for all locations if no locations are provided. + """ + # pylint: disable=too-many-branches + # No locations, building a dict with all locations + if not loc: + all_possible_orders = {} + for map_loc in self.map.locs: + map_loc = map_loc.upper() + all_possible_orders[map_loc] = self.get_all_possible_orders(map_loc) + return all_possible_orders + + def remove_duplicates(list_with_dup): + """ Shorthand functions to remove duplicates """ + seen = set() + return [item for item in list_with_dup if not (item in seen or seen.add(item))] + + # Otherwise finding the possible orders at that specific location + possible_orders = [] + is_dislodged = False + unit = None + unit_power = None + + # If there are coasts possible, recursively adding the coasts first, then adding the loc orders + if '/' not in loc: + for loc_with_coast in [coast for coast in self.map.find_coasts(loc) if '/' in coast]: + possible_orders += self.get_all_possible_orders(loc_with_coast) + + # Determining if there is a unit at loc + # Dislodged unit have precedence over regular unit in Retreat phase + for power in self.powers.values(): + dislodged = [u for u in power.retreats if u[2:] == loc.upper()] + regular = [u for u in power.units if u[2:] == loc.upper()] + if dislodged: + is_dislodged = True + unit = dislodged[0] + unit_power = power + break + elif regular and not is_dislodged: + unit = regular[0] + unit_power = power + if self.phase_type != 'R': + break + + # No unit found, checking if location is a home + if unit is None: + if self.phase_type != 'A': + return remove_duplicates(possible_orders) + for power in self.powers.values(): + if loc[:3] in power.homes and 'BUILD_ANY' not in self.rules: + unit_power = power + break + if loc[:3] in power.centers and 'BUILD_ANY' in self.rules: + unit_power = power + break + + # Not a home, and no units + if not unit_power: + return remove_duplicates(possible_orders) + + # Determining if we can build or need to remove units + build_count = 0 if self.phase_type != 'A' else len(unit_power.centers) - len(unit_power.units) + + # Determining unit type and unit location + unit_type = unit[0] if unit else '' + unit_loc = unit[2:] if unit else '' + + # Movement phase + if self.phase_type == 'M': + # Computing coasts for dest + dest_1_hops = [l.upper() for l in self.map.abut_list(unit_loc, incl_no_coast=True)] + dest_with_coasts = [self.map.find_coasts(dest) for dest in dest_1_hops] + dest_with_coasts = {val for sublist in dest_with_coasts for val in sublist} + + # Hold + possible_orders += ['{} H'.format(unit)] + + # Move (Regular) and Support (Hold) + for dest in dest_with_coasts: + if self._abuts(unit_type, unit_loc, '-', dest): + possible_orders += ['{} - {}'.format(unit, dest)] + if self._abuts(unit_type, unit_loc, 'S', dest): + if self._unit_owner('A {}'.format(dest)): + possible_orders += ['{} S A {}'.format(unit, dest)] + elif self._unit_owner('F {}'.format(dest)): + possible_orders += ['{} S F {}'.format(unit, dest)] + + # Move Via Convoy + for dest in self._get_convoy_destinations(unit_type, unit_loc): + possible_orders += ['{} - {} VIA'.format(unit, dest)] + + # Support (Move) + for dest in dest_with_coasts: + + # Computing src of move (both from adjacent provinces and possible convoys) + # We can't support a unit that needs us to convoy it to its destination + abut_srcs = self.map.abut_list(dest, incl_no_coast=True) + convoy_srcs = self._get_convoy_destinations('A', dest, exclude_convoy_locs=[unit_loc]) + + # Computing coasts for source + src_with_coasts = [self.map.find_coasts(src) for src in abut_srcs + convoy_srcs] + src_with_coasts = {val for sublist in src_with_coasts for val in sublist} + + for src in src_with_coasts: + + # Checking if there is a unit on the src location + if self._unit_owner('A {}'.format(src)): + src_unit_type = 'A' + elif self._unit_owner('F {}'.format(src)): + src_unit_type = 'F' + else: + continue + + # Checking if src unit can move to dest (through adj or convoy), and that we can support it + # Only armies can move through convoy + if src[:3] != unit_loc[:3] \ + and self._abuts(unit_type, unit_loc, 'S', dest) \ + and ((src in convoy_srcs and src_unit_type == 'A') + or self._abuts(src_unit_type, src, '-', dest)): + + # Adding with coast + possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest)] + + # Adding without coasts + if '/' in dest: + possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest[:3])] + + # Convoy + if unit_type == 'F': + convoy_srcs = self._get_convoy_destinations(unit_type, unit_loc, unit_is_convoyer=True) + for src in convoy_srcs: + + # Checking if there is a unit on the src location + if unit_type == 'F' and self._unit_owner('A {}'.format(src)): + src_unit_type = 'A' + else: + continue + + # Checking where the src unit can actually go + convoy_dests = self._get_convoy_destinations(src_unit_type, src, unit_is_convoyer=False) + + # Adding them as possible moves + for dest in convoy_dests: + if self._has_convoy_path(src_unit_type, src, dest, convoying_loc=unit_loc): + possible_orders += ['{} C {} {} - {}'.format(unit, src_unit_type, src, dest)] + + # Retreat phase + if self.phase_type == 'R': + + # Disband + if is_dislodged: + possible_orders += ['{} D'.format(unit)] + + # Retreat + if is_dislodged: + retreat_locs = unit_power.retreats[unit] + for dest in retreat_locs: + dest = dest.upper() + if not self._unit_owner('A {}'.format(dest[:3]), coast_required=0) \ + and not self._unit_owner('F {}'.format(dest[:3]), coast_required=0): + possible_orders += ['{} R {}'.format(unit, dest)] + + # Adjustment Phase + if self.phase_type == 'A': + build_sites = self._build_sites(unit_power) + + # Disband + if build_count < 0 and unit: + possible_orders += ['{} D'.format(unit)] + + # Build Army / Fleet + if build_count > 0 \ + and loc[:3] in build_sites \ + and not self._unit_owner('A ' + loc[:3], coast_required=0) \ + and not self._unit_owner('F ' + loc[:3], coast_required=0): + if self.map.is_valid_unit('A {}'.format(loc)): + possible_orders += ['A {} B'.format(loc)] + if self.map.is_valid_unit('F {}'.format(loc)): + possible_orders += ['F {} B'.format(loc)] + + # Waive + if build_count > 0: + possible_orders += ['WAIVE'] + + # Removing duplicate + return remove_duplicates(possible_orders) + + # ==================================================================== + # Private Interface - CONVOYS Methods + # ==================================================================== + def _build_list_possible_convoys(self): + """ Regenerates the list of possible convoy paths given the current fleet locations """ + # Already generated + if self.convoy_paths_possible is not None: + return + self.convoy_paths_possible = [] + self.convoy_paths_dest = {} + + # Finding fleets on water + convoying_locs = [] + for power in self.powers.values(): + for unit in power.units: + if unit[0] == 'F' and self.map.area_type(unit[2:]) in ['WATER', 'PORT']: + convoying_locs += [unit[2:]] + convoying_locs = set(convoying_locs) + + # Finding all possible convoy paths + for nb_fleets in range(1, len(convoying_locs) + 1): + for start, fleets, dests in self.map.convoy_paths[nb_fleets]: + if fleets.issubset(convoying_locs): + self.convoy_paths_possible += [(start, fleets, dests)] + + # Marking path to dest + self.convoy_paths_dest.setdefault(start, {}) + for dest in dests: + self.convoy_paths_dest[start].setdefault(dest, []) + self.convoy_paths_dest[start][dest] += [fleets] + + def _is_convoyer(self, army, loc): + """ Detects if there is a convoyer at thru location for army/fleet (e.g. can an army be convoyed through PAR) + :param army: Boolean to indicate if unit being convoyed is army (1) or fleet (0) + :param loc: Location we are checking (e.g. 'STP/SC') + :return: Boolean to indicate if unit can be convoyed through location + """ + # Armies can't convoy fleet, so if unit being convoyed is not an army, convoy not possible + if not army: + return False + + # Army can convoy through water, all units can convoy through port + area_type = self.map.area_type(loc) + area_type_cond = ((area_type == 'WATER') == army or area_type == 'PORT') + + # Making sure there is a valid unit on thru location to perform convoy + unit_type_cond = self._unit_owner('F %s' % loc, coast_required=0) + return area_type_cond and unit_type_cond + + def _is_moving_via_convoy(self, unit): + """ Determines if a unit is moving via a convoy or through land + :param unit: The name of the unit (e.g. 'A PAR') + :return: A boolean (True, False) to indicate if the unit is moving via convoy + """ + # Not moving or no paths + if unit not in self.command or self.command[unit][0] != '-': + return False + if unit not in self.convoy_paths or not self.convoy_paths[unit]: + return False + + # Otherwise, convoying since there is still an active valid path + return True + + def _has_convoy_path(self, unit, start, end, convoying_loc=None): + """ Determines if there is a convoy path for unit + :param unit: The unit BEING convoyed (e.g. 'A' or 'F') + :param start: The start location of the unit (e.g. 'LON') + :param end: The destination of the unit (e.g. 'MAR') + :param convoying_loc: Optional. If set, the convoying location must be in one of the paths + :return: A boolean flag to indicate if the convoy is possible (if all units cooperate) + """ + if unit != 'A': + return False + + # Checking in table if there is a valid path and optionally if the convoying loc is in the path + self._build_list_possible_convoys() + active_paths = self.convoy_paths_dest.get(start, {}).get(end, []) + return active_paths and (convoying_loc is None or [1 for path in active_paths if convoying_loc in path]) + + def _get_convoying_units_for_path(self, unit, start, end): + """ Returns a list of units who have submitted orders to convoy 'unit' from 'start' to 'end' + :param unit: The unit BEING convoyed (e.g. 'A' or 'F') + :param start: The start location of the unit (e.g. 'LON') + :param end: The destination of the unit (e.g. 'MAR') + :return: A list of convoying units (e.g. ['F NAO', 'F MAO']) having current orders to convoy path + """ + convoying_units = [] + army = unit != 'F' + expected_order = 'C %s %s - %s' % (unit, start[:3], end[:3]) + for unit_loc, unit_order in list(self.command.items()): + if unit_order == expected_order and self._is_convoyer(army, unit_loc[2:]): + convoying_units += [unit_loc] + return convoying_units + + def _get_convoy_destinations(self, unit, start, unit_is_convoyer=False, exclude_convoy_locs=None): + """ Returns a list of possible convoy destinations for a unit + :param unit: The unit BEING convoyed (e.g. 'A' or 'F') + :param start: The start location of the unit (e.g. 'LON') + :param unit_is_convoyer: Boolean flag. If true, list all the dests that an unit being convoyed by unit + could reach + :param exclude_convoy_locs: Optional. A list of convoying location that needs to be excluded from all paths. + :return: A list of convoying destinations (e.g. ['PAR', 'MAR']) that can be reached from start + """ + if unit == 'A' and unit_is_convoyer: + return [] + if unit == 'F' and not unit_is_convoyer: + return [] + + # Building cache + self._build_list_possible_convoys() + + # If we are moving via convoy, we just read the destinations from the table + if not unit_is_convoyer: + if not exclude_convoy_locs: + return list(self.convoy_paths_dest.get(start, {}).keys()) + + # We need to loop to make sure there is a path without the excluded convoyer + dests = [] + for dest, paths in self.convoy_paths_dest.get(start, {}).items(): + for path in paths: + if not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in path]: + dests += [dest] + break + return dests + + # If we are convoying, we need to loop through the possible convoy paths + valid_dests = set([]) + for _, fleets, dests in self.convoy_paths_possible: + if start in fleets and (exclude_convoy_locs is None + or not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in fleets]): + valid_dests |= dests + return list(valid_dests) + + def _get_convoy_paths(self, unit_type, start, end, via, convoying_units): + """ Return a list of all possible convoy paths (using convoying units) from start to end + :param unit_type: The unit type BEING convoyed (e.g. 'A' or 'F') + :param start: The start location of the unit (e.g. 'LON') + :param end: The destination of the unit (e.g. 'MAR') + :param via: Boolean flag (0 or 1) to indicate if we want only paths with a local convoyer, or also paths + including only foreign convoyers + :param convoying_units: The list of units who can convoy the unit + :return: A list of paths from start to end using convoying_units + """ + if unit_type != 'A' or not convoying_units: + return [] + + # Building cache and finding possible paths with convoying units + # Adding start and end location to every path + self._build_list_possible_convoys() + fleets = {loc[2:] for loc in convoying_units} + paths = [path for path in self.convoy_paths_dest.get(start, {}).get(end, set([])) if path.issubset(fleets)] + paths = [[start] + list(path) + [end] for path in paths] + paths.sort(key=len) + + # No paths found + if not paths: + return [] + + # We have intent to convoy, so we can use all paths + if via: + return paths + + # Assuming intent if end is not reachable from start (i.e. a convoy is required) + if not self._abuts(unit_type, start, 'S', end): + return paths + + # Otherwise, detecting if we intended to convoy + unit_owner = self._unit_owner('%s %s' % (unit_type, start), coast_required=0) + for convoyer in convoying_units: + convoy_owner = self._unit_owner(convoyer, coast_required=1) + + # We have intent if one of the power's fleet issued a convoyed order + # and there was a path using that fleet to move from start to end + if unit_owner == convoy_owner and \ + self._has_convoy_path(unit_type, start, end, convoying_loc=convoyer[2:]): + return paths + + # We could not detect intent + return [] + + def _get_distance_to_home(self, unit_type, start, homes): + """ Calculate the distance from unit to one of its homes + Armies can move over water (4.D.8 choice d) + :param unit_type: The unit type to calculate distance (e.g. 'A' or 'F') + :param start: The start location of the unit (e.g. 'LON') + :param homes: The list of homes (first one reached calculates the distance) + :return: The minimum distance from unit to one of the homes + """ + visited = [] + if not homes: + return 99999 + + # Modified Djikstra + to_check = PriorityDict() + to_check[start] = 0 + while to_check: + distance, current = to_check.smallest() + del to_check[current] + + # Found smallest distance + if current[:3] in homes: + return distance + + # Marking visited + if current in visited: + continue + visited += [current] + + # Finding neighbors and updating distance + for loc in self.map.abut_list(current, incl_no_coast=True): + loc = loc.upper() + if loc in visited: + continue + + # Calculating distance for armies over LAND/WATER/COAST and for Fleet over WATER/COAST + if unit_type == 'A' or self._abuts(unit_type, current, '-', loc): + loc_distance = to_check[loc] if loc in to_check else 99999 + to_check[loc] = min(distance + 1, loc_distance) + + # Could not find destination + return 99999 + + # ==================================================================== + # Private Interface - ORDER Validation Methods + # ==================================================================== + def _valid_order(self, power, unit, order, report=1): + """ Determines if an order is valid + :param power: The power submitting the order + :param unit: The unit being affected by the order (e.g. 'A PAR') + :param order: The actual order (e.g. 'H' or 'S A MAR') + :param report: Boolean to report errors in self.errors + :return: One of the following: + None - The order is NOT valid at all + -1 - It is NOT valid, BUT it does not get reported because it may be used to signal support + 0 - It is valid, BUT some unit mentioned does not exist + 1 - It is completed valid + """ + # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements + # No order + if not order: + return None + word = order.split() + owner = self._unit_owner(unit) + rules = self.rules + + # No order + if not word: + return None + + status = 1 if owner is not None else 0 + unit_type = unit[0] + unit_loc = unit[2:] + order_type = word[0] + + # Make sure the unit exists (or if the player is in a game in which he can't necessarily know) could exist. + # Also make sure any mentioned (supported or conveyed) unit could exists and could reach the listed destination + if not self.map.is_valid_unit(unit): + if report: + self.error.append(err.GAME_ORDER_TO_INVALID_UNIT % unit) + return None + + # Support / Convoy - 'S A/F XXX - YYY' + if order_type in ('S', 'C') and word[1:]: + if word[1] in ('A', 'F'): + alter, other = word[1:3] + else: + alter, other = '?', word[1] + + # Checks if A/F XXX is a valid unit for loc (source) + other = alter + ' ' + other + if not self.map.is_valid_unit(other, no_coast_ok=1): + if report: + self.error.append(err.GAME_ORDER_INCLUDES_INVALID_UNIT % other) + return None + + # S [A/F] XXX - YYY + # Checks if A/F YYY is a valid unit for loc (dest) + if len(word) == 5 - (alter == '?'): + other = alter + ' ' + word[-1] + if not self.map.is_valid_unit(other, no_coast_ok=1): + if report: + self.error.append(err.GAME_ORDER_INCLUDES_INVALID_DEST % other) + return None + + # Check if unit exists + # Status - 1 if unit has owner, 0 otherwise (Non-existent unit) + if not status: + if report: + self.error.append(err.GAME_ORDER_NON_EXISTENT_UNIT % unit) + return None + if power is not owner: + if report: + self.error.append(err.GAME_ORDER_TO_FOREIGN_UNIT % unit) + return None + + # Validate that anything in a SHUT location is only ordered to HOLD + if self.map.area_type(unit_loc) == 'SHUT' and order_type != 'H': + if report: + self.error.append(err.GAME_UNIT_MAY_ONLY_HOLD % unit) + return None + + # Validate support and convoy orders + # Triggers error if Army trying to convoys + if order_type == 'C' and (unit_type != 'F' or (self.map.area_type(unit_loc) not in ('WATER', 'PORT'))): + if report: + self.error.append(err.GAME_CONVOY_IMPROPER_UNIT % (unit, order)) + return None + + # ------------------------------------------------------------- + # SUPPORT OR CONVOY ORDER + if order_type in ('C', 'S'): + + # Add the unit type (or '?') if not specified. + # Note that the unit type is NOT added to the actual order -- just used during checking. + order_text = 'CONVOY' if order_type == 'C' else 'SUPPORT' + if len(word) > 1 and word[1] not in ('A', 'F'): + terrain = self.map.area_type(word[1]) + if order_type == 'C': + word[1:1] = ['AF'[unit_type == 'A']] # Convoying the opposite unit type A-F and F-A + elif terrain == 'WATER': + word[1:1] = ['F'] + elif terrain == 'LAND': + word[1:1] = ['A'] + elif terrain: # Other terrain, trying to determine if XXX exist + its_unit_type = [unit_type for unit_type in 'AF' if self._unit_owner(unit_type + ' ' + word[1])] + if its_unit_type: + word[1:1] = its_unit_type + else: + if report: + self.error.append(err.GAME_INVALID_ORDER_NON_EXISTENT_UNIT % (order_text, unit, order)) + return None + else: + if report: + self.error.append(err.GAME_INVALID_ORDER_RECIPIENT % (order_text, unit, order)) + return None + + # Make sure we have enough to work with + # e.g. syntax S A XXX - YYY or at least S XXX YYY + if len(word) < 3: + if report: + self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order)) + return None + + # Check that the recipient of the support or convoy exists + rcvr, dest = ' '.join(word[1:3]), word[2] + if not self._unit_owner(rcvr, 0): + if report: + self.error.append(err.GAME_ORDER_RECIPIENT_DOES_NOT_EXIST % (order_text, unit, order)) + return None + + # Check that the recipient is not the same unit as the supporter + if unit_loc == dest: + if report: + self.error.append(err.GAME_UNIT_CANT_SUPPORT_ITSELF % (unit, order)) + return None + + # Only units on coasts can be convoyed, or invalid units convoying + if order_type == 'C' \ + and (word[1] != 'AF'[unit_type == 'A'] or self.map.area_type(dest) not in ('COAST', 'PORT')): + if report: + self.error.append(err.GAME_UNIT_CANT_BE_CONVOYED % (unit, order)) + return None + + # Handle orders of the form C U xxx - xxx and S U xxx - xxx + if len(word) == 5: + if word[3] != '-': + if report: + self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order)) + return None + dest = word[4] + + # Coast is specified in the move, but ignored in the support and convoy order + # DATC 6.B.4 + if '/' in dest: + dest = dest[:dest.find('/')] + + # Making sure the dest is COAST,PORT and that the convoyed order can land there + if order_type == 'C': + if not (self.map.area_type(dest) in ('COAST', 'PORT') + and self.map.is_valid_unit(word[1] + ' ' + dest, unit[0] < 'F')): + if report: + self.error.append(err.GAME_BAD_CONVOY_DESTINATION % (unit, order)) + return None + + # Checking that support can reach destination... + elif (not self._abuts(word[1], word[2], order_type, dest) + and (rcvr[0] == 'F' + or not self._has_convoy_path(word[1], word[2][:3], dest))): + if report: + self.error.append(err.GAME_SUPPORTED_UNIT_CANT_REACH_DESTINATION % (unit, order)) + return None + + # Make sure that a convoy order was formatted as above + elif order_type == 'C': + if report: + self.error.append(err.GAME_IMPROPER_CONVOY_ORDER % (unit, order)) + return None + + # Make sure a support order was either as above or as S U xxx or as S U xxx H + elif len(word) != 3 and (len(word) != 4 or word[-1] != 'H'): + if report: + self.error.append(err.GAME_IMPROPER_SUPPORT_ORDER % (unit, order)) + return None + + # Make sure the support destination can be reached... + if order_type == 'S': + if not self._abuts(unit_type, unit_loc, order_type, dest): + if report: + self.error.append(err.GAME_UNIT_CANT_PROVIDE_SUPPORT_TO_DEST % (unit, order)) + return None + + # ... or that the fleet can perform the described convoy + elif not self._has_convoy_path(rcvr[0], rcvr[2:5], dest, convoying_loc=unit_loc): + if report: + self.error.append(err.GAME_IMPOSSIBLE_CONVOY_ORDER % (unit, order)) + return None + + # ------------------------------------------------------------- + # MOVE order + elif order_type == '-': + # Expected format '- xxx' or '- xxx - yyy - zzz' + if (len(word) & 1 and word[-1] != 'VIA') or (len(word[:-1]) & 1 and word[-1] == 'VIA'): + if report: + self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order)) + return None + + # Only a convoying army can give a path + if len(word) > 2 and unit_type != 'A' and self.map.area_type(unit_loc) not in ('COAST', 'PORT'): + if report: + self.error.append(err.GAME_UNIT_CANT_CONVOY % (unit, order)) + return None + + # Step through every "- xxx" in the order and ensure the unit can get there at every step + src = unit_loc + order_type = 'C-'[len(word) == 2 or (len(word) == 3 and word[-1] == 'VIA')] + visit = [] + + # Checking that unit is not returning back where it started ... + if word[-1] == unit_loc and order_type < 'C': + if report: + self.error.append(err.GAME_MOVING_UNIT_CANT_RETURN % (unit, order)) + return None + + # For a multi-step convoy + if order_type == 'C': + + # Checking that destination is a COAST or PORT ... + if self.map.area_type(word[-1]) not in ('COAST', 'PORT'): + if report: + self.error.append(err.GAME_CONVOYING_UNIT_MUST_REACH_COST % (unit, order)) + return None + + # Making sure that army is not having a specific coast as destination ... + if unit_type == 'A' and '/' in word[-1]: + if report: + self.error.append(err.GAME_ARMY_CANT_CONVOY_TO_COAST % (unit, order)) + return None + + # Making sure that the syntax is '- xxx - yyy - zzz' + offset = 1 if word[-1] == 'VIA' else 0 + if [1 for x in range(0, len(word) - offset, 2) if word[x] != '-']: + if report: + self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order)) + return None + + # For every location touched + ride = word[1:len(word) - offset:2] + for num, to_loc in enumerate(ride): + + # Checking that ride is not visited twice ... + if to_loc in visit and 'CONVOY_BACK' not in rules: + if report: + self.error.append(err.GAME_CONVOY_UNIT_USED_TWICE % (unit, order)) + return None + visit += [to_loc] + + # Making sure the last 2 locations touch, and that A/F can convoy through them + # pylint: disable=too-many-boolean-expressions + if (not self._abuts(unit_type, src, order_type, to_loc) + and not self._has_convoy_path(unit_type, unit_loc, to_loc) + and (len(word) == 2 + or unit_type == 'A' and (not self._abuts('F', to_loc, 'S', src)) + or (unit_type == 'F' + and (to_loc[:3].upper() not in + [abut[:3].upper() for abut in self.map.abut_list(src[:3])])))): + if report: + self.error.append(err.GAME_UNIT_CANT_MOVE_INTO_DEST % (unit, order)) + return None + + # If VIA flag set, make sure there is at least a possible path + if word[-1] == 'VIA' and not self._has_convoy_path(unit_type, unit_loc, to_loc): + if report: + self.error.append(err.GAME_UNIT_CANT_MOVE_VIA_CONVOY_INTO_DEST % (unit, order)) + return None + + # If we are at an intermediary location + if num < len(ride) - 1: + + # Trying to portage convoy fleet through water + # or trying to convoy army through LAND or COAST + if (unit_type == 'F' + and ((unit_type == 'A' and self.map.area_type(to_loc) not in ('WATER', 'PORT')) + or unit_type + self.map.area_type(to_loc) == 'FWATER')): + if report: + self.error.append(err.GAME_BAD_CONVOY_MOVE_ORDER % (unit, order)) + return None + + # Making sure there is a unit there to convoy ... + if not self._unit_owner('AF'[unit_type == 'A'] + ' ' + to_loc): + if report: + self.error.append(err.GAME_CONVOY_THROUGH_NON_EXISTENT_UNIT % (unit, order)) + return None + + # Portaging fleets must finish the turn on a coastal location listed in upper-case + elif (num + and unit_type == 'F' + and (to_loc not in self.map.loc_abut or self.map.area_type(to_loc) not in ('COAST', 'PORT'))): + if report: + self.error.append(err.GAME_IMPOSSIBLE_CONVOY % (unit, order)) + return None + src = to_loc + + # ------------------------------------------------------------- + # HOLD order + elif order_type == 'H': + if len(word) != 1: + if report: + self.error.append(err.GAME_INVALID_HOLD_ORDER % (unit, order)) + return None + + else: + if report: + self.error.append(err.GAME_UNRECOGNIZED_ORDER_TYPE % (unit, order)) + return None + + # All done + return status + + def _expand_order(self, word): + """ Detects errors in order, convert to short version, and expand the default coast if necessary + :param word: The words (e.g. order.split()) for an order + (e.g. ['England:', 'Army', 'Rumania', 'SUPPORT', 'German', 'Army', 'Bulgaria']). + :return: The compacted and expanded order (e.g. ['A', 'RUM', 'S', 'A', 'BUL']) + """ + if not word: + return word + + result = self.map.compact(' '.join(word)) + result = self.map.vet(self.map.rearrange(result), 1) + + # Removing errors (Negative values) + final, order = [], '' + for result_ix, (token, token_type) in enumerate(result): + if token_type < 1: + if token_type == -1 * POWER: + self.error.append(err.GAME_UNKNOWN_POWER % token) + continue + elif token_type == -1 * UNIT: + self.error.append(err.GAME_UNKNOWN_UNIT_TYPE % token) + continue + elif token_type == -1 * LOCATION: + self.error.append(err.GAME_UNKNOWN_LOCATION % token) + elif token_type == -1 * COAST: + token_without_coast = token.split('/')[0] + if token_without_coast in self.map.aliases.values(): + self.error.append(err.GAME_UNKNOWN_COAST % token) + result[result_ix] = token_without_coast, -1 * LOCATION + else: + self.error.append(err.GAME_UNKNOWN_LOCATION % token) + elif token_type == -1 * ORDER: + self.error.append(err.GAME_UNKNOWN_ORDER_TYPE % token) + continue + else: + self.error.append(err.GAME_UNRECOGNIZED_ORDER_DATA % token) + continue + token_type = -1 * token_type + + # Remove power names. Checking ownership of the unit might be better + if token_type == POWER: + continue + + # Remove the "H" from any order having the form "u xxx S xxx H" + # Otherwise storing order + elif token_type == ORDER: + if order == 'S' and token == 'H': + continue + order += token + + # Treat each move order the same. Eventually we'd want to distinguish between them + elif token_type == MOVE_SEP: + result[result_ix] = '-', token_type + order += '-' + + elif token_type == OTHER: + order = '' + + # Spot ambiguous place names and coasts in support and convoy orders + if 'NO_CHECK' in self.rules: + if token_type == LOCATION and token in self.map.unclear: + self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token) + if token_type == COAST and token.split('/')[0] in self.map.unclear: + self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token) + + final += [token] + + # Default any fleet move's coastal destination, then we're done + return self.map.default_coast(final) + + def _expand_coast(self, word): + """ Makes sure the correct coast is specified (if any) is specified. + For Fleets: Adjust to correct coast if wrong coast is specified + For Armies: Removes coast if coast is specified + (e.g. if F is on SPA/SC but the order is F SPA/NC - LYO, the coast will be added or corrected) + :param word: A list of tokens (e.g. ['F', 'GRE', '-', 'BUL']) + :return: The updated list of tokens (e.g. ['F', 'GRE', '-', 'BUL/SC']) + """ + if not word: + return word + + unit_type = word[0] + loc = word[1] + loc_without_coast = loc[:loc.find('/')] if '/' in loc else loc + + # For armies: Removing coast if specified + if unit_type == 'A': + if '/' in loc: + word[1] = loc_without_coast + if len(word) == 4 and '/' in word[3]: + word[3] = word[3][:word[3].find('/')] + + # For fleets: If there is a unit in the country, but not on the specified coast, we need to correct the coast + elif self._unit_owner('%s %s' % (unit_type, loc), coast_required=1) is None \ + and self._unit_owner('%s %s' % (unit_type, loc_without_coast), coast_required=0) is not None: + + # Finding the correct coast + for loc in [l for l in self.map.locs if l[:3] == loc_without_coast]: + if self._unit_owner('%s %s' % (word[0], loc), coast_required=1) is not None: + word[1] = loc + break + + # Removing cost if unit is supporting an army moving to coast + # F WES S A MAR - SPA/SC -> F WES S A MAR - SPA + if len(word) == 7 and '/' in word[-1] and word[2] == 'S' and word[3] == 'A': + dest = word[-1] + word[-1] = dest[:dest.find('/')] + + # Adjusting the coast if a fleet is supporting a move to the wrong coast + # F WES S F GAS - SPA/SC -> F WES S F GAS - SPA/NC + if len(word) == 7 and word[0] == 'F' and word[2] == 'S' and word[3] == 'F' and '/' in word[-1]: + word = word[:3] + self.map.default_coast(word[3:6] + [word[6][:3]]) + + # Returning with coasts fixed + return word + + def _add_unit_types(self, item): + """ Adds any missing "A" and "F" designations and (current) coastal locations for fleets. + :param item: The words for expand_order() (e.g. ['A', 'RUM', 'S', 'BUL']) + :return: The list of items with A/F and coasts added (e.g. ['A', 'RUM', 'S', 'A', 'BUL']) + """ + # dependent is set when A/F is expected afterwards (so at start and after C/S) + # had_type indicates that A/F was found + word, dependent, had_type = [], 1, 0 + for token in item: + if not dependent: + dependent = token in 'CS' + elif token in 'AF': + had_type = 1 + elif token in ('RETREAT', 'DISBAND', 'BUILD', 'REMOVE'): + pass + else: + try: + # We have a location + # Try to find an active or retreating unit at current location + unit = [unit for power in self.powers.values() + for unit in (power.units, power.retreats.keys())[self.phase_type == 'R'] + if unit[2:].startswith(token)][0] + + # If A/F is missing, add it + if not had_type: + word += [unit[0]] + + # Trying to detect if coast is specified in retrieved unit location + # If yes, update the token, so it incorporates coastal information + if self.map.is_valid_unit(word[-1] + unit[1:]): + token = unit[2:] + except IndexError: + pass + dependent = had_type = 0 + + # Add token to final list + word += [token] + return word + + def _add_coasts(self): + """ This method adds the matching coast to orders supporting or (portage) convoying a fleet to + a multi-coast province. + :return: Nothing + """ + # converting to unique format + orders = {} + for unit, order in self.orders.items(): + orders[unit] = order + + # Add coasts to support and (portage) convoy orders for fleets moving to a specific coast + for unit, order in orders.items(): + # Only rewriting 'S F XXX - YYY' and 'C F XXX - YYY' + if order[:3] not in ('S F', 'C F'): + continue + word = order.split() + + # rcvr is the unit receiving the support or convoy (e.g. F XXX in S F XXX - BER) + # Making sure rcvr has also submitted orders (e.g. F XXX - YYY) + rcvr = ' '.join(word[1:3]) + try: + rcvr = [x for x in orders if x.startswith(rcvr)][0] + except IndexError: + # No orders found + continue + + # Updating order to include rcvr full starting position (with coasts) + orders[unit] = ' '.join([order[0], rcvr] + word[3:]).strip() + + # Checking if coast is specified in destination position + if '-' in order: + # his -> '- dest/coast' + # updating order if coast is specified in his dest, but not ours + his = ' '.join(orders.get(rcvr, '').split()[-2:]) + if his[0] == '-' and his.split('/')[0] == ' '.join(word[3:]): + orders[unit] = order[:2] + rcvr + ' ' + his + + # Updating game.orders object + self.orders[unit] = orders[unit] + + def _validate_status(self, reinit_powers=True): + """ Validates the status of the game object""" + # Loading map and setting victory condition + if not self.map: + self.load_map(reinit_powers=reinit_powers) + self.victory = self.map.victory + + # By default, 50% +1 of the scs + # Or for victory homes, half the average number of home centers belonging to other powers plus one + if not self.victory: + self.victory = [len(self.map.scs) // 2 + 1] + + # Ensure game phase was set + if not self.phase: + self.phase = self.map.phase + apart = self.phase.split() + if len(apart) == 3: + if '%s %s' % (apart[0], apart[2]) not in self.map.seq: + self.error += [err.GAME_BAD_PHASE_NOT_IN_FLOW] + self.phase_type = apart[2][0] + else: + self.phase_type = '-' + + # Validate the BEGIN phase (if one was given) + if self.phase == 'FORMING': + apart = self.map.phase.split() + try: + int(apart[1]) + del apart[1] + if ' '.join(apart) not in self.map.seq: + raise Exception() + except ValueError: + self.error += [err.GAME_BAD_BEGIN_PHASE] + + # Set victory condition + if self.phase not in ('FORMING', 'COMPLETED'): + try: + year = abs(int(self.phase.split()[1]) - self.map.first_year) + win = self.victory[:] + self.win = win[min(year, len(win) - 1)] + except ValueError: + self.error += [err.GAME_BAD_YEAR_GAME_PHASE] + + # Initialize power data + for power in self.powers.values(): + + # Initialize homes if needed + if power.homes is None: + power.homes = [] + for home in self.map.homes.get(power.name, []): + self.update_hash(power.name, loc=home, is_home=True) + power.homes.append(home) + + # ==================================================================== + # Private Interface - Generic methods + # ==================================================================== + def _load_rules(self): + """ Loads the list of rules and their forced (+) and denied (!) corresponding rules + :return: A tuple of dictionaries: rules, forced, and denied + rules = {'NO_CHECK': + { 'group': '3 Movement Order', + 'variant': 'standard', + '!': ['RULE_1', 'RULE_2'], + '+': ['RULE_3'] } } + forced = {'payola': 'RULE_4'} + denied = {'payola': 'RULE_5'} + """ + if self.__class__.rule_cache: + return self.__class__.rule_cache + group = variant = '' + data, forced, denied = {}, {}, {} + file_path = os.path.join(settings.PACKAGE_DIR, 'README_RULES.txt') + + if not os.path.exists(file_path): + self.error.append(err.GAME_UNABLE_TO_FIND_RULES) + return data, forced, denied + + with open(file_path, 'r', encoding='utf-8') as file: + for line in file: + word = line.strip().split() + + # Rules are in the format <!-- RULE NAME !RULE_1 +RULE_2 --> + # Where ! indicates a denied rule, and + indicates a forced rule + if word[:2] == ['<!--', 'RULE'] and word[-1][-1] == '>': + + # <!-- RULE GROUP 6 Secrecy --> + # group would be '6 Secrecy' + if word[2] == 'GROUP': + group = ' '.join(word[3:-1]) + + # <!-- RULE VARIANT standard --> + elif word[2] == 'VARIANT': + variant = word[3] + forced[variant] = [x[1:] for x in word[4:-1] if x[0] == '+'] + denied[variant] = [x[1:] for x in word[4:-1] if x[0] == '!'] + + # <!-- RULE NAME !RULE_1 +RULE_2 --> + elif word[2] != 'END': + rule = word[2] + if rule not in data: + data[rule] = {'group': group, 'variant': variant} + for control in word[3:-1]: + if control[0] in '-=+!': + data[rule].setdefault(control[0], []).append(control[1:]) + + self.__class__.rule_cache = (data, forced, denied) + return data, forced, denied + + def _build_hash_table(self): + """ Builds the Zobrist hash tables """ + if not self.map or self.map_name in self.__class__.zobrist_tables: + return + + # Finding powers and locations + map_powers = sorted([power_name for power_name in self.map.powers]) + map_locs = sorted([loc.upper() for loc in self.map.locs if self.map.area_type(loc) != 'SHUT']) + nb_powers = len(map_powers) + nb_locs = len(map_locs) + + # Generating a standardized seed + np_state = np.random.get_state() + np.random.seed(12345 + sum([ord(x) * 7 ** ix for ix, x in enumerate(self.map_name)]) % 2 ** 32) + self.__class__.zobrist_tables[self.map_name] = { + 'unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]), + 'units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]), + 'dis_unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]), + 'dis_units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]), + 'centers': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]), + 'homes': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]), + 'map_powers': map_powers, + 'map_locs': map_locs + } + np.random.set_state(np_state) + + # ==================================================================== + # Private Interface - PROCESSING and phase change methods + # ==================================================================== + def _begin(self): + """ Called to begin the game and move to the start phase + :return: Nothing + """ + self._move_to_start_phase() + self.note = '' + self.win = self.victory[0] + # Create dummy power objects for non-loaded powers. + for power_name in self.map.powers: + if power_name not in self.powers: + self.powers[power_name] = Power(self, power_name, role=self.role) + # Initialize all powers. + for starter in self.powers.values(): + # Starter having type won't be initialized. + starter.initialize(self) + + def _process(self): + """ Processes the current phase of the game """ + # Convert all raw movement phase "ORDER"s in a NO_CHECK game to standard orders before calling + # Game.process(). All "INVALID" and "REORDER" orders are left raw -- the Game.move_results() method + # knows how to detect and report them + if 'NO_CHECK' in self.rules and self.phase_type == 'M': + for power in self.powers.values(): + orders, power.orders, civil_disorder = power.orders, {}, power.civil_disorder + for status, order in orders.items(): + if status[:5] != 'ORDER': + power.orders[status] = order + elif order: + self._add_order(power, order.split()) + power.civil_disorder = civil_disorder + + # Processing the game + if self.phase_type == 'M': + self._determine_orders() + self._add_coasts() + + # Resolving orders + self._resolve() + + def _advance_phase(self): + """ Advance the game to the next phase (skipping phases with no actions) + :return: A list of lines to put in the results + """ + + # Save results for current phase. + # NB: result_history is updated here, neither in process() nor in draw(), + # unlike order_history, message_history and state_history. + self.result_history.put(self._phase_wrapper_type(self.current_short_phase), self.result) + self.result = {} + + # For each possible phase + for _ in self.map.seq: + + # If game is not yet started, or completed can't advance + if self.phase in (None, 'FORMING', 'COMPLETED'): + break + + # Finding next phase and setting variables + self.phase = self._find_next_phase() + self.phase_type = self.phase.split()[-1][0] + + # Check phase determines if we need to process phase (0) or can skip it (1) + if not self._check_phase(): + break + else: + raise Exception("FailedToAdvancePhase") + + # Rebuilding the convoy cache + self._build_list_possible_convoys() + + # Returning + return [] + + def _move_to_start_phase(self): + """ Moves to the map's start phase + :return: Nothing, but sets the self.phase and self.phase_type settings + """ + # Retrieve the beginning phase and phase type from the map + self.phase = self.map.phase + self.phase_type = self.phase.split()[-1][0] + + def _find_next_phase(self, phase_type=None, skip=0): + """ Returns the long name of the phase coming immediately after the current phase + :param phase_type: The type of phase we are looking for + (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.) + :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after) + :return: The long name of the next phase (e.g. FALL 1905 MOVEMENT) + """ + return self.map.find_next_phase(self.phase, phase_type, skip) + + def _find_previous_phase(self, phase_type=None, skip=0): + """ Returns the long name of the phase coming immediately before the current phase + :param phase_type: The type of phase we are looking for + (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.) + :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after) + :return: The long name of the previous phase (e.g. SPRING 1905 MOVEMENT) + """ + return self.map.find_previous_phase(self.phase, phase_type, skip) + + def _get_start_phase(self): + """ Returns the name of the start phase""" + cur_phase, cur_phase_type = self.phase, self.phase_type + self._move_to_start_phase() + phase = self.phase + self.phase, self.phase_type = cur_phase, cur_phase_type + return phase + + def _check_phase(self): + """ Checks if we need to process a phase, or if we can skip it if there are no actions + :return: Boolean (0 or 1) - 0 if we need to process phase, 1 if we can skip it + """ + # pylint: disable=too-many-return-statements + # Beginning / End of game - Can't skip + if self.phase in (None, 'FORMING', 'COMPLETED'): + return 0 + + # When changing phases, clearing all caches + self.clear_cache() + + # Movement phase - Always need to process + if self.phase_type == 'M': + return 0 + + # Retreats phase + if self.phase_type == 'R': + # We need to process if there are retreats + if [1 for x in self.powers.values() if x.retreats]: + return 0 + + # Otherwise, clearing flags and skipping phase + for power in self.powers.values(): + for dis_unit in power.retreats: + self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True) + power.retreats, power.adjust, power.civil_disorder = {}, [], 0 + self.result = {} + if 'DONT_SKIP_PHASES' in self.rules: + return 0 + return 1 + + # Adjustments phase + if self.phase_type == 'A': + # Capturing supply centers + self._capture_centers() + + # If completed, can't skip + if self.phase == 'COMPLETED': + return 0 + + # If we have units to remove or to build, we need to process + for power in self.powers.values(): + units, centers = len(power.units), len(power.centers) + if [x for x in power.centers if x in power.homes]: + centers += (0 + min(0, len([0 for x in power.units if x[2:5] in power.homes]))) + if units > centers or (units < centers and self._build_limit(power)): + return 0 + + # Otherwise, skipping + self.result = {} + if 'DONT_SKIP_PHASES' in self.rules: + return 0 + return 1 + + # Other phases. We need to process manually. + return 0 + + def _post_move_update(self): + """ Deletes orders and removes CD flag after moves """ + for power in self.powers.values(): + power.orders, power.civil_disorder = {}, 0 + + def _build_sites(self, power): + """ Returns a list of sites where power can build units + :param power: The power instance to check + :return: A list of build sites + """ + + # Retrieving the list of homes (build sites) for the power, and the list of active powers + homes = power.homes + + # Can build on any of his centers + # -- BUILD_ANY: Powers may build new units at any owned supply center, not simply at their home supply centers. + if 'BUILD_ANY' in self.rules: + homes = power.centers + + # Updating homes to only include homes if they are unoccupied, + homes = [h for h in homes if h in power.centers and h not in + [u[2:5] for p in self.powers.values() for u in p.units]] + return homes + + def _build_limit(self, power, sites=None): + """ Determines the maximum number of builds a power can do in an adjustment phase + Note: This function assumes that one unit can be built per build sites + alternative sites + The actual maximum build limit would be less if units are built on alternative sites. + :param power: The power instance to check + :param sites: The power's build sites (or None to compute them) + :return: An integer representing the maximum number of simultaneous builds + """ + # Computing build_sites if not provided + if sites is None: + sites = self._build_sites(power) + + # Returning number of sites + return len(sites) + + def _calculate_victory_score(self): + """ Calculates the score to determine win for each power + :return: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2}) + """ + score = {} + + # Score is the number of supply centers owned + for power in self.powers.values(): + score[power] = len([sc for sc in power.centers]) + return score + + def _determine_win(self, last_year): + """ Determine if we have a win. + :param last_year: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2}) + (from the previous year) + :return: Nothing + """ + victors, this_year = [], self._calculate_victory_score() + year_centers = [this_year[x] for x in self.powers.values()] + + # Determining win + for power in self.powers.values(): + centers = this_year[power] + + # 1) you must have enough centers to win + if (centers >= self.win + + # 2) and you must grow or, if "HOLD_WIN", must have had a win + and (centers > last_year[power], last_year[power] >= self.win)['HOLD_WIN' in self.rules] + + # 3) and you must be alone in the lead (not required in case of SHARED_VICTORY) + and ('SHARED_VICTORY' in self.rules + or (centers, year_centers.count(centers)) == (max(year_centers), 1))): + victors += [power] + + # We have a winner! + if victors: + self._finish([victor.name for victor in victors]) + + # DRAW if 100 years + elif int(self.phase.split()[1]) - self.map.first_year + 1 == 100: + self.draw() + + def _capture_centers(self): + """ In Adjustment Phase, proceed with the capture of occupied supply centers + :return: Nothing + """ + victory_score_prev_year = self._calculate_victory_score() + + # If no power owns centers, initialize them + if not [1 for x in self.powers.values() if x.centers]: + for power in self.powers.values(): + for center in power.centers: + self.update_hash(power.name, loc=center, is_center=True) + power.centers = [] + for center in power.homes: + self.update_hash(power.name, loc=center, is_center=True) + power.centers.append(center) + + # Remember the current center count for the various powers, for use in victory condition check, + # then go through and see if any centers have been taken over + unowned = self.map.scs[:] + for power in self.powers.values(): + for center in power.centers: + if center in unowned: + unowned.remove(center) + + # Keep track of scs lost + self.lost = {} + for power in list(self.powers.values()) + [None]: + # Centers before takover + if power: + centers = power.centers + else: + centers = unowned + + # For each center, check if we took ownership + for center in centers[:]: + for owner in self.powers.values(): + + # 1) If center is unowned, or 2) owned by someone else and that we have a unit on it + # Proceed with transfer, and record lost + if (not power or owner is not power) and center in [x[2:5] for x in owner.units]: + self._transfer_center(power, owner, center) + if not power: + unowned.remove(center) + else: + self.lost[center] = power + break + + # Determining if we have a winner + self._determine_win(victory_score_prev_year) + + def _transfer_center(self, from_power, to_power, center): + """ Transfers a supply center from a power to another + :param from_power: The power instance from whom the supply center is transfered + :param to_power: The power instance to whom the supply center is transferred + :param center: The supply center location (e.g. 'PAR') + :return: Nothing + """ + if from_power: + self.update_hash(from_power.name, loc=center, is_center=True) + from_power.centers.remove(center) + if center not in to_power.centers: + self.update_hash(to_power.name, loc=center, is_center=True) + to_power.centers += [center] + + def _finish(self, victors): + """ Indicates that a game is finished and has been won by 'victors' + :param victors: The list of victors (e.g. ['FRANCE', 'GERMANY']) + :return: Nothing + """ + # Setting outcome, and end date. Clearing orders and saving. + self.outcome = [self._phase_abbr()] + victors + self.note = 'Victory by: ' + ', '.join([vic[:3] for vic in victors]) + self.phase = 'COMPLETED' + self.set_status(strings.COMPLETED) + for power in self.powers.values(): + for dis_unit in power.retreats: + self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True) + power.retreats, power.adjust, power.civil_disorder = {}, [], 0 + + def _phase_abbr(self, phase=None): + """ Constructs a 5 character representation (S1901M) from a phase (SPRING 1901 MOVEMENT) + :param phase: The full phase (e.g. SPRING 1901 MOVEMENT) + :return: A 5 character representation of the phase + """ + return self.map.phase_abbr(phase or self.phase) + + # ==================================================================== + # Private Interface - ORDER Submission methods + # ==================================================================== + def _add_order(self, power, word, expand=True, replace=True): + """ Adds an order for a power + :param power: The power instance issuing the order + :param word: The order (e.g. ['A', 'PAR', '-', 'MAR']) + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: Nothing, but adds error to self.error + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + if not word: + return None + + raw_word = word + + if expand: + # Check that the order is valid. If not, self.error will say why. + word = self._expand_order(word) + word = self._expand_coast(word) + word = self._add_unit_types(word) + word = self.map.default_coast(word) + + # Last word is '?' - Removing it + if word and len(word[-1]) == 1 and not word[-1].isalpha(): + word = word[:-1] + if len(word) < 2: + return self.error.append(err.STD_GAME_BAD_ORDER % ' '.join(word)) + + # Checking if we can order unit + unit, order = ' '.join(word[:2]), ' '.join(word[2:]) + owner = self._unit_owner(unit) + if not owner or owner is not power: + self.error += [err.STD_GAME_UNORDERABLE_UNIT % ' '.join(word)] + + # Validating order + elif order: + valid = self._valid_order(power, unit, order) + + # Valid order. But is it to a unit already ordered? This is okay in a NO_CHECK game, and + # we HOLD the unit. If not, pack it back into the power's order list. + if valid is not None: + power.civil_disorder = 0 + if valid == -1: + order += ' ?' + if unit not in power.orders or (replace and 'NO_CHECK' not in self.rules): + power.orders[unit] = order + elif 'NO_CHECK' in self.rules: + count = len(power.orders) + if power.orders[unit] not in ('H', order): + power.orders['REORDER %d' % count] = power.orders[unit] + count += 1 + power.orders[unit] = 'H' + power.orders['REORDER %d' % count] = ' '.join(word) + else: + self.error += [err.STD_GAME_UNIT_REORDERED % unit] + + # Invalid order in NO_CHECK game + elif 'NO_CHECK' in self.rules: + count = len(power.orders) + power.orders['INVALID %d' % count] = ' '.join(raw_word) + + # Returning nothing + return None + + def _update_orders(self, power, orders, expand=True, replace=True): + """ Updates the orders of a power + :param power: The power instance (or None if updating multiple instances) + :param orders: The updated list of orders + e.g. ['A MAR - PAR', 'A PAR - BER', ...] + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: Nothing + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + cur_power, had_orders, has_orders, powers = power, [], [], [] + + # For each order + for line in orders: + + word = line.strip().split() + who = cur_power + + if not word: + continue + + # Checking if the power can order + if not hasattr(who, 'orders'): + return self.error.append('%s HAS NO UNITS OF ITS OWN TO ORDER' % who.name) + + # NMR = No Moves Received (NMR or CLEAR command) + nmr = (len(word) == 1 + and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR')) + if who not in powers: + + # Empty orders before sticking any new orders in it. + had_orders += [who.orders] + powers += [who] + if nmr: + continue + + # If CLEAR or NMR, clear orders + elif nmr: + who.orders = {} + has_orders = [x for x in has_orders if x is not who] + continue + + # Adds orders + if 'NO_CHECK' in self.rules: + data = self._expand_order(word) + if len(data) < 3 and (len(data) == 1 or data[1] != 'H'): + self.error.append(err.STD_GAME_BAD_ORDER % line.upper()) + continue + + # Voiding previous order on same unit + if replace: + for order in who.orders: + order_parts = who.orders[order].split() + if len(order_parts) >= 2 and order_parts[1][:3] == word[1][:3]: + who.orders[order] = '' + + # Adding new order + who.orders['ORDER %d' % (len(who.orders) + 1)] = ' '.join(word) + else: + self._add_order(who, word, expand=expand, replace=replace) + if who.orders and who not in has_orders: + has_orders += [who] + + # Make sure the player can update his orders + if not powers: + return 1 + if self.error: + return self.error + + # Clear CD flag, even if orders were cleared + for who in powers: + who.civil_disorder = 0 + + # Returning nothing + return None + + def _add_retreat_orders(self, power, orders, expand=True, replace=True): + """ Adds a retreat order (Retreats Phase) + :param power: The power instance who is submitting orders (or None if power is in the orders) + :param orders: The list of adjustment orders + (format can be [Country: order], [Country, order, order], or [order,order]) + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: Nothing, but adds error to self.error + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + # No orders, returning + if not orders: + power.adjust, power.civil_disorder = [], 0 + return + + # Processing each order + adjust, retreated = [], [] + for order in orders: + word = order.split() + if not word or len(word) < 2: + continue + + # Expanding and adding unit types + if expand: + word = self._expand_order([order]) + word = self._add_unit_types(word) + + # Add 'R' has order type for Retreat, 'D' for Disband + if word[0] == 'R' and len(word) > 3: + del word[0] + if word[0] in 'RD': + word = word[1:] + word[:1] + + # Checking if unit can retreat + unit = ' '.join(word[:2]) + try: + unit = [r_unit for r_unit in power.retreats if r_unit == unit or r_unit.startswith(unit + '/')][0] + except IndexError: + adjust += ['VOID ' + order] + self.error.append(err.GAME_UNIT_NOT_IN_RETREAT % unit) + continue + + # Checking if unit already retreated + if unit in retreated: + adjust += ['VOID ' + order] + self.error.append(err.GAME_TWO_ORDERS_FOR_RETREATING_UNIT % unit) + continue + word[1] = unit[2:] + + # Adding Disband for retreats with no destination + if len(word) == 3 and word[2] in 'RD': + word[2] = 'D' + + # Checking if retreat destination is valid + elif len(word) == 4 and word[2] in 'R-': + word[2] = 'R' + if word[3] not in power.retreats[unit]\ + or self._unit_owner('A {}'.format(word[3][:3]), coast_required=0) \ + or self._unit_owner('F {}'.format(word[3][:3]), coast_required=0): + self.error.append(err.GAME_INVALID_RETREAT_DEST % ' '.join(word)) + adjust += ['VOID ' + order] + continue + + # Invalid retreat order - Voiding + else: + self.error.append(err.GAME_BAD_RETREAT_ORDER % ' '.join(word)) + adjust += ['VOID ' + order] + continue + + # Adding retreat order and marking unit as retreated + retreated += [unit] + adjust += [' '.join(word)] + + # Replacing previous orders + if replace: + for order in adjust: + word = order.split() + if len(word) >= 2 and word[0] != 'VOID': + power.adjust = [adj_order for adj_order in power.adjust if adj_order.split()[1:2] != word[1:2]] + + # Otherwise, marking re-orders as invalid + else: + ordered_locs = [adj_order.split()[1] for adj_order in power.adjust] + for order in adjust[:]: + word = order.split() + if len(word) >= 2 and word[1] in ordered_locs: + self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])] + adjust.remove(order) + + # Finalizing orders + power.adjust += adjust + power.civil_disorder = 0 + + def _update_retreat_orders(self, power, orders, expand=True, replace=True): + """ Updates order for Retreats phase + :param power: The power instance submitting the orders + :param orders: The updated orders + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: List of processing errors + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + for who, adj in self._distribute_orders(power, orders): + self._add_retreat_orders(who, adj, expand=expand, replace=replace) + return self.error + + def _add_adjust_orders(self, power, orders, expand=True, replace=True): + """ Adds an adjustment order (Adjustment Phase) + :param power: The power instance who is submitting orders (or None if power is in the orders) + :param orders: The list of adjustment orders (format can be [Country: order], + [Country, order, order], or [order,order]) + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: Nothing, but adds error to self.error + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + # pylint: disable=too-many-branches + # No orders submitted, returning + if not orders: + power.adjust, power.civil_disorder = [], 0 + return + + # Calculating if the power can build or remove units + adjust, places = [], [] + need, sites = len(power.centers) - len(power.units), [] + order_type = 'D' if need < 0 else 'B' + + # If we can build, calculating list of possible build locations + if need > 0: + sites = self._build_sites(power) + need = min(need, self._build_limit(power, sites)) + + # Processing each order + for order in orders: + order = order.strip() + + if order == 'WAIVE': + # Check WAIVE order immediately and continue to next loop step. + if need >= 0: + adjust += [order] + else: + adjust += ['VOID ' + order] + self.error += ['WAIVE NOT ALLOWED FOR DISBAND'] + continue + + if not order or len(order.split()) < 2: + continue + word = self._expand_order([order]) if expand else order.split() + + # Checking if unit can Build/Disband, otherwise voiding order + if word[-1] == order_type: + pass + elif word[-1] in 'BD': + adjust += ['VOID ' + order] + self.error += ['ORDER NOT ALLOWED: ' + order] + continue + + # Adding unit type + if word[-1] == 'D' and expand: + word = self._add_unit_types(word) + + # Checking for 'Disband' + order = ' '.join(word) + if word[-1] == 'D': + if len(word) == 3: + unit = ' '.join(word[:2]) + + # Invalid unit, voiding order + if unit not in power.units: + adjust += ['VOID ' + order] + self.error += [err.GAME_NO_SUCH_UNIT % unit] + + # Order to remove unit + elif order not in adjust: + adjust += [order] + + # Invalid order, voiding + else: + adjust += ['VOID ' + order] + self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % unit] + else: + adjust += ['VOID ' + order] + self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order] + + # Checking for BUILD + elif len(word) == 3: + site = word[1][:3] + + # Invalid build site + if site not in sites: + adjust += ['VOID ' + order] + self.error += [err.GAME_INVALID_BUILD_SITE % order] + + # Site already used + elif site in places: + adjust += ['VOID ' + order] + self.error += [err.GAME_MULT_BUILDS_IN_SITE % order] + + # Unit can't be built there + elif not self.map.is_valid_unit(' '.join(word[:2])): + adjust += ['VOID ' + order] + self.error += [err.GAME_INVALID_BUILD_ORDER % order] + + # Valid build sites + else: + adjust += [order] + places += [site] + + # Otherwise, unknown order - Voiding + else: + adjust += ['VOID ' + order] + self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order] + + # NB: We skip WAIVE orders when checking for replacements. + # We will check them later. + + # Replacing previous orders + if replace: + for order in adjust: + word = order.split() + if len(word) >= 2 and word[0] != 'VOID': + power.adjust = [adj_order for adj_order in power.adjust + if adj_order == 'WAIVE' or adj_order.split()[1] != word[1]] + + # Otherwise, marking re-orders as invalid + else: + ordered_locs = [adj_order.split()[1] for adj_order in power.adjust if adj_order != 'WAIVE'] + for order in adjust[:]: + word = order.split() + if len(word) >= 2 and word[1] in ordered_locs: + self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])] + adjust.remove(order) + + # Finalizing orders + power.adjust += adjust + power.civil_disorder = 0 + + # We check WAIVE orders in power.adjust after updating power.adjust, + # as WAIVE orders depend on variable `need`, whom computation is relative to power + # (ie. not relative to orders being currently adjusted). + + # Removing extra waive orders + while 0 < need < len(power.adjust): + if 'WAIVE' in power.adjust: + power.adjust.remove('WAIVE') + else: + break + + # Adding missing waive orders + if 'WAIVE' in power.adjust or power.is_dummy(): + power.adjust.extend(['WAIVE'] * (need - len(power.adjust))) + + def _update_adjust_orders(self, power, orders, expand=True, replace=True): + """ Updates order for Adjustment phase + :param power: The power instance submitting the orders + :param orders: The updated orders + :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.) + If false, expect orders in the following format. False gives a performance improvement. + :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders. + :return: List of processing errors + + Expected format: + A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI + A IRO R MAO, A IRO D, A LON B, F LIV B + """ + for who, adj in self._distribute_orders(power, orders): + self._add_adjust_orders(who, adj, expand=expand, replace=replace) + return self.error + + def _determine_orders(self): + """ Builds the self.orders dictionary (i.e. makes sure all orders are legitimate). """ + self.orders = {} + + # Determine the orders to be issued to each unit, based on unit ownership + for power in self.powers.values(): + for unit, order in power.orders.items(): + if power is self._unit_owner(unit): + self.orders[unit] = order + + # In NO_CHECK games, ensure that orders to other player's units are reported as invalid + # if no proxy was given + if 'NO_CHECK' in self.rules: + for power in self.powers.values(): + for unit, order in power.orders.items(): + if unit[0] not in 'RI' and power is not self._unit_owner(unit): + order = unit + ' ' + order + power.orders['INVALID %d' % len(power.orders)] = order + + def _default_orders(self, power): + """ Issues default orders for a power (HOLD) + :param power: The power instance + :return: Nothing + """ + # Power has no units + if not power.units: + return + + # Power has not submitted all his orders, checking if we default to HOLD + if not [x for x in power.units if self.orders.get(x)]: + power.civil_disorder = 1 + for unit in power.units: + self.orders.setdefault(unit, 'H') + + @classmethod + def _distribute_orders(cls, power, orders, clear=True): + """ For controlling powers, distribute orders to controlled powers + :param power: The power instance submitting the orders + :param orders: The list of orders submitted + :param clear: Boolean flag to indicate to clear order if NMR/CLEAR submitted + :return: A list of tuple with (controlled power instance, list of orders for that controlled power instance) + """ + powers, distributor = [power], {power.name: []} + cur_power = power + + # For each order submitted + for order in orders: + # Don't distribute blank order + word = order.strip().split() + if not word: + continue + + who = cur_power + + # Clearing orders for power + if (clear + and len(word) == 1 + and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR')): + distributor[who.name] = [] + # Otherwise, distributing order + else: + distributor[who.name] += [' '.join(word)] + + # Returning a list of tuples with the power instance and their respective orders. + return [(x, distributor[x.name]) for x in powers] + + # ==================================================================== + # Private Interface - ADJUDICATION Methods + # ==================================================================== + def _abuts(self, unit_type, unit_loc, order_type, other_loc): + """ Determines if a order for unit_type from unit_loc to other_loc is adjacent (Support and convoy only) + :param unit_type: The type of unit ('A' or 'F') + :param unit_loc: The location of the unit ('BUR', 'BUL/EC') + :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move) + :param other_loc: The location of the other unit + :return: 1 if the locations are adjacent for the move, 0 otherwise + """ + # Check if the map says the adjacency is good + if not self.map.abuts(unit_type, unit_loc, order_type, other_loc): + return 0 + return 1 + + def _unit_owner(self, unit, coast_required=1): + """ Finds the power who owns a unit + :param unit: The name of the unit to find (e.g. 'A PAR') + :param coast_required: Indicates that the coast is in the unit + (if 0, you can search for 'F STP' for example, but if 1, you must specify 'F STP/SC') + :return: The power instance who owns the unit or None + """ + # If coast_required is 0 and unit does not contain a '/' + # return the owner if we find a unit that starts with unit + # Don't count the unit if it needs to retreat (i.e. it has been dislodged) + for owner in self.powers.values(): + if unit in owner.units: + return owner + if not coast_required and '/' not in unit and [1 for x in owner.units if x.find(unit) == 0]: + return owner + return None + + def _occupant(self, site, any_coast=0): + """ Finds the occupant of a site + :param site: The site name (e.g. "STP") + :param any_coast: Boolean to indicate to return unit on any coast + :return: The unit (e.g. "A STP", "F STP/NC") occupying the site, None otherwise + """ + if any_coast: + site = site[:3] + for power in self.powers.values(): + for unit in power.units: + if unit[2:].startswith(site): + return unit + return None + + def _strengths(self): + """ This function sets self.combat to a dictionary of dictionaries, specifying each potential destination + for every piece, with the strengths of each unit's attempt to get (or stay) there, and with the givers + of supports that DON'T country dislodgement. (i.e. supports given by the power owning the occupying unit). + :return: Nothing, but sets self.combat + """ + # For example, the following orders, all by the same power: + # A MUN H, A SIL - MUN, A BOH S A SIL - MUN, A RUH - MUN would result in: + # e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } } + # MUN is holding, being attack without support from RUH and being attacked with support from SIL (S from BOH) + self.combat = {} + + # For each order + for unit, order in self.command.items(): + word = order.split() + + # Strength of a non-move or failed move is 1 + support + if word[0] != '-' or self.result[unit]: + place, strength = unit[2:5], 1 + + # Strength of move depends on * and ~ in adjacency list + else: + offset = 1 if word[-1] == 'VIA' else 0 + place = word[-1 - offset][:3] + strength = 1 + + # Adds the list of supporting units + # Only adding the support that DOES NOT count toward dislodgment + self.combat \ + .setdefault(place, {}) \ + .setdefault(strength + self.supports[unit][0], []) \ + .append([unit, self.supports[unit][1]]) + + def _detect_paradox(self, starting_node, paradox_action, paradox_last_words): + """ Paradox detection algorithm. Start at starting node and move chain to see if node if performing + paradox action + :param starting_node: The location (e.g. PAR) where to start the paradox chain + :param paradox_action: The action that would cause a paradox in the chain (e.g. 'S') + :param paradox_last_words: The last words to detect in a order to cause a paradox (e.g. ['F', 'NTH']) + :return: Boolean (1 or 0) to indicate if a paradox action was detected in the chain + """ + visited_units = [] + current_node = starting_node + current_unit = self._occupant(current_node) + while current_unit is not None and current_unit not in visited_units: + visited_units += [current_unit] + current_order = self.command.get(current_unit, 'H') + + # Action and last words detected + if (current_order[0] == paradox_action + and current_order.split()[-1 * len(paradox_last_words):] == paradox_last_words): + return True + + # Continuing chain only if order is Support or Convoy + if current_order.split()[0] not in 'SC': + break + current_node = current_order.split()[-1] + current_unit = self._occupant(current_node) + + # No paradox detected + return False + + def _check_disruptions(self, may_convoy, result, coresult=None): + """ Determines convoy disruptions. + :param may_convoy: Contains the dictionary of all convoys that have a chance to succeed + (e.g. {'A PAR': ['BER', 'MUN']} + :param result: Result to set for the unit if the convoying fleet would be dislodged + (e.g. 'maybe', 'no convoy') + :param coresult: Result to set for the convoyer if the convoying fleet would be dislodged (e.g. 'dislodged') + :return: Nothing + """ + for unit, word in may_convoy.items(): + + # Removing '-' + word = [w for w in word if w != '-'] + + # Checking order of unit at dest + offset = 1 if self.command.get(unit, []).split()[-1] == 'VIA' else 0 + convoy_dest = self.command.get(unit, 'H').split()[-1 - offset] + unit_at_dest = self._occupant(convoy_dest) + order_unit_at_dest = self.command.get(unit_at_dest, 'H') + + # Looping over all areas where convoys will take place (including destination) + for place in word: + area, convoyer = place[:3], 'AF'[unit[0] == 'A'] + ' ' + place + strongest = self.combat[area][max(self.combat[area])] + + # Checking if the convoy is under attack + for strong_unit in strongest: + if self._unit_owner(convoyer) != self._unit_owner(strong_unit[0]): + break + else: + continue + + # Paradox Detection #1 + # [1st and 2nd order] Checking that we are not attacking a chain, with the last unit supporting + # the convoy + paradox = self._detect_paradox(convoy_dest, 'S', ['S', 'F', area]) + + # Checking if the convoy can withstand the attack and there is not active paradox + if convoyer in [x[0] for x in strongest] and not paradox: + continue + + # For a beleaguered garrison, checking if the destination is attacking / supporting an attack + # against convoy + if len(strongest) >= 2 and not paradox: + if order_unit_at_dest.split()[0] not in '-S' or order_unit_at_dest.split()[-1][:3] != area: + continue + + # Removing paths using place + self.convoy_paths.setdefault(unit, []) + for path in self.convoy_paths[unit]: + if place in path: + self.convoy_paths[unit].remove(path) + + # Paradox Detection #2 - Can convoyed unit use land route to cut support necessary to attack convoy + paradox = False + if self._abuts(unit[0], unit[2:], '-', convoy_dest): + paradox = self._detect_paradox(convoy_dest, 'S', ['-', area]) + + # Setting the result if there is no convoy paths left, and + # 1) there is no land route (or there is a paradox through the land route) + # or 2) the unit specified 'VIA' and doesn't want to try the land route (4.A.3) + if not self.convoy_paths[unit] and (paradox + or not self._abuts(unit[0], unit[2:], '-', convoy_dest) + or (self._abuts(unit[0], unit[2:], '-', convoy_dest) + and self.command[unit].split()[-1] == 'VIA')): + self.result[unit] = [result] + + # Setting the result for a would-be dislodged fleet + if coresult: + self.result[convoyer] = [coresult] + + def _boing(self, unit): + """ Mark a unit bounced, and update the combat table to show the unit as + having strength one at its current location + :param unit: The unit to bounce (e.g. 'A PAR') + :return: 1 + """ + self.result[unit] += ['bounce'] + self.combat \ + .setdefault(unit[2:5], {}) \ + .setdefault(1, []) \ + .append([unit, []]) + return 1 + + def _bounce(self): + """ This methods marks all units that can't get where they're going as bounced. + It loops to handle bounce-chains. + """ + # pylint: disable=too-many-nested-blocks + bounced = 1 + while bounced: + bounced = 0 + + # STEP 6. MARK (non-convoyed) PLACE-SWAP BOUNCERS + for unit, order in self.command.items(): + word = order.split() + if self.result[unit] or word[0] != '-' or self._is_moving_via_convoy(unit): + continue + crawl_ok, site = False, '- ' + unit[2:] + swap = self._occupant(word[1], any_coast=not crawl_ok) + if self._is_moving_via_convoy(swap): + continue + if not (crawl_ok and swap and swap[0] == unit[0] == 'F'): + site = site.split('/')[0] + if not (self.command.get(swap, '').find(site) or self.result[swap]): + my_strength = self.supports[unit][0] - len(self.supports[unit][1]) + his_strength = self.supports[swap][0] - len(self.supports[swap][1]) + our_strength = (self._unit_owner(unit) is self._unit_owner(swap) + or self.supports[unit][0] == self.supports[swap][0]) + if our_strength or my_strength <= his_strength: + self._boing(unit) + if our_strength or his_strength <= my_strength: + self._boing(swap) + + # Marking support used for self-dislodgement as void + for supporting_unit in self.supports[unit][1]: + self.result[supporting_unit] += ['void'] + for supporting_unit in self.supports[swap][1]: + self.result[supporting_unit] += ['void'] + bounced = 1 + if bounced: + continue + # No (more) swap-bouncers + + # STEP 7. MARK OUTGUNNED BOUNCERS + for place, conflicts in list(self.combat.items()): + strength = sorted(conflicts.keys()) + for key in strength: + if key != strength[-1] or len(conflicts[key]) != 1: + for unit, no_help in conflicts[key]: + if not self.result[unit] and self.command[unit][0] == '-': + bounced = self._boing(unit) + if bounced: + continue + # No (more) outgunned bouncers + + # STEP 8. MARK SELF-DISLODGE BOUNCERS + for place, conflicts in list(self.combat.items()): + strength = sorted(conflicts.keys()) + if len(conflicts[strength[-1]]) != 1: + continue + strongest = conflicts[strength[-1]][0][0] + if self.command[strongest][0] != '-' or self.result[strongest]: + continue + no_help = len(conflicts[strength[-1]][0][1]) + guy = self._occupant(place) + if guy: + owner = self._unit_owner(guy) + if ((self.command[guy][0] != '-' or self.result[guy]) + and (owner is self._unit_owner(strongest) + or (len(strength) > 1 and strength[-1] - no_help <= strength[-2]))): + bounced = self._boing(strongest) + for supporting_unit in conflicts[strength[-1]][0][1]: + if 'void' not in self.result[supporting_unit]: + self.result[supporting_unit] += ['void'] + + # No (more) self-dislodge bouncers + + def _cut_support(self, unit, direct=0): + """ See if the order made by the unit cuts a support. If so, cut it. + :param unit: The unit who is attacking (and cutting support) + :param direct: Boolean Flag - If set, the order must not only be a move, but also a non-convoyed move. + :return: Nothing + """ + order = self.command[unit] + word = order.split() + if word[0] != '-' or (direct and self._is_moving_via_convoy(unit)): + return + dest = word[-1] if word[-1] != 'VIA' else word[-2] + other_unit = self._occupant(dest, any_coast=1) + coord = self.command.get(other_unit, 'no unit at dest').split() + support_target = 'F ' + coord[-1][:3] + + # pylint: disable=too-many-boolean-expressions + if (coord[0] == 'S' + and 'cut' not in self.result[other_unit] + and 'void' not in self.result[other_unit] + + # EXCEPTION A: CANNOT CUT SUPPORT YOU YOURSELF ARE GIVING + and (self._unit_owner(unit) is not self._unit_owner(other_unit)) + + # EXCEPTION B: CANNOT CUT SUPPORT FOR A MOVE AGAINST YOUR LOCATION + and coord[-1][:3] != unit[2:5] + + # EXCEPTION C: OR (IF CONVOYED) FOR OR AGAINST ANY CONVOYING FLEET + and (not self._is_moving_via_convoy(unit) + or self.command.get(support_target, 'H')[0] != 'C' + or 'void' in self.result.get(support_target, []) + # EXCEPTION TO EXCEPTION C: IF THERE IS A ALTERNATIVE CONVOY ROUTE + or [1 for path in self.convoy_paths[unit] if support_target[2:] not in path])): + + # Okay, the support is cut. + self.result[other_unit] += ['cut'] + affected = ' '.join(coord[1:3]) # Unit being supported + self.supports[affected][0] -= 1 + if other_unit in self.supports[affected][1]: + self.supports[affected][1].remove(other_unit) + + def _no_effect(self, unit, site): + """ Removes a unit from the combat list of an attack site + :param unit: The unit attacking the site (e.g. ['A PAR', []]) + :param site: The site being attacked (e.g. 'MAR') + :return: Nothing + """ + sups = [strength for strength, attack_unit in self.combat[site].items() if unit in attack_unit][0] + self.combat[site][sups].remove(unit) + if not self.combat[site][sups]: + del self.combat[site][sups] + if not self.combat[site]: + del self.combat[site] + + def _unbounce(self, site): + """ Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger. + :param site: The site being attacked + :return: Nothing + """ + # Detecting if there is only one attack winning at site + most = max(self.combat[site]) + if len(self.combat[site][most]) > 1: + return None + + # Unbouncing the winner of the attack at site + unbouncer = self.combat[site][most][0][0] + if 'bounce' in self.result[unbouncer]: + self.result[unbouncer].remove('bounce') + if unbouncer in self.dislodged: + del self.dislodged[unbouncer] + return self.result[unbouncer].remove('dislodged') + + next_site = unbouncer[2:5] + self._no_effect([unbouncer, []], next_site) + if next_site in self.combat: + self._unbounce(next_site) + return None + + def _resolve_moves(self): + """ Resolves the list of orders """ + # pylint: disable=too-many-statements,too-many-branches + + # ----------------------------------------------------------- + # STEP 0: DECLARE ALL RESULTS AS YET UNKNOWN + self.result, self.supports, self.convoy_paths, may_convoy = {}, {}, {}, {} + + # Fill self.command from the self.orders dictionary + # Fill self.ordered_units from the powers.units list + # Default order is to hold + self.command = {} + self.ordered_units = {} + for power in self.powers.values(): + self.ordered_units[power.name] = [unit for unit in power.units if unit in self.orders] + for unit in power.units: + self.command[unit] = self.orders.get(unit, 'H') + if 'NO_CHECK' in self.rules: + for order in [order for key, order in power.orders.items() if key.startswith('INVALID')]: + unit = ' '.join(order.split()[:2]) + self.ordered_units[power.name] += [unit] + self.command[unit] = 'H' + self.result[unit] = ['void'] + self._default_orders(power) + + for unit in self.command: + self.result.setdefault(unit, []) + self.supports.setdefault(unit, [0, []]) + + # ----------------------------------------------------------- + # STEP 1A. CANCEL ALL INVALID ORDERS GIVEN TO UNITS ATTEMPTING TO MOVE BY CONVOY + for unit, order in list(self.command.items()): + word = order.split() + if word[0] != '-': + continue + + # Full convoy path has been specified (e.g. 'A PAR - MAR - NAO - MAO - LON') + offset = 1 if word[-1] == 'VIA' else 0 + if len(word) - offset > 2: + for convoyer in range(1, len(word) - 1, 2): + convoy_order = self.command.get('AF'[unit[0] == 'A'] + ' ' + word[convoyer]) + if convoy_order not in ['C %s - ' % x + word[-1] for x in (unit, unit[2:])]: + if convoy_order: + self.result[unit] += ['no convoy'] + else: + self.command[unit] = 'H' + break + # List the valid convoys + else: + may_convoy[unit] = order.split() + self.convoy_paths[unit] = [[unit[2:]] + word[1::2]] + + # Only src and dest provided + else: + def flatten(nested_list): + """ Flattens a sublist """ + return [list_item for sublist in nested_list for list_item in sublist] + + has_via_convoy_flag = 1 if word[-1] == 'VIA' else 0 + convoying_units = self._get_convoying_units_for_path(unit[0], unit[2:], word[1]) + possible_paths = self._get_convoy_paths(unit[0], + unit[2:], + word[1], + has_via_convoy_flag, + convoying_units) + + # No convoy path - Removing VIA and checking if adjacent + if not possible_paths: + if has_via_convoy_flag: + self.command[unit] = ' '.join(word[:-1]) + if not self._abuts(unit[0], unit[2:], 'S', word[1]): + self.result[unit] += ['no convoy'] + + # There is a convoy path, remembering the convoyers + else: + self.convoy_paths[unit] = possible_paths + may_convoy.setdefault(unit, []) + for convoyer in convoying_units: + if convoyer[2:] in flatten(possible_paths) and convoyer[2:] not in may_convoy[unit]: + may_convoy[unit] += [convoyer[2:]] + + # Marking all convoys that are not in any path + invalid_convoys = convoying_units[:] + all_path_locs = list(set(flatten(possible_paths))) + for convoy in convoying_units: + if convoy[2:] in all_path_locs: + invalid_convoys.remove(convoy) + for convoy in invalid_convoys: + self.result[convoy] = ['no convoy'] + + # ----------------------------------------------------------- + # STEP 1B. CANCEL ALL INVALID CONVOY ORDERS + for unit, order in self.command.items(): + if order[0] != 'C': + continue + # word = ['C', 'PAR', 'MAR'] -> ['C', 'A', 'PAR', 'MAR'] + word, mover_type = order.split(), 'AF'[unit[0] == 'A'] + if word[1] != mover_type: + word[1:1] = [mover_type] + mover = '%s %s' % (mover_type, word[2]) + if self._unit_owner(mover): + convoyer = may_convoy.get(mover, []) + offset = 1 if self.command.get(mover, '').split()[-1] == 'VIA' else 0 + mover_dest = self.command.get(mover, '').split()[-1 - offset] + if unit[2:] not in convoyer or word[-1] != mover_dest: + self.result[unit] += ['void'] + else: + self.command[unit] = 'H' + + # ----------------------------------------------------------- + # STEP 2. CANCEL INCONSISTENT SUPPORT ORDERS AND COUNT OTHERS + for unit, order in self.command.items(): + if order[0] != 'S': + continue + word, signal = order.split(), 0 + + # Remove any trailing "H" from a support-in-place order. + if word[-1] == 'H': + del word[-1] + self.command[unit] = ' '.join(word) + + # Stick the proper unit type (A or F) into the order; + # All supports will have it from here on + where = 1 + (word[1] in 'AF') + guy = self._occupant(word[where]) + + # See if there is a unit to receive the support + if not guy: + self.command[unit] = 'H' + if not signal: + self.result[unit] += ['void'] + continue + word[1:where + 1] = guy.split() + self.command[unit] = ' '.join(word) + + # See if the unit's order matches the supported order + if signal: + continue + coord = self.command[guy].split() + + # 1) Void if support is for hold and guy is moving + # 2) Void if support is for move and guy isn't going where support is given + # 3) Void if support is give, but move over convoy failed + offset = 1 if coord[-1] == 'VIA' else 0 + if ((len(word) < 5 and coord[0] == '-') + or (len(word) > 4 and (coord[0], coord[-1 - offset]) != ('-', word[4])) + or 'no convoy' in self.result[guy]): + self.result[unit] += ['void'] + continue + + # Okay, the support is valid + self.supports[guy][0] += 1 + + # If the unit is owned by the owner of the piece being attacked, add the unit to those + # whose supports are not counted toward dislodgment. + if coord[0] != '-': + continue + owner = self._unit_owner(unit) + other = self._unit_owner(self._occupant(coord[-1], any_coast=1)) + if owner is other: + self.supports[guy][1] += [unit] + + # ----------------------------------------------------------- + # STEP 3. LET DIRECT (NON-CONVOYED) ATTACKS CUT SUPPORTS + for unit in self.command: + if not self.result[unit]: + self._cut_support(unit, direct=1) + + # ----------------------------------------------------------- + # STEPS 4 AND 5. DETERMINE CONVOY DISRUPTIONS + cut, cutters = 1, [] + while cut: + cut = 0 + self._strengths() + + # STEP 4. CUT SUPPORTS MADE BY (non-maybe) CONVOYED ATTACKS + self._check_disruptions(may_convoy, 'maybe') + for unit in may_convoy: + if self.result[unit] or unit in cutters: + continue + self._cut_support(unit) + cutters += [unit] + cut = 1 + if cut: + continue + + # STEP 5. LOCATE NOW-DEFINITE CONVOY DISRUPTIONS, VOID SUPPORTS + # THESE CONVOYERS WERE GIVEN, AND ALLOW CONVOYING UNITS TO CUT SUPPORT + self._check_disruptions(may_convoy, 'no convoy', 'disrupted') + for unit in may_convoy: + if 'no convoy' in self.result[unit]: + for sup, help_unit in self.command.items(): + if not (help_unit.find('S %s' % unit) or self.result[sup]): + self.result[sup] = ['no convoy'] + if not (help_unit.find('C %s' % unit) or self.result[sup]): + self.result[sup] = ['no convoy'] + self.supports[unit] = [0, []] + elif 'maybe' in self.result[unit] and unit not in cutters: + self.result[unit], cut = [], 1 + self._cut_support(unit) + cutters += [unit] + + # Recalculate strengths now that some are reduced by cuts + self._strengths() + + # Mark bounces, then dislodges, and if any dislodges caused a cut + # loop over this whole kaboodle again + self.dislodged, cut = {}, 1 + while cut: # pylint: disable=too-many-nested-blocks + # ----------------------------------------------------------- + # STEPS 6-8. MARK BOUNCERS + self._bounce() + + # STEP 9. MARK SUPPORTS CUT BY DISLODGES + cut = 0 + for unit, order in self.command.items(): + if order[0] != '-' or self.result[unit]: + continue + attack_order = order.split() + offset = 1 if attack_order[-1] == 'VIA' else 0 + victim = self._occupant(attack_order[-1 - offset], any_coast=1) + if victim and self.command[victim][0] == 'S' and not self.result[victim]: + word = self.command[victim].split() + supported, sup_site = self._occupant(word[2]), word[-1][:3] + + # This next line is the key. Convoyed attacks can dislodge, but even when doing so, they cannot cut + # supports offered for or against a convoying fleet + # (They can cut supports directed against the original position of the army, though.) + if len(attack_order) > 2 and sup_site != unit[2:5]: + continue + self.result[victim] += ['cut'] + cut = 1 + for sups in self.combat.get(sup_site, {}): + for guy, no_help in self.combat[sup_site][sups]: + if guy != supported: + continue + self.combat[sup_site][sups].remove([guy, no_help]) + if not self.combat[sup_site][sups]: + del self.combat[sup_site][sups] + sups -= 1 + if victim in no_help: + no_help.remove(victim) + self.combat[sup_site].setdefault(sups, []).append([guy, no_help]) + break + else: + continue + break + + # ----------------------------------------------------------- + # STEP 10. MARK DISLODGEMENTS AND UNBOUNCE ALL MOVES THAT LEAD TO DISLODGING UNITS + for unit, order in self.command.items(): + if order[0] != '-' or self.result[unit]: + continue + site = unit[2:5] + offset = 1 if order.split()[-1] == 'VIA' else 0 + loser = self._occupant(order.split()[-1 - offset], any_coast=1) + if loser and (self.command[loser][0] != '-' or self.result[loser]): + self.result[loser] = [res for res in self.result[loser] if res != 'disrupted'] + ['dislodged'] + self.dislodged[loser] = site + + # Check for a dislodged swapper (attacker and dislodged units must not be convoyed.) + # If found, remove the swapper from the combat list of the attacker's space + head_to_head_battle = not self._is_moving_via_convoy(unit) and not self._is_moving_via_convoy(loser) + if self.command[loser][2:5] == site and head_to_head_battle: + for sups, items in self.combat.get(site, {}).items(): + item = [x for x in items if x[0] == loser] + if item: + self._no_effect(item[0], site) + break + + # Marking support for self-dislodgement as void + for supporting_unit in self.supports[unit][1]: + self.result[supporting_unit] += ['void'] + + # Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger. + if site in self.combat: + self._unbounce(site) + + # Done :-) + + def _move_results(self): + """ Resolves moves (Movement phase) and returns a list of messages explaining what happened + :return: A list of lines for the results file explaining what happened during the phase + """ + # Resolving moves + self._resolve_moves() + + # Determine any retreats + for power in self.powers.values(): + for unit in [u for u in power.units if u in self.dislodged]: + if unit not in power.retreats: + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True) + power.retreats.setdefault(unit, []) + attacker_site, site = self.dislodged[unit], unit[2:] + attacker = self._occupant(attacker_site) + if self.map.loc_abut.get(site): + pushee = site + else: + pushee = site.lower() + for abut in self.map.loc_abut[pushee]: + abut = abut.upper() + where = abut[:3] + if ((self._abuts(unit[0], site, '-', abut) or self._abuts(unit[0], site, '-', where)) + and (not self.combat.get(where) + and where != attacker_site or self._is_moving_via_convoy(attacker))): + + # Armies cannot retreat to specific coasts + if unit[0] == 'F': + power.retreats[unit] += [abut] + elif where not in power.retreats[unit]: + power.retreats[unit] += [where] + + # List all possible retreats + destroyed, self.popped = {}, [] + if self.dislodged: + for power in self.powers.values(): + for unit in [u for u in power.units if u in self.dislodged]: + + # Removing unit + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:]) + power.units.remove(unit) + to_where = power.retreats.get(unit) + + # Describing what it can do + if to_where: + pass + else: + destroyed[unit] = power + self.popped += [unit] + + # Now (finally) actually move the units that succeeded in moving + for power in self.powers.values(): + for unit in power.units[:]: + if self.command[unit][0] == '-' and not self.result[unit]: + offset = 1 if self.command[unit].split()[-1] == 'VIA' else 0 + + # Removing + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:]) + power.units.remove(unit) + + # Adding + new_unit = unit[:2] + self.command[unit].split()[-1 - offset] + self.update_hash(power.name, unit_type=new_unit[0], loc=new_unit[2:]) + power.units += [new_unit] + + # Setting influence + for influence_power in self.powers.values(): + if new_unit[2:5] in influence_power.influence: + influence_power.influence.remove(new_unit[2:5]) + power.influence.append(new_unit[2:5]) + + # If units were destroyed, other units may go out of sight + if destroyed: + for unit, power in destroyed.items(): + if unit in power.retreats: + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True) + del power.retreats[unit] + + # All finished + self._post_move_update() + return [] + + def _other_results(self): + """ Resolves moves (Retreat and Adjustment phase) and returns a list of messages explaining what happened + :return: A list of lines for the results file explaining what happened during the phase + """ + # pylint: disable=too-many-statements,too-many-branches,too-many-nested-blocks + self.command = {} + self.ordered_units = {} + conflicts = {} + + # Adjustments + if self.phase_type == 'A': + self.result = {} + + # Emptying the results for the Adjustments Phase + for power in self.powers.values(): + self.ordered_units.setdefault(power.name, []) + for order in power.adjust[:]: + + # Void order - Marking it as such in results + if order.split()[0] == 'VOID': + word = order.split()[1:] + unit = ' '.join(word[:2]) + self.result.setdefault(unit, []).append('void') + power.adjust.remove(order) + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + + # Valid order - Marking as unprocessed + else: + word = order.split() + unit = ' '.join(word[:2]) + self.result.setdefault(unit, []) + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + + # CIVIL DISORDER + for power in self.powers.values(): + diff = len(power.units) - len(power.centers) + + # Detecting missing orders + for order in power.adjust[:]: + if diff == 0: + word = order.split() + unit = ' '.join(word[:2]) + self.result.setdefault(unit, []).append('void') + power.adjust.remove(order) + + # Looking for builds + elif diff < 0: + word = order.split() + unit = ' '.join(word[:2]) + if word[-1] == 'B': + diff += 1 + else: + self.result.setdefault(unit, []).append('void') + power.adjust.remove(order) + + # Looking for removes + else: + word = order.split() + unit = ' '.join(word[:2]) + if word[-1] == 'D': + diff -= 1 + else: + self.result.setdefault(unit, []).append('void') + power.adjust.remove(order) + + if not diff: + continue + + power.civil_disorder = 1 + + # Need to remove units + if diff > 0: + fleets = PriorityDict() + armies = PriorityDict() + + # Calculating distance to home + for unit in power.units: + distance = self._get_distance_to_home(unit[0], unit[2:], power.homes) + if unit[0] == 'F': + fleets[unit] = -1 * distance + else: + armies[unit] = -1 * distance + + # Removing units + for unit in range(diff): + goner_distance, goner = 99999, None + + # Removing units with largest distance (using fleets if they are equal) + # (using alpha name if multiple units) + if fleets: + goner_distance, goner = fleets.smallest() + if armies and armies.smallest()[0] < goner_distance: + goner_distance, goner = armies.smallest() + if goner is None: + break + if goner[0] == 'F': + del fleets[goner] + else: + del armies[goner] + power.adjust += ['%s D' % goner] + self.result.setdefault(goner, []) + + # Need to build units + else: + sites = self._build_sites(power) + need = min(self._build_limit(power, sites), -diff) + power.adjust += ['WAIVE'] * need + + # Retreats phase + elif self.phase_type == 'R': + self.result = {} + + # Emptying the results for the Retreats Phase + for power in self.powers.values(): + self.ordered_units.setdefault(power.name, []) + for retreats in power.retreats: + self.result[retreats] = [] + + # Emptying void orders - And marking them as such + for power in self.powers.values(): + for order in power.adjust[:]: + if order.split()[0] == 'VOID': + word = order.split()[1:] + unit = ' '.join(word[:2]) + self.result[unit] = ['void'] + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + power.adjust.remove(order) + + # Disband units with no retreats + for power in self.powers.values(): + if power.retreats and not power.adjust: + power.civil_disorder = 1 + power.adjust = ['%s D' % r_unit for r_unit in power.retreats] + + # Determine multiple retreats to the same location. + for power in self.powers.values(): + for order in power.adjust or []: + word = order.split() + if len(word) == 4: + conflicts.setdefault(word[3][:3], []).append(' '.join(word[:2])) + + # Determine retreat conflict (*bounce, destroyed*) + # When finished, "self.popped" will be a list of all retreaters who didn't make it. + for retreaters in conflicts.values(): + if len(retreaters) > 1: + for retreater in retreaters: + if 'void' in self.result[retreater]: + self.result[retreater].remove('void') + self.result[retreater] += ['bounce', 'disband'] + self.popped += retreaters + + # Processing Build and Disband + for power in self.powers.values(): + diff = len(power.units) - len(power.centers) + + # For each order + for order in power.adjust or []: + word = order.split() + unit = ' '.join(word[:2]) + + # Build + if word[-1] == 'B' and len(word) > 2: + if diff < 0: + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:]) + power.units += [' '.join(word[:2])] + diff += 1 + self.result[unit] += [''] + else: + self.result[unit] += ['void'] + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + + # Disband + elif word[-1] == 'D' and self.phase_type == 'A': + if diff > 0 and ' '.join(word[:2]) in power.units: + self.update_hash(power.name, unit_type=unit[0], loc=unit[2:]) + power.units.remove(' '.join(word[:2])) + diff -= 1 + self.result[unit] += [''] + else: + self.result[unit] += ['void'] + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + + # Retreat + elif len(word) == 4: + if unit not in self.popped: + self.update_hash(power.name, unit_type=word[0], loc=word[-1]) + power.units += [word[0] + ' ' + word[-1]] + if unit in self.dislodged: + del self.dislodged[unit] + + # Update influence + for influence_power in self.powers.values(): + if word[-1] in influence_power.influence: + influence_power.influence.remove(word[-1]) + power.influence.append(word[-1]) + + if unit not in self.ordered_units[power.name]: + self.ordered_units[power.name] += [unit] + + for dis_unit in power.retreats: + self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True) + power.adjust, power.retreats, power.civil_disorder = [], {}, 0 + + # Disbanding + for unit in [u for u in self.dislodged]: + self.result.setdefault(unit, []) + if 'disband' not in self.result[unit]: + self.result[unit] += ['disband'] + del self.dislodged[unit] + if unit not in self.popped: + self.popped += [unit] + + return [] + + def _resolve(self): + """ Resolve the current phase + :return: A list of strings for the results file explaining how the phase was resolved. + """ + this_phase = self.phase_type + + # This method knows how to process movement, retreat, and adjustment phases. + # For others, implement resolve_phase() + if this_phase == 'M': + self._move_results() + elif this_phase in 'RA': + self._other_results() + self._advance_phase() + + def _clear_history(self): + """ Clear all game history fields. """ + self.state_history.clear() + self.order_history.clear() + self.result_history.clear() + self.message_history.clear() + self.clear_orders() + self.clear_vote() diff --git a/diplomacy/engine/map.py b/diplomacy/engine/map.py new file mode 100644 index 0000000..677a4e7 --- /dev/null +++ b/diplomacy/engine/map.py @@ -0,0 +1,1361 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +# -*- coding: utf-8 -*- +# pylint: disable=too-many-lines +""" Map + - Contains the map object which represents a map where the game can be played +""" +from copy import deepcopy +import os +from diplomacy import settings +from diplomacy.utils import KEYWORDS, ALIASES +import diplomacy.utils.errors as err + +# Constants +UNDETERMINED, POWER, UNIT, LOCATION, COAST, ORDER, MOVE_SEP, OTHER = 0, 1, 2, 3, 4, 5, 6, 7 +MAP_CACHE = {} + + +class Map(): + """ MAP Class + + Properties: + - abbrev: Contains the power abbreviation, otherwise defaults to first letter of PowerName + e.g. {'ENGLISH': 'E'} + - abuts_cache: Contains a cache of abuts for ['A,'F'] between all locations for orders ['S', 'C', '-'] + e.g. {(A, PAR, -, MAR): 1, ...} + - aliases: Contains a dict of all the aliases (e.g. full province name to 3 char) + e.g. {'EAST': 'EAS', 'STP ( /SC )': 'STP/SC', 'FRENCH': 'FRANCE', 'BUDAPEST': 'BUD', 'NOR': 'NWY', ... } + - centers: Contains a dict of currently owned supply centers for each player + e.g. {'RUSSIA': ['MOS', 'SEV', 'STP', 'WAR'], 'FRANCE': ['BRE', 'MAR', 'PAR'], ... } + - convoy_paths: Contains a list of all possible convoys paths bucketed by number of fleets + format: {nb of fleets: [(START_LOC, {FLEET LOC}, {DEST LOCS})]} + - dummies: Indicates the list of powers that are dummies + e.g. ['FRANCE', 'ITALY'] + - error: Contains a list of errors that the map generated + e.g. [''DUPLICATE MAP ALIAS OR POWER: JAPAN'] + - files: Contains a list of files that were loaded (e.g. USES keyword) + e.g. ['standard.map', 'standard.politics', 'standard.geography', 'standard.military'] + - first_year: Indicates the year where the game is starting. + e.g. 1901 + - flow: List that contains the seasons with the phases + e.g. ['SPRING:MOVEMENT,RETREATS', 'FALL:MOVEMENT,RETREATS', 'WINTER:ADJUSTMENTS'] + - flow_sign: Indicate the direction of flow (1 is positive, -1 is negative) + e.g. 1 + - homes: Contains the list of supply centers where units can be built (i.e. assigned at the beginning) + e.g. {'RUSSIA': ['MOS', 'SEV', 'STP', 'WAR'], 'FRANCE': ['BRE', 'MAR', 'PAR'], ... } + - inhabits: List that indicates which power have a INHABITS, HOME, or HOMES line + e.g. ['FRANCE'] + - keywords: Contains a dict of keywords to parse status files and orders + e.g. {'BUILDS': 'B', '>': '', 'SC': '/SC', 'REMOVING': 'D', 'WAIVED': 'V', 'ATTACK': '', ... } + - loc_abut: Contains a adjacency list for each province + e.g. {'LVP': ['CLY', 'edi', 'IRI', 'NAO', 'WAL', 'yor'], ...} + - loc_coasts: Contains a mapping of all coasts for every location + e.g. {'PAR': ['PAR'], 'BUL': ['BUL', 'BUL/EC', 'BUL/SC'], ... } + - loc_name: Dict that indicates the 3 letter name of each location + e.g. {'GULF OF LYON': 'LYO', 'BREST': 'BRE', 'BUDAPEST': 'BUD', 'RUHR': 'RUH', ... } + - loc_type: Dict that indicates if each location is 'WATER', 'COAST', 'LAND', or 'PORT' + e.g. {'MAO': 'WATER', 'SER': 'LAND', 'SYR': 'COAST', 'MOS': 'LAND', 'VEN': 'COAST', ... } + - locs: List of 3 letter locations (With coasts) + e.g. ['ADR', 'AEG', 'ALB', 'ANK', 'APU', 'ARM', 'BAL', 'BAR', 'BEL', 'BER', ... ] + - name: Name of the map + e.g. 'standard' + - own_word: Dict to indicate the word used to refer to people living in each power's country + e.g. {'RUSSIA': 'RUSSIAN', 'FRANCE': 'FRENCH', 'UNOWNED': 'UNOWNED', 'TURKEY': 'TURKISH', ... } + - owns: List that indicates which power have a OWNS or CENTERS line + e.g. ['FRANCE'] + - phase: String to indicate the beginning phase of the map + e.g. 'SPRING 1901 MOVEMENT' + - phase_abbrev: Dict to indicate the 1 letter abbreviation for each phase + e.g. {'A': 'ADJUSTMENTS', 'M': 'MOVEMENT', 'R': 'RETREATS'} + - pow_name: Dict to indicate the power's name + e.g. {'RUSSIA': 'RUSSIA', 'FRANCE': 'FRANCE', 'TURKEY': 'TURKEY', 'GERMANY': 'GERMANY', ... } + - powers: Contains the list of powers (players) in the game + e.g. ['AUSTRIA', 'ENGLAND', 'FRANCE', 'GERMANY', 'ITALY', 'RUSSIA', 'TURKEY'] + - root_map: Contains the name of the original map file loaded (before the USES keyword are applied) + A map that is called with MAP is the root_map + e.g. 'standard' + - rules: Contains a list of rules used by all variants (for display only) + e.g. ['RULE_1'] + - scs: Contains a list of all the supply centers in the game + e.g. ['MOS', 'SEV', 'STP', 'WAR', 'BRE', 'MAR', 'PAR', 'BEL', 'BUL', 'DEN', 'GRE', 'HOL', 'NWY', ... ] + - seq: []Â Contains the sequence of seasons in format 'SEASON_NAME SEASON_TYPE' + e.g. ['NEWYEAR', 'SPRING MOVEMENT', 'SPRING RETREATS', 'FALL MOVEMENT', 'FALL RETREATS', + 'WINTER ADJUSTMENTS'] + - unclear: Contains the alias for ambiguous places + e.g. {'EAST': 'EAS'} + - unit_names: {} Contains a dict of the unit names + e.g. {'F': 'FLEET', 'A': 'ARMY'} + - units: Dict that contains the current position of each unit by power + e.g. {'FRANCE': ['F BRE', 'A MAR', 'A PAR'], 'RUSSIA': ['A WAR', 'A MOS', 'F SEV', 'F STP/SC'], ... } + - validated: Boolean to indicate if the map file has been validated + e.g. 1 + - victory: Indicates the number of supply centers to win the game (>50% required if None) + e.g. 18 + """ + # pylint: disable=too-many-instance-attributes + + __slots__ = ['name', 'first_year', 'victory', 'phase', 'validated', 'flow_sign', 'root_map', 'abuts_cache', + 'homes', 'loc_name', 'loc_type', 'loc_abut', 'loc_coasts', 'own_word', 'abbrev', 'centers', 'units', + 'pow_name', 'rules', 'files', 'powers', 'scs', 'owns', 'inhabits', 'flow', 'dummies', 'locs', 'error', + 'seq', 'phase_abbrev', 'unclear', 'unit_names', 'keywords', 'aliases', 'convoy_paths'] + + def __new__(cls, name='standard', use_cache=True): + """ New function - Retrieving object from cache if possible + :param name: Name of the map to load + :param use_cache: Boolean flag to indicate we want a blank object that doesn't use cache + """ + if name in MAP_CACHE and use_cache: + return MAP_CACHE[name] + return object.__new__(cls) + + def __init__(self, name='standard', use_cache=True): + """ Constructor function + :param name: Name of the map to load + :param use_cache: Boolean flag to indicate we want a blank object that doesn't use cache + """ + if name in MAP_CACHE: + return + self.name = name + self.first_year = 1901 + self.victory = self.phase = self.validated = self.flow_sign = None + self.root_map = None + self.abuts_cache = {} + self.homes, self.loc_name, self.loc_type, self.loc_abut, self.loc_coasts = {}, {}, {}, {}, {} + self.own_word, self.abbrev, self.centers, self.units, self.pow_name = {}, {}, {}, {}, {} + self.rules, self.files, self.powers, self.scs, self.owns, self.inhabits = [], [], [], [], [], [] + self.flow, self.dummies, self.locs = [], [], [] + self.error, self.seq = [], [] + self.phase_abbrev, self.unclear = {}, {} + self.unit_names = {'A': 'ARMY', 'F': 'FLEET'} + self.keywords, self.aliases = KEYWORDS.copy(), ALIASES.copy() + self.load() + self.build_cache() + self.validate() + if name not in CONVOYS_PATH_CACHE and use_cache: + CONVOYS_PATH_CACHE[name] = add_to_cache(name) + self.convoy_paths = CONVOYS_PATH_CACHE.get(name, {}) + if use_cache: + MAP_CACHE[name] = self + + def __deepcopy__(self, memo): + """ Fast deep copy implementation """ + actual_init = self.__class__.__init__ + self.__class__.__init__ = lambda *args, **kwargs: None + instance = self.__class__(name=self.name, use_cache=False) + self.__class__.__init__ = actual_init + for key in self.__slots__: + setattr(instance, key, deepcopy(getattr(self, key))) + return instance + + def __str__(self): + return self.name + + def validate(self, force=0): + """ Validate that the configuration from a map file is correct + :param force: Indicate that we want to force a validation, even if the map is already validated + :return: Nothing + """ + # pylint: disable=too-many-branches + # Already validated, returning (except if forced or if validating phases) + if not force and self.validated: + return + self.validated = 1 + + # Root map + self.root_map = self.root_map or self.name + + # Validating powers + self.powers = [power_name for power_name in self.homes if power_name != 'UNOWNED'] + self.powers.sort() + if len(self.powers) < 2: + self.error += [err.MAP_LEAST_TWO_POWERS] + + # Validating area type + for place in self.loc_name.values(): + if place.upper() not in self.powers and not self.area_type(place): + self.error += [err.MAP_LOC_NOT_FOUND % place] + + # Validating adjacencies + for place, abuts in self.loc_abut.items(): + up_abuts = [loc.upper() for loc in abuts] + for abut in abuts: + up_abut = abut.upper() + if up_abuts.count(up_abut) > 1: + self.error += [err.MAP_SITE_ABUTS_TWICE % (place.upper(), up_abut)] + while up_abut in up_abuts: + up_abuts.remove(up_abut) + + # Checking full name + if place.upper() not in self.loc_name.values(): + self.error += [err.MAP_NO_FULL_NAME % place] + + # Checking one-way adjacency + for loc in abuts: + if self.area_type(place) != 'SHUT' \ + and self.area_type(loc) != 'SHUT' \ + and not self.abuts('A', loc, '-', place) \ + and not self.abuts('F', loc, '-', place): + self.error.append(err.MAP_ONE_WAY_ADJ % (place, loc)) + + # Validating home centers + for power_name, places in self.homes.items(): + for site in places: + # Adding home as supply center + if site not in self.scs: + self.scs += [site] + if not self.area_type(site): + self.error += [err.MAP_BAD_HOME % (power_name, site)] + + # Remove home centers from unowned list. + # It's perfectly OK for 2 powers to share a home center, as long + # as no more than one owns it at the same time. + if power_name != 'UNOWNED': + if site in self.homes['UNOWNED']: + self.homes['UNOWNED'].remove(site) + + # Valid supply centers + for scs in self.centers.values(): + self.scs.extend([center for center in scs if center not in self.scs]) + + # Validating initial centers and units + for power_name, places in self.centers.items(): + for loc in places: + if not self.area_type(loc): + self.error.append(err.MAP_BAD_INITIAL_OWN_CENTER % (power_name, loc)) + + # Checking if power has OWN line + for power_name in self.powers: + if power_name not in self.owns: + self.centers[power_name] = self.homes[power_name][:] + for unit in self.units.get(power_name, []): + if not self.is_valid_unit(unit): + self.error.append(err.MAP_BAD_INITIAL_UNITS % (power_name, unit)) + + # Checking for multiple owners + for power_name, centers in self.centers.items(): + for site in centers: + for other, locs in self.centers.items(): + if other == power_name and locs.count(site) != 1: + self.error += [err.MAP_CENTER_MULT_OWNED % site] + elif other != power_name and locs.count(site) != 0: + self.error += [err.MAP_CENTER_MULT_OWNED % site] + if 'UNOWNED' in self.homes: + del self.homes['UNOWNED'] + + # Ensure a default game-year FLOW + self.flow = ['SPRING:MOVEMENT,RETREATS', 'FALL:MOVEMENT,RETREATS', 'WINTER:ADJUSTMENTS'] + self.flow_sign = 1 + self.seq = ['NEWYEAR', 'SPRING MOVEMENT', 'SPRING RETREATS', 'FALL MOVEMENT', 'FALL RETREATS', + 'WINTER ADJUSTMENTS'] + self.phase_abbrev = {'M': 'MOVEMENT', 'R': 'RETREATS', 'A': 'ADJUSTMENTS'} + + # Validating initial game phase + self.phase = self.phase or 'SPRING 1901 MOVEMENT' + phase = self.phase.split() + if len(phase) != 3: + self.error += [err.MAP_BAD_PHASE % self.phase] + else: + self.first_year = int(phase[1]) + + def load(self, file_name=None): + """ Loads a map file from disk + :param file_name: Optional. A string representing the file to open. Otherwise, defaults to the map name + :return: Nothing + """ + # pylint: disable=too-many-nested-blocks,too-many-statements,too-many-branches + # If file_name is string, opening file from disk + # Otherwise file_name is the file handler + power = 0 + if file_name is None: + file_name = '{}.map'.format(self.name) + file_path = os.path.join(settings.PACKAGE_DIR, 'maps', file_name) + + # Checking if file exists: + found_map = 1 if os.path.exists(file_path) else 0 + if not found_map: + self.error.append(err.MAP_FILE_NOT_FOUND % file_name) + return + + # Adding to parsed files + self.files += [file_name] + + # Parsing file + with open(file_path, encoding='utf-8') as file: + variant = 0 + + for line in file: + word = line.split() + + # -- # comment... + if not word or word[0][0] == '#': + continue + upword = word[0].upper() + + # ---------------------------------- + # Centers needed to obtain a VICTORY + # -- VICTORY centerCount... + if upword == 'VICTORY': + try: + self.victory = [int(word) for word in word[1:]] + except ValueError: + self.error += [err.MAP_BAD_VICTORY_LINE] + + # --------------------------------- + # Inclusion of other base map files + # -- USE[S] fileName... + # -- MAP mapName + elif upword in ('USE', 'USES', 'MAP'): + if upword == 'MAP': + if len(word) != 2: + self.error += [err.MAP_BAD_ROOT_MAP_LINE] + elif self.root_map: + self.error += [err.MAP_TWO_ROOT_MAPS] + else: + self.root_map = word[1].split('.')[0] + for new_file in word[1:]: + if '.' not in new_file: + new_file = '{}.map'.format(new_file) + if new_file not in self.files: + self.load(new_file) + else: + self.error += [err.MAP_FILE_MULT_USED % new_file] + + # ------------------------------------ + # Set BEGIN phase + # -- BEGIN season year phaseType + elif upword == 'BEGIN': + self.phase = ' '.join(word[1:]).upper() + + # ------------------------------------ + # RULEs specific to this map + elif upword in ('RULE', 'RULES'): + if (variant or 'ALL') == 'ALL': + self.rules += line.upper().split()[1:] + + # ------------------------------------ + # -- [oldAbbrev ->] placeName = abbreviation alias... + elif '=' in line: + token = line.upper().split('=') + if len(token) == 1: + self.error += [err.MAP_BAD_ALIASES_IN_FILE % token[0]] + token += [''] + old_name, name, word = 0, token[0].strip(), token[1].split() + parts = [part.strip() for part in name.split('->')] + if len(parts) == 2: + old_name, name = parts + elif len(parts) > 2: + self.error += [err.MAP_BAD_RENAME_DIRECTIVE % name] + if not (word[0][0] + word[0][-1]).isalnum() or word[0] != self.norm(word[0]).replace(' ', ''): + self.error += [err.MAP_INVALID_LOC_ABBREV % word[0]] + + # Rename no longer supported + # Making sure place not already there + if old_name: + self.error += [err.MAP_RENAME_NOT_SUPPORTED] + if name in self.keywords: + self.error += [err.MAP_LOC_RESERVED_KEYWORD % name] + normed = name + else: + normed = self.norm(name) + if name in self.loc_name or normed in self.aliases: + self.error += [err.MAP_DUP_LOC_OR_POWER % name] + self.loc_name[name] = self.aliases[normed] = word[0] + + # Ambiguous place names end with a ? + for alias in word[1:]: + unclear = alias[-1] == '?' + # For ambiguous place names, let's do just a minimal normalization + # otherwise they might become unrecognizable (e.g. "THE") + normed = alias[:-1].replace('+', ' ').upper() if unclear else self.norm(alias) + if unclear: + self.unclear[normed] = word[0] + elif normed in self.aliases: + if self.aliases[normed] != word[0]: + self.error += [err.MAP_DUP_ALIAS_OR_POWER % alias] + else: + self.aliases[normed] = word[0] + + # ------------------------------------ + # Center ownership (!= Home Ownership) + # -- OWNS center... + # -- CENTERS [center...] + elif upword in ('OWNS', 'CENTERS'): + if not power: + self.error += [err.MAP_OWNS_BEFORE_POWER % (upword, ' '.join(word))] + else: + if power not in self.owns: + self.owns.append(power) + # CENTERS resets the list of centers, OWNS only appends + if upword[0] == 'C' or power not in self.centers: + self.centers[power] = line.upper().split()[1:] + else: + self.centers[power].extend( + [center for center in line.upper().split()[1:] if center not in self.centers[power]]) + + # ------------------------------------ + # Home centers, overriding those from the power declaration line + # -- INHABITS center... + elif upword == 'INHABITS': + if not power: + self.error += [err.MAP_INHABITS_BEFORE_POWER % ' '.join(word)] + else: + reinit = power not in self.inhabits + if reinit: + self.inhabits.append(power) + self.add_homes(power, word[1:], reinit) + + # -- HOME(S) [center...] + elif upword in ('HOME', 'HOMES'): + if not power: + self.error += [err.MAP_HOME_BEFORE_POWER % (upword, ' '.join(word))] + else: + if power not in self.inhabits: + self.inhabits.append(power) + self.add_homes(power, word[1:], 1) + + # ------------------------------------ + # Clear known units for a power + # -- UNITS + elif upword == 'UNITS': + if power: + self.units[power] = [] + else: + self.error += [err.MAP_UNITS_BEFORE_POWER] + + # ------------------------------------ + # Unit Designation (A or F) + # -- unit location + elif upword in ('A', 'F'): + unit = ' '.join(word).upper() + if not power: + self.error += [err.MAP_UNIT_BEFORE_POWER] + elif len(word) == 2: + for units in self.units.values(): + for current_unit in units: + if current_unit[2:5] == unit[2:5]: + units.remove(current_unit) + self.units.setdefault(power, []).append(unit) + else: + self.error += [err.MAP_INVALID_UNIT % unit] + + # ------------------------------------ + # Dummies + # -- DUMMY [ALL] -or- + # -- DUMMY [ALL EXCEPT] powerName... -or- + # -- DUMMIES ALL -or- + # -- DUMMIES [ALL EXCEPT] powerName... + elif upword in ('DUMMY', 'DUMMIES'): + if len(word) > 1: + power = None + # DUMMY + if len(word) == 1: + if upword == 'DUMMIES': + self.error += [err.MAP_DUMMY_REQ_LIST_POWERS] + elif not power: + self.error += [err.MAP_DUMMY_BEFORE_POWER] + elif power not in self.dummies: + self.dummies += [power] + # DUMMY powerName powerName + elif word[1].upper() != 'ALL': + self.dummies.extend( + [dummy for dummy in [self.norm_power(p_name) for p_name in word[1:]] + if dummy not in self.dummies]) + # DUMMY ALL + elif len(word) == 2: + self.dummies = [power_name for power_name in self.homes if power_name != 'UNOWNED'] + # DUMMY ALL powerName + elif word[2].upper() != 'EXCEPT': + self.error += [err.MAP_NO_EXCEPT_AFTER_DUMMY_ALL % upword] + # DUMMY ALL EXCEPT + elif len(word) == 3: + self.error += [err.MAP_NO_POWER_AFTER_DUMMY_ALL_EXCEPT % upword] + # DUMMY ALL EXCEPT powerName powerName + else: + self.dummies = [power_name for power_name in self.homes if power_name not in + (['UNOWNED'] + [self.norm_power(except_pow) for except_pow in word[3:]])] + + # ------------------------------------ + # -- DROP abbreviation... + elif upword == 'DROP': + for place in [loc.upper() for loc in word[1:]]: + self.drop(place) + + # ------------------------------------ + # Terrain type and adjacencies (with special adjacency rules) + # -- COAST abbreviation [ABUTS [abut...]] -or- + # -- LAND abbreviation [ABUTS [abut...]] -or- + # -- WATER abbreviation [ABUTS [abut...]] -or- + # -- PORT abbreviation [ABUTS [abut...]] -or- + # -- SHUT abbreviation [ABUTS [abut...]] -or- + # -- AMEND abbreviation [ABUTS [abut...]] + # -- - removes an abut + elif len(word) > 1 and upword in ('AMEND', 'WATER', 'LAND', 'COAST', 'PORT', 'SHUT'): + place, other = word[1], word[1].swapcase() + + # Removing the place and all its coasts + if other in self.locs: + self.locs.remove(other) + if upword == 'AMEND': + self.loc_type[place] = self.loc_type[other] + self.loc_abut[place] = self.loc_abut[other] + del self.loc_type[other] + del self.loc_abut[other] + if place.isupper(): + for loc in self.locs: + if loc.startswith(place): + self.drop(loc) + if place in self.locs: + self.locs.remove(place) + + # Re-adding the place and its type + self.locs += [place] + if upword != 'AMEND': + self.loc_type[place] = word[0] + if len(word) > 2: + self.loc_abut[place] = [] + elif place not in self.loc_type: + self.error += [err.MAP_NO_DATA_TO_AMEND_FOR % place] + if len(word) > 2 and word[2].upper() != 'ABUTS': + self.error += [err.MAP_NO_ABUTS_FOR % place] + + # Processing ABUTS (adjacencies) + for dest in word[3:]: + + # Removing abuts if they start with - + if dest[0] == '-': + for site in self.loc_abut[place][:]: + if site.upper().startswith(dest[1:].upper()): + self.loc_abut[place].remove(site) + continue + + # Now add the adjacency + self.loc_abut[place] += [dest] + + # ------------------------------------ + # Removal of an existing power + # -- UNPLAYED [ALL] -or- + # -- UNPLAYED [ALL EXCEPT] powerName... + elif upword == 'UNPLAYED': + goners = [] + # UNPLAYED powerName + if len(word) == 1: + if not power: + self.error += [err.MAP_UNPLAYED_BEFORE_POWER] + else: + goners = [power] + # UNPLAYED powerName powerName + elif word[1].upper() != 'ALL': + goners = [self.norm_power(power_name) for power_name in word[1:]] + # UNPLAYED ALL + elif len(word) == 2: + goners = [power_name for power_name in self.homes if power_name != 'UNOWNED'] + # UNPLAYED ALL playerName + elif word[2].upper() != 'EXCEPT': + self.error += [err.MAP_NO_EXCEPT_AFTER_UNPLAYED_ALL] + # UNPLAYED ALL EXCEPT + elif len(word) == 3: + self.error += [err.MAP_NO_POWER_AFTER_UNPLAYED_ALL_EXCEPT] + # UNPLAYED ALL EXCEPT powerName + else: + goners = [power_name for power_name in self.homes if power_name not in + (['UNOWNED'] + [self.norm_power(pow_except) for pow_except in word[3:]])] + + # Removing each power + for goner in goners: + try: + del self.pow_name[goner] + del self.own_word[goner] + del self.homes[goner] + self.dummies = [x for x in self.dummies if x != goner] + self.inhabits = [x for x in self.inhabits if x != goner] + if goner in self.centers: + del self.centers[goner] + self.owns = [x for x in self.owns if x != goner] + if goner in self.abbrev: + del self.abbrev[goner] + if goner in self.units: + del self.units[goner] + self.powers = [x for x in self.powers if x != goner] + except KeyError: + self.error += [err.MAP_NO_SUCH_POWER_TO_REMOVE % goner] + power = None + + else: + # ------------------------------------ + # Power name, ownership word, and home centers + # -- [oldName ->] powerName [([ownWord][:[abbrev]])] [center...] + # -- UNOWNED [center...] -or- + # -- NEUTRAL [center...] -or- + # -- CENTERS [center...] + if upword in ('NEUTRAL', 'CENTERS'): + upword = 'UNOWNED' + power = self.norm_power(upword) if upword != 'UNOWNED' else 0 + + # Renaming power (Not Supported) + if len(word) > 2 and word[1] == '->': + old_power = power + word = word[2:] + upword = word[0].upper() + if upword in ('NEUTRAL', 'CENTERS'): + upword = 'UNOWNED' + power = self.norm_power(upword) if upword != 'UNOWNED' else 0 + if not old_power or not power: + self.error += [err.MAP_RENAMING_UNOWNED_DIR_NOT_ALLOWED] + elif not self.pow_name.get(old_power): + self.error += [err.MAP_RENAMING_UNDEF_POWER % old_power] + else: + self.error += [err.MAP_RENAMING_POWER_NOT_SUPPORTED] + + # Adding power + if power and not self.pow_name.get(power): + self.pow_name[power] = upword + normed = self.norm(power) + # Add power to aliases even if the normed form is identical. That way + # it becomes part of the vocabulary. + if not normed: + self.error += [err.MAP_POWER_NAME_EMPTY_KEYWORD % power] + normed = power + if normed not in self.aliases: + if len(normed.split('/')[0]) in (1, 3): + self.error += [err.MAP_POWER_NAME_CAN_BE_CONFUSED % normed] + self.aliases[normed] = power + elif self.aliases[normed] != power: + self.error += [err.MAP_DUP_LOC_OR_POWER % normed] + + # Processing own word and abbreviations + upword = power or upword + if power and len(word) > 1 and word[1][0] == '(': + self.own_word[upword] = word[1][1:-1] or power + normed = self.norm(self.own_word[upword]) + if normed == power: + pass + elif normed not in self.aliases: + self.aliases[normed] = power + elif self.aliases[normed] != power: + self.error += [err.MAP_DUP_LOC_OR_POWER % normed] + if ':' in word[1]: + owner, abbrev = self.own_word[upword].split(':') + self.own_word[upword] = owner or power + self.abbrev[upword] = abbrev[:1].upper() + if not abbrev or self.abbrev[upword] in 'M?': + self.error += [err.MAP_ILLEGAL_POWER_ABBREV] + del word[1] + else: + self.own_word.setdefault(upword, upword) + + # Adding homes + reinit = upword in self.inhabits + if reinit: + self.inhabits.remove(upword) + self.add_homes(upword, word[1:], reinit) + + def build_cache(self): + """ Builds a cache to speed up abuts and coasts lookup """ + # Adding all coasts to loc_coasts + for loc in self.locs: + self.loc_coasts[loc.upper()] = \ + [map_loc.upper() for map_loc in self.locs if loc.upper()[:3] == map_loc.upper()[:3]] + + # Building abuts cache + for unit_type in ['A', 'F']: + for unit_loc in self.locs: + for other_loc in self.locs: + for order_type in ['-', 'S', 'C']: + + # Calculating and setting in cache + unit_loc, other_loc = unit_loc.upper(), other_loc.upper() + query_tuple = (unit_type, unit_loc, order_type, other_loc) + self.abuts_cache[query_tuple] = self._abuts(*query_tuple) + + def add_homes(self, power, homes, reinit): + """ Add new homes (and deletes previous homes if reinit) + :param power: Name of power (e.g. ITALY) + :param homes: List of homes e.g. ['BUR', '-POR', '*ITA', ... ] + :param reinit: Indicates that we want to strip the list of homes before adding + :return: Nothing + """ + # Reset homes + if reinit: + self.homes[power] = [] + else: + self.homes.setdefault(power, []) + self.homes.setdefault('UNOWNED', []) + + # For each home: + # '-' indicates we want to remove home + for home in ' '.join(homes).upper().split(): + remove = 0 + while home: + if home[0] == '-': + remove = 1 + else: + break + home = home[1:] + if not home: + continue + + # Removing the home if already there + if home in self.homes[power]: + self.homes[power].remove(home) + if power != 'UNOWNED': + self.homes['UNOWNED'].append(home) + + # Re-adding it + if not remove: + self.homes[power].append(home) + + def drop(self, place): + """ Drop a place + :param place: Name of place to remove + :return: Nothing + """ + # Removing from locs + for loc in list(self.locs): + if loc.upper().startswith(place): + self.locs.remove(loc) + + # Loc_name + for full_name, loc in list(self.loc_name.items()): + if loc.startswith(place): + self.loc_name.pop(full_name) + + # Aliases + for alias, loc in list(self.aliases.items()): + if loc.startswith(place): + self.aliases.pop(alias) + + # Homes + for power_name, power_homes in list(self.homes.items()): + if place in power_homes: + self.homes[power_name].remove(place) + + # Units + for power_name, power_units in list(self.units.items()): + for unit in power_units: + if unit[2:5] == place[:3]: + self.units[power_name].remove(unit) + + # Supply Centers + for center in list(self.scs): + if center.upper().startswith(place): + self.scs.remove(center) + + # Centers ownerships + for power_name, power_centers in list(self.centers.items()): + for center in power_centers: + if center.startswith(place): + self.centers[power_name].remove(center) + + # Removing from adjacencies list + for site_name, site_abuts in list(self.loc_abut.items()): + for site in [loc for loc in site_abuts if loc.upper().startswith(place)]: + self.loc_abut[site_name].remove(site) + if site_name.startswith(place): + self.loc_abut.pop(site_name) + + # Removing loc_type + for loc in list(self.loc_type): + if loc.startswith(place): + self.loc_type.pop(loc) + + def norm_power(self, power): + """ Normalise the name of a power (removes spaces) + :param power: Name of power to normalise + :return: Normalised power name + """ + return self.norm(power).replace(' ', '') + + def norm(self, phrase): + """ Normalise a sentence (add spaces before /, replace -+, with ' ', remove .: + :param phrase: Phrase to normalise + :return: Normalised sentences + """ + phrase = phrase.upper().replace('/', ' /').replace(' / ', '') + for token in '.:': + phrase = phrase.replace(token, '') + for token in '-+,': + phrase = phrase.replace(token, ' ') + for token in '|*?!~()[]=_^': + phrase = phrase.replace(token, ' {} '.format(token)) + + # Replace keywords which, contrary to aliases, all consist of a single word + return ' '.join([self.keywords.get(keyword, keyword) for keyword in phrase.strip().split()]) + + def compact(self, phrase): + """ Compacts a full sentence into a list of short words + :param phrase: The full sentence to compact (e.g. 'England: Fleet Western Mediterranean -> Tyrrhenian + Sea. (*bounce*)') + :return: The compacted phrase in an array (e.g. ['ENGLAND', 'F', 'WES', 'TYS', '|']) + """ + word, result = self.norm(phrase).split(), [] + while word: + alias, i = self.alias(word) + if alias: + result += alias.split() + word = word[i:] + return result + + def alias(self, word): + """ This function is used to replace multi-words with their acronyms + :param word: The current list of words to try to shorten + :return: alias, ix - alias is the shorten list of word, ix is the ix of the next non-processed word + """ + # pylint: disable=too-many-return-statements + # Assume that word already was subject to norm() + # Process with content inside square or round brackets + j = -1 + alias = word[0] + if alias in '([': + for j in range(1, len(word)): + if word[j] == '])'[alias == '(']: + break + else: + return alias, 1 + if j == 1: + return '', 2 + if word[1] + word[j - 1] == '**': + word2 = word[2:j - 1] + else: + word2 = word[1:j] + alias2 = self.aliases.get(' '.join(word2) + ' \\', '') + if alias2[-2:] == ' \\': + return alias2[:-2], j + 1 + result = [] + while word2: + alias2, i = self.alias(word2) + if alias2: + result += [alias2] + word2 = word2[i:] + return ' '.join(result), j + 1 + for i in range(len(word), 0, -1): + key = ' '.join(word[:i]) + if key in self.aliases: + alias = self.aliases[key] + break + else: + i = 1 + + # Concatenate coasts + if i == len(word): + return alias, i + if alias[:1] != '/' and ' ' not in alias: + alias2, j = self.alias(word[i:]) + if alias2[:1] != '/' or ' ' in alias2: + return alias, i + elif alias[-2:] == ' \\': + alias2, j = self.alias(word[i:]) + if alias2[:1] == '/' or ' ' in alias2: + return alias, i + alias, alias2 = alias2, alias[:-2] + else: + return alias, i + + # Check if the location is also an ambiguous power name + # and replace with its other name if that's the case + if alias in self.powers and alias in self.unclear: + alias = self.unclear[alias] + + # Check if the coast is mapped to another coast + if alias + ' ' + alias2 in self.aliases: + return self.aliases[alias + ' ' + alias2], i + j + return alias + alias2, i + j + + def vet(self, word, strict=0): + """ Determines the type of every word in a compacted order phrase + 0 - Undetermined, 1 - Power, 2 - Unit, 3 - Location, 4 - Coastal location + 5 - Order, 6 - Move Operator (-=_^), 7 - Non-move separator (|?~) or result (*!?~+) + :param word: The list of words to vet (e.g. ['A', 'POR', 'S', 'SPA/NC']) + :param strict: Boolean to indicate that we want to verify that the words actually exist. + Numbers become negative if they don't exist + :return: A list of tuple (e.g. [('A', 2), ('POR', 3), ('S', 5), ('SPA/NC', 4)]) + """ + result = [] + for thing in word: + if ' ' in thing: + data_type = UNDETERMINED + elif len(thing) == 1: + if thing in self.unit_names: + data_type = UNIT + elif thing.isalnum(): + data_type = ORDER + elif thing in '-=_': + data_type = MOVE_SEP + else: + data_type = OTHER + elif '/' in thing: + if thing.find('/') == 3: + data_type = COAST + else: + data_type = POWER + elif thing == 'VIA': + data_type = ORDER + elif len(thing) == 3: + data_type = LOCATION + else: + data_type = POWER + if strict and thing not in list(self.aliases.values()) + list(self.keywords.values()): + data_type = -data_type + result += [(thing, data_type)] + return result + + def rearrange(self, word): + """ This function is used to parse commands + :param word: The list of words to vet (e.g. ['ENGLAND', 'F', 'WES', 'TYS', '|']) + :return: The list of words in the correct order to be processed (e.g. ['ENGLAND', 'F', 'WES', '-', 'TYS']) + """ + # pylint: disable=too-many-branches + # Add | to start and end of list (to simplify edge cases) (they will be returned as ('|', 7)) + # e.g. [('|', 7), ('A', 2), ('POR', 3), ('S', 5), ('SPA/NC', 4), ('|', 7)] + result = self.vet(['|'] + word + ['|']) + + # Remove result tokens (7) at start and end of string (but keep |) + result[0] = ('|', UNDETERMINED) + while result[-2][1] == OTHER: + del result[-2] + if len(result) == 2: + return [] + result[0] = ('|', OTHER) + while result[1][1] == OTHER: + del result[1] + + # Move "with" unit and location to the start. There should be only one + # Ignore the rest + found = 0 + while ('?', OTHER) in result: + i = result.index(('?', OTHER)) + del result[i] + if found: + continue + j = -1 + for j in range(i, len(result)): + if result[j][1] in (POWER, UNIT): + continue + if result[j][1] in (LOCATION, COAST): + j += 1 + break + if j != i: + found = 1 + k = 0 + for k in range(1, i): + if result[k][1] not in (POWER, UNIT): + break + if k < i: + result[k:k] = result[i:j] + result[j:2 * j - i] = [] + + # Move "from" location before any preceding locations + while ('\\', OTHER) in result: + i = result.index(('\\', OTHER)) + del result[i] + if result[i][1] not in (LOCATION, COAST): + continue + for j in range(i - 1, -1, -1): + if result[j][1] not in (LOCATION, COAST) and result[j][0] != '~': + break + if j + 1 != i: + result[j + 1:j + 1] = result[i:i + 1] + del result[i + 1] + + # Move "via" locations between the two preceding locations. + while ('~', OTHER) in result: + i = result.index(('~', OTHER)) + del result[i] + if (result[i][1] not in (LOCATION, COAST) + or result[i - 1][1] not in (LOCATION, COAST) + or result[i - 2][1] not in (LOCATION, COAST)): + continue + for j in range(i + 1, len(result)): + if result[j][1] not in (LOCATION, COAST): + break + result[j:j] = result[i - 1:i] + del result[i - 1] + + # Move order beyond first location + i = 0 + for j in range(1, len(result)): + if result[j][1] in (LOCATION, COAST): + if i: + result[j + 1:j + 1] = result[i:i + 1] + del result[i] + break + elif result[j][1] == ORDER: + i = j + elif result[j][0] == '|': + break + + # Put the power before the unit, or replace it with a location if there's ambiguity + vet = 0 + for i, result_i in enumerate(result): + if result_i[1] == POWER: + if vet > 0 and result_i[0] in self.unclear: + result[i] = (self.unclear[result_i[0]], LOCATION) + elif vet == 1: + result[i + 1:i + 1] = result[i - 1:i] + del result[i - 1] + vet = 2 + elif not vet and result_i[1] == UNIT: + vet = 1 + elif result_i[1] == ORDER: + vet = 0 + else: + vet = 2 + + # Insert hyphens between subsequent locations + for i in range(len(result) - 1, 1, -1): + if result[i][1] in (LOCATION, COAST) and result[i - 1][1] in (LOCATION, COAST): + result[i:i] = [('-', MOVE_SEP)] + + # Remove vertical bars at start and end + return [x for x, y in result[1:-1]] + + def area_type(self, loc): + """ Returns 'WATER', 'COAST', 'PORT', 'LAND', 'SHUT' + :param loc: The name of the location to query + :return: Type of the location ('WATER', 'COAST', 'PORT', 'LAND', 'SHUT') + """ + return self.loc_type.get(loc.upper()) or self.loc_type.get(loc.lower()) + + def default_coast(self, word): + """ Returns the coast for a fleet move order that can only be to a single coast + (e.g. F GRE-BUL returns F GRE-BUL/SC) + :param word: A list of tokens (e.g. ['F', 'GRE', '-', 'BUL']) + :return: The updated list of tokens (e.g. ['F', 'GRE', '-', 'BUL/SC']) + """ + if len(word) == 4 and word[0] == 'F' and word[2] == '-' and '/' not in word[3]: + unit_loc, new_loc, single_coast = word[1], word[3], None + for place in self.abut_list(unit_loc): + up_place = place.upper() + if new_loc == up_place: # Target location found with no coast, original query is correct + return word + if new_loc == up_place[:3]: + if single_coast: # Target location has multiple coasts, unable to decide + return word + single_coast = up_place # Found a potential candidate, storing it + word[3] = single_coast or new_loc # Only one candidate found, modifying the order to include it + return word + + def find_coasts(self, loc): + """ Finds all coasts for a given location + :param loc: The name of a location (e.g. 'BUL') + :return: Returns the list of all coasts, including the location (e.g. ['BUL', 'BUL/EC', 'BUL/SC'] + """ + return self.loc_coasts.get(loc.upper(), []) + + def abuts(self, unit_type, unit_loc, order_type, other_loc): + """ Determines if a order for unit_type from unit_loc to other_loc is adjacent + Note: This method uses the precomputed cache + + :param unit_type: The type of unit ('A' or 'F') + :param unit_loc: The location of the unit ('BUR', 'BUL/EC') + :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move) + :param other_loc: The location of the other unit + :return: 1 if the locations are adjacent for the move, 0 otherwise + """ + if unit_type == '?': + return (self.abuts_cache.get(('A', unit_loc.upper(), order_type, other_loc.upper()), 0) or + self.abuts_cache.get(('F', unit_loc.upper(), order_type, other_loc.upper()), 0)) + + query_tuple = (unit_type, unit_loc.upper(), order_type, other_loc.upper()) + return self.abuts_cache.get(query_tuple, 0) + + def _abuts(self, unit_type, unit_loc, order_type, other_loc): + """ Determines if a order for unit_type from unit_loc to other_loc is adjacent + Note: This method is used to generate the abuts_cache + + :param unit_type: The type of unit ('A' or 'F') + :param unit_loc: The location of the unit ('BUR', 'BUL/EC') + :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move) + :param other_loc: The location of the other unit + :return: 1 if the locations are adjacent for the move, 0 otherwise + """ + # pylint: disable=too-many-return-statements + unit_loc, other_loc = unit_loc.upper(), other_loc.upper() + + # Removing coasts for support + # Otherwise, if army, not adjacent since army can't move, hold, or convoy on coasts + if '/' in other_loc: + if order_type == 'S': + other_loc = other_loc[:3] + elif unit_type == 'A': + return 0 + + # Looking for adjacency between unit_loc and other_loc + # If the break line is not executed, not adjacency were found + place = '' + for place in self.abut_list(unit_loc): + up_place = place.upper() + up_loc = up_place[:3] + if other_loc in (up_place, up_loc): + break + else: + return 0 + + # If the target location is impassible, returning 0 + other_loc_type = self.area_type(other_loc) + if other_loc_type == 'SHUT': + return 0 + + # If the unit type is unknown, then assume the adjacency is okay + if unit_type == '?': + return 1 + + # Fleets cannot affect LAND and fleets are not adjacent to any location listed in lowercase + # (except when offering support into such an area, as in F BOT S A MOS-STP), or listed in + # the adjacency list in lower-case (F VEN-TUS) + + # Fleet should be supporting a adjacent 'COAST', 'WATER' or 'PORT', with a name starting with a capital letter + if unit_type == 'F': + if (other_loc_type == 'LAND' + or place[0] != up_loc[0] + or order_type != 'S' + and other_loc not in self.loc_type): + return 0 + + # Armies cannot move to water (unless this is a convoy). Note that the caller + # is responsible for determining if a fleet exists at the adjacent spot to convoy + # the army. Also, armies can't move to spaces listed in Mixed case. + elif order_type != 'C' and (other_loc_type == 'WATER' or place == place.title()): + return 0 + + # It's adjacent. + return 1 + + def is_valid_unit(self, unit, no_coast_ok=0, shut_ok=0): + """ Determines if a unit and location combination is valid (e.g. 'A BUR') is valid + :param unit: The name of the unit with its location (e.g. F SPA/SC) + :param no_coast_ok: Indicates if a coastal location with no coast (e.g. SPA vs SPA/SC) is acceptable + :param shut_ok: Indicates if a impassable country (e.g. Switzerland) is OK + :return: A boolean to indicate if the unit/location combination is valid + """ + unit_type, loc = unit.upper().split() + area_type = self.area_type(loc) + if area_type == 'SHUT': + return 1 if shut_ok else 0 + if unit_type == '?': + return 1 if area_type is not None else 0 + # Army can be anywhere, except in 'WATER' + if unit_type == 'A': + return '/' not in loc and area_type in ('LAND', 'COAST', 'PORT') + # Fleet must be in WATER, COAST, or PORT + # Coastal locations are stored in CAPS with coasts and non-caps with non-coasts + # e.g. SPA/NC, SPA/SC, spa + return (unit_type == 'F' + and area_type in ('WATER', 'COAST', 'PORT') + and (no_coast_ok or loc.lower() not in self.loc_abut)) + + def abut_list(self, site, incl_no_coast=False): + """ Returns the adjacency list for the site + :param site: The province we want the adjacency list for + :param incl_no_coast: Boolean flag that indicates to also include province without coast if it has coasts + e.g. will return ['BUL/SC', 'BUL/EC']Â if False, and ['bul', 'BUL/SC', 'BUL/EC'] if True + :return: A list of adjacent provinces + + Note: abuts are returned in mixed cases (lowercase for A only, First capital letter for F only) + """ + if site in self.loc_abut: + abut_list = self.loc_abut.get(site, []) + else: + abut_list = self.loc_abut.get(site.lower(), []) + if incl_no_coast: + abut_list = abut_list[:] + for loc in list(abut_list): + if '/' in loc and loc[:3] not in abut_list: + abut_list += [loc[:3]] + return abut_list + + def find_next_phase(self, phase, phase_type=None, skip=0): + """ Returns the long name of the phase coming immediately after the phase + :param phase: The long name of the current phase (e.g. SPRING 1905 RETREATS) + :param phase_type: The type of phase we are looking for (e.g. 'M' for Movement, 'R' for Retreats, + 'A' for Adjust.) + :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after) + :return: The long name of the next phase (e.g. FALL 1905 MOVEMENT) + """ + # If len < 3, Phase is FORMING or COMPLETED, unable to find previous phase + now = phase.split() + if len(now) < 3: + return phase + + # Parsing year and season index + year = int(now[1]) + season_ix = (self.seq.index('%s %s' % (now[0], now[2])) + 1) % len(self.seq) + seq_len = len(self.seq) + + # Parsing the sequence of seasons + while seq_len: + seq_len -= 1 + new = self.seq[season_ix].split() + + # Looking for IFYEARDIV DIV or IFYEARDIV DIV=MOD + if new[0] == 'IFYEARDIV': + if '=' in new[1]: + div, mod = map(int, new[1].split('=')) + else: + div, mod = int(new[1]), 0 + if year % div != mod: + season_ix = -1 + + # NEWYEAR [X] indicates to increase years by [X] (or 1 by default) + elif new[0] == 'NEWYEAR': + year += len(new) == 1 or int(new[1]) + + # Found phase + elif phase_type in (None, new[1][0]): + if skip == 0: + return '%s %s %s' % (new[0], year, new[1]) + skip -= 1 + seq_len = len(self.seq) + season_ix += 1 + season_ix %= len(self.seq) + + # Could not find next phase + return '' + + def find_previous_phase(self, phase, phase_type=None, skip=0): + """ Returns the long name of the phase coming immediately prior the phase + :param phase: The long name of the current phase (e.g. SPRING 1905 RETREATS) + :param phase_type: The type of phase we are looking for (e.g. 'M' for Movement, 'R' for Retreats, + 'A' for Adjust.) + :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after) + :return: The long name of the previous phase (e.g. SPRING 1905 MOVEMENT) + """ + # If len < 3, Phase is FORMING or COMPLETED, unable to find previous phase + now = phase.split() + if len(now) < 3: + return phase + + # Parsing year and season index + year = int(now[1]) + season_ix = self.seq.index('%s %s' % (now[0], now[2])) + seq_len = len(self.seq) + + # Parsing the sequence of seasons + while seq_len: + seq_len -= 1 + season_ix -= 1 + + # No more seasons in seq + if season_ix == -1: + for new in [x.split() for x in self.seq]: + # Looking for IFYEARDIV DIV or IFYEARDIV DIV=MOD + if new[0] == 'IFYEARDIV': + if '=' in new[1]: + div, mod = map(int, new[1].split('=')) + else: + div, mod = int(new[1]), 0 + if year % div != mod: + break + season_ix += 1 + + # Parsing next seq + new = self.seq[season_ix].split() + if new[0] == 'IFYEARDIV': + pass + + # NEWYEAR [X] indicates to increase years by [X] (or 1 by default) + elif new[0] == 'NEWYEAR': + year -= len(new) == 1 or int(new[1]) + + # Found phase + elif phase_type in (None, new[1][0]): + if skip == 0: + return '%s %s %s' % (new[0], year, new[1]) + skip -= 1 + seq_len = len(self.seq) + + # Could not find prev phase + return '' + + def compare_phases(self, phase1, phase2): + """ Compare 2 phases (Strings) and return 1, -1, or 0 to indicate which phase is larger + :param phase1: The first phase (e.g. S1901M, FORMING, COMPLETED) + :param phase2: The second phase (e.g. S1901M, FORMING, COMPLETED) + :return: 1 if phase1 > phase2, -1 if phase2 > phase1 otherwise 0 if they are equal + """ + # If the phase ends with '?', we assume it's the last phase type of that season + # e.g. S1901? -> S1901R W1901? -> W1901A + if phase1[-1] == '?': + phase1 = phase1[:-1] + [season.split()[1][0] for season in self.seq if season[0] == phase1[0]][-1] + if phase2[-1] == '?': + phase2 = phase2[:-1] + [season.split()[1][0] for season in self.seq if season[0] == phase2[0]][-1] + + # Converting S1901M (abbrv) to long phase (SPRING 1901 MOVEMENT) + if len(phase1.split()) == 1: + phase1 = self.phase_long(phase1, phase1.upper()) + if len(phase2.split()) == 1: + phase2 = self.phase_long(phase2, phase2.upper()) + if phase1 == phase2: + return 0 + now1, now2 = phase1.split(), phase2.split() + + # One of the phase is either FORMING, or COMPLETED + # Syntax is (bool1 and int1 or bool2 and int2) will return int1 if bool1, else int2 if bool2 + # 1 = FORMING, 2 = Normal Phase, 3 = COMPLETED, 0 = UNKNOWN + if len(now1) < 3 or len(now2) < 3: + order1 = (len(now1) > 2 and 2 or phase1 == 'FORMING' and 1 or phase1 == 'COMPLETED' and 3 or 0) + order2 = (len(now2) > 2 and 2 or phase2 == 'FORMING' and 1 or phase2 == 'COMPLETED' and 3 or 0) + return order1 > order2 and 1 or order1 < order2 and -1 or 0 + + # Comparing years + year1, year2 = int(now1[1]), int(now2[1]) + if year1 != year2: + return (year1 > year2 and 1 or -1) * (self.flow_sign or 1) + + # Comparing seasons + # Returning the inverse if NEW_YEAR is between the 2 seasons + season_ix1 = self.seq.index('%s %s' % (now1[0], now1[2])) + season_ix2 = self.seq.index('%s %s' % (now2[0], now2[2])) + if season_ix1 > season_ix2: + return -1 if 'NEWYEAR' in [x.split()[0] for x in self.seq[(season_ix2) + (1):season_ix1]] else 1 + if season_ix1 < season_ix2: + return 1 if 'NEWYEAR' in [x.split()[0] for x in self.seq[(season_ix1) + (1):season_ix2]] else -1 + return 0 + + @staticmethod + def phase_abbr(phase, default='?????'): + """ Constructs a 5 character representation (S1901M) from a phase (SPRING 1901 MOVEMENT) + :param phase: The full phase (e.g. SPRING 1901 MOVEMENT) + :param default: The default value to return in case conversion fails + :return: A 5 character representation of the phase + """ + if phase in ('FORMING', 'COMPLETED'): + return phase + parts = tuple(phase.split()) + return ('%.1s%04d%.1s' % (parts[0], int(parts[1]), parts[2])).upper() if len(parts) == 3 else default + + def phase_long(self, phase_abbr, default='?????'): + """ Constructs a full sentence of a phase from a 5 character abbreviation + :param phase_abbr: 5 character abbrev. (e.g. S1901M) + :param default: The default value to return in case conversion fails + :return: A full phase description (e.g. SPRING 1901 MOVEMENT) + """ + try: + year = int(phase_abbr[1:-1]) + for season in self.seq: + parts = season.split() + if parts[0] not in ('NEWYEAR', 'IFYEARDIV') \ + and parts[0][0].upper() == phase_abbr[0].upper() \ + and parts[1][0].upper() == phase_abbr[-1].upper(): + return '{} {} {}'.format(parts[0], year, parts[1]).upper() + except ValueError: + pass + return default + +# Loading at the bottom, to avoid load recursion +from diplomacy.utils.convoy_paths import add_to_cache, get_convoy_paths_cache # pylint: disable=wrong-import-position +CONVOYS_PATH_CACHE = get_convoy_paths_cache() diff --git a/diplomacy/engine/message.py b/diplomacy/engine/message.py new file mode 100644 index 0000000..2d6d644 --- /dev/null +++ b/diplomacy/engine/message.py @@ -0,0 +1,115 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Game message. Represent a message exchanged inside a game. + + Possible messages exchanges: + - power 1 -> power 2 + - power -> all game + - system -> power + - system -> all game + - system -> observers + - system -> omniscient observers + + Sender `system` is identified with constant SYSTEM defined below. + Recipients `all game`, `observers` and `omniscient observers` are identified respectively with constants + GLOBAL, OBSERVER and OMNISCIENT defined below. + + Consider using Game methods to generate appropriate messages instead of this class directly: + - Game.new_power_message() to send a message from a power to another. + - Game.new_global_message() to send a message from a power to all game. + - ServerGame.new_system_message() to send a server system message. + Use constant names defined below to specify recipient for system message when it's not a power name + (GLOBAL, OBSERVER or OMNISCIENT). +""" +from diplomacy.utils import parsing, strings +from diplomacy.utils.jsonable import Jsonable + +SYSTEM = 'SYSTEM' # sender +GLOBAL = 'GLOBAL' # recipient (all powers) +OBSERVER = 'OBSERVER' # recipient (all observer tokens) +OMNISCIENT = 'OMNISCIENT' # recipient (all omniscient tokens) + +class Message(Jsonable): + """ GameMessage class. Properties: + - sender: message sender name: either SYSTEM or a power name. + - recipient: message recipient name: either GLOBAL, OBSERVER, OMNISCIENT or a power name. + - time_sent: message timestamp in microseconds. + - phase: short name of game phase when message is sent. + - message: message body. + + Note about timestamp management: + We assume a message has an unique timestamp inside one game. To respect this rule, the server is the only one + responsible for generating message timestamps. This allow to generate timestamp or only 1 same machine (server) + instead of managing timestamps from many user machines, to prevent timestamp inconsistency when messages + are stored on server. Therefore, message timestamp is the time when server stores the message, not the time + when message was sent by any client. + """ + __slots__ = ['sender', 'recipient', 'time_sent', 'phase', 'message'] + model = { + strings.SENDER: str, # either SYSTEM or a power name. + strings.RECIPIENT: str, # either GLOBAL, OBSERVER, OMNISCIENT or a power name. + strings.TIME_SENT: parsing.OptionalValueType(int), # given by server. + strings.PHASE: str, # phase short name. + strings.MESSAGE: str, + } + + def __init__(self, **kwargs): + self.sender = None # type: str + self.recipient = None # type: str + self.time_sent = None # type: int + self.phase = None # type: str + self.message = None # type: str + super(Message, self).__init__(**kwargs) + + def __str__(self): + return '[%d/%s/%s->%s](%s)' % ( + self.time_sent, self.phase, self.sender, self.recipient, self.message) + + def __hash__(self): + return hash(self.time_sent) + + def __eq__(self, other): + assert isinstance(other, Message) + return self.time_sent == other.time_sent + + def __ne__(self, other): + assert isinstance(other, Message) + return self.time_sent != other.time_sent + + def __lt__(self, other): + assert isinstance(other, Message) + return self.time_sent < other.time_sent + + def __gt__(self, other): + assert isinstance(other, Message) + return self.time_sent > other.time_sent + + def __le__(self, other): + assert isinstance(other, Message) + return self.time_sent <= other.time_sent + + def __ge__(self, other): + assert isinstance(other, Message) + return self.time_sent >= other.time_sent + + def is_global(self): + """ Return True if this message is global. """ + return self.recipient == GLOBAL + + def for_observer(self): + """ Return True if this message is sent to observers. """ + return self.recipient == OBSERVER diff --git a/diplomacy/engine/power.py b/diplomacy/engine/power.py new file mode 100644 index 0000000..570359b --- /dev/null +++ b/diplomacy/engine/power.py @@ -0,0 +1,392 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Power + - Contains the power object representing a power in the game +""" +from copy import deepcopy +from diplomacy.utils import parsing, strings +from diplomacy.utils.exceptions import DiplomacyException +from diplomacy.utils.jsonable import Jsonable +from diplomacy.utils.sorted_dict import SortedDict +from diplomacy.utils import common, constants +from diplomacy.utils.constants import OrderSettings + +class Power(Jsonable): + """ Power Class + + Properties: + - abbrev - Contains the abbrev of the power (usually the first letter of the power name) (e.g. 'F' for FRANCE) + - adjust - List of pending adjustment orders + (e.g. ['A PAR B', 'A PAR R MAR', 'A MAR D', 'WAIVE']) + - centers - Contains the list of supply centers currently controlled by the power ['MOS', 'SEV', 'STP', 'WAR'] + - civil_disorder - Boolean flag to indicate that the power has been put in CIVIL_DISORDER (e.g. True or False) + - controller - Sorted dictionary mapping timestamp to controller (either dummy or a user ID) who takes + control of power at this timestamp. + - game - Contains a reference to the game object + - goner - Boolean to indicate that this power doesn't control any SCs any more (e.g. True or False) + - homes - Contains a list of homes supply centers (where you can build) + e.g. ['PAR', 'MAR', ... ] or None if empty + - influence - Contains a list of locations influenced by this power + Note: To influence a location, the power must have visited it last. + e.g ['PAR', 'MAR', ... ] + - name - Contains the name of the power + - orders - Contains a dictionary of units and their orders. + For NO_CHECK games, unit is 'ORDER 1', 'ORDER 2', ... + - e.g. {'A PAR': '- MAR' } or {'ORDER 1': 'A PAR - MAR', 'ORDER 2': '...', ... } + - Can also be {'REORDER 1': 'A PAR - MAR', 'INVALID 1': 'A PAR - MAR', ... } after validation + - retreats - Contains the list of units that need to retreat with their possible retreat locations + (e.g. {'A PAR': ['MAR', 'BER']}) + - role - Power type (observer, omniscient, player or server power). + Either the power name (for a player power) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES + - tokens - Only for server power: set of tokens of current power controlled (if not None). + - units - Contains the list of units (e.g. ['A PAR', 'A MAR', ...] + - vote - Only for omniscient, player and server power: power vote ('yes', 'no' or 'neutral'). + """ + __slots__ = ['game', 'name', 'abbrev', 'adjust', 'centers', 'units', 'influence', 'homes', + 'retreats', 'goner', 'civil_disorder', 'orders', 'role', 'controller', 'vote', + 'order_is_set', 'wait', 'tokens'] + model = { + strings.ABBREV: parsing.OptionalValueType(str), + strings.ADJUST: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.CENTERS: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.CIVIL_DISORDER: parsing.DefaultValueType(int, 0), + strings.CONTROLLER: parsing.DefaultValueType(parsing.DictType(int, str, SortedDict.builder(int, str)), {}), + strings.HOMES: parsing.OptionalValueType(parsing.SequenceType(str)), + strings.INFLUENCE: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.NAME: parsing.PrimitiveType(str), + strings.ORDER_IS_SET: parsing.DefaultValueType(OrderSettings.ALL_SETTINGS, OrderSettings.ORDER_NOT_SET), + strings.ORDERS: parsing.DefaultValueType(parsing.DictType(str, str), {}), + strings.RETREATS: parsing.DefaultValueType(parsing.DictType(str, parsing.SequenceType(str)), {}), + strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE), + strings.TOKENS: parsing.DefaultValueType(parsing.SequenceType(str, set), ()), + strings.UNITS: parsing.DefaultValueType(parsing.SequenceType(str), []), + strings.VOTE: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_VOTE_DECISIONS), strings.NEUTRAL), + strings.WAIT: parsing.DefaultValueType(bool, True), + } + + def __init__(self, game=None, name=None, **kwargs): + """ Constructor """ + self.game = game + self.abbrev = None + self.adjust, self.centers, self.units, self.influence = [], [], [], [] + self.homes = None + self.retreats = {} + self.goner = self.civil_disorder = 0 + self.orders = {} + + self.name = '' + self.role = '' + self.controller = SortedDict(int, str) + self.vote = '' + self.order_is_set = 0 + self.wait = False + self.tokens = set() + super(Power, self).__init__(name=name, **kwargs) + assert self.role in strings.ALL_ROLE_TYPES or self.role == self.name + if not self.controller: + self.controller.put(common.timestamp_microseconds(), strings.DUMMY) + + def __str__(self): + """ Returns a representation of the power instance """ + show_cd = self.civil_disorder + show_inhabits = self.homes is not None + show_owns = self.centers + show_retreats = len(self.retreats) > 0 + + text = '' + text += '\n%s (%s)' % (self.name, self.role) + text += '\nPLAYER %s' % self.controller.last_value() + text += '\nCD' if show_cd else '' + text += '\nINHABITS %s' % ' '.join(self.homes) if show_inhabits else '' + text += '\nOWNS %s' % ' '.join(self.centers) if show_owns else '' + if show_retreats: + text += '\n'.join([''] + [' '.join([unit, '-->'] + places) for unit, places in self.retreats.items()]) + text = '\n'.join([text] + self.units + self.adjust) + + # Orders - RIO is for REORDER, INVALID, ORDER (in NO_CHECK games) + text_order = '\nORDERS\n' + for unit, order in self.orders.items(): + if unit[0] not in 'RIO': + text_order += '%s ' % unit + text_order += order + '\n' + + text += text_order if self.orders else '' + return text + + def __deepcopy__(self, memo): + """ Fast deep copy implementation + - (Not setting the game object) + """ + cls = self.__class__ + result = cls.__new__(cls) + + # Deep copying + for key in self.__slots__: + if key not in ['game']: + setattr(result, key, deepcopy(getattr(self, key))) + + # Game + setattr(result, 'game', None) + return result + + def reinit(self, include_flags=6): + """ Performs a reinitialization of some of the parameters + :param include_flags: Bit mask to indicate which params to reset + (bit 1 = orders, 2 = persistent, 4 = transient) + :return: None + """ + reinit_persistent = include_flags & 2 + reinit_transient = include_flags & 4 + reinit_orders = include_flags & 1 + + # Initialize the persistent parameters + if reinit_persistent: + self.abbrev = None + + # Initialize the transient parameters + if reinit_transient: + for home in self.homes: + self.game.update_hash(self.name, loc=home, is_home=True) + for center in self.centers: + self.game.update_hash(self.name, loc=center, is_center=True) + for unit in self.units: + self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:]) + for dis_unit in self.retreats: + self.game.update_hash(self.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True) + self.homes = None + self.centers, self.units, self.influence = [], [], [] + self.retreats = {} + + # Initialize the order-related parameters + if reinit_orders: + self.civil_disorder = 0 + self.adjust = [] + self.orders = {} + if self.is_eliminated(): + self.order_is_set = OrderSettings.ORDER_SET_EMPTY + self.wait = False + else: + self.order_is_set = OrderSettings.ORDER_NOT_SET + self.wait = True if self.is_dummy() else (not self.game.real_time) + self.goner = 0 + + @staticmethod + def compare(power_1, power_2): + """ Comparator object - Compares two Power objects + :param power_1: The first Power object to compare + :param power_2: The second Power object to compare + :return: 1 if self is greater, -1 if other is greater, 0 if they are equal + """ + cmp = lambda power_1, power_2: ((power_1 > power_2) - (power_1 < power_2)) + xstr = lambda string: string or '' # To avoid comparing with None + cmp_type = cmp(xstr(power_1.role), xstr(power_2.role)) + cmp_name = cmp(xstr(power_1.name), xstr(power_2.name)) + return cmp_type or cmp_name + + def initialize(self, game): + """ Initializes a game and resets home, centers and units + :param game: The game to use for initialization + :type game: diplomacy.Game + """ + + # Not initializing observers and monitors + assert self.is_server_power() + + self.game = game + self.order_is_set = OrderSettings.ORDER_NOT_SET + self.wait = True if self.is_dummy() else (not self.game.real_time) + + # Get power abbreviation. + self.abbrev = self.game.map.abbrev.get(self.name, self.name[0]) + + # Resets homes + if self.homes is None: + self.homes = [] + for home in game.map.homes.get(self.name, []): + self.game.update_hash(self.name, loc=home, is_home=True) + self.homes.append(home) + + # Resets the centers and units + if not self.centers: + for center in game.map.centers.get(self.name, []): + game.update_hash(self.name, loc=center, is_center=True) + self.centers.append(center) + if not self.units: + for unit in game.map.units.get(self.name, []): + game.update_hash(self.name, unit_type=unit[0], loc=unit[2:]) + self.units.append(unit) + self.influence.append(unit[2:5]) + + def merge(self, other_power): + """ Transfer all units, centers, and homes of the other_power to this power + :param other_power: The other power (will be empty after the merge) + """ + # Regular units + for unit in list(other_power.units): + self.units.append(unit) + other_power.units.remove(unit) + self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:]) + self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:]) + + # Dislodged units + for unit in list(other_power.retreats.keys()): + self.retreats[unit] = other_power.retreats[unit] + del other_power.retreats[unit] + self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True) + self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True) + + # Influence + for loc in list(other_power.influence): + self.influence.append(loc) + other_power.influence.remove(loc) + + # Supply centers + for center in list(other_power.centers): + self.centers.append(center) + other_power.centers.remove(center) + self.game.update_hash(self.name, loc=center, is_center=True) + self.game.update_hash(other_power.name, loc=center, is_center=True) + + # Homes + for home in list(other_power.homes): + self.homes.append(home) + other_power.homes.remove(home) + self.game.update_hash(self.name, loc=home, is_home=True) + self.game.update_hash(other_power.name, loc=home, is_home=True) + + # Clearing state cache + self.game.clear_cache() + + def clear_units(self): + """ Removes all units from the map """ + for unit in self.units: + self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:]) + self.units = [] + self.influence = [] + self.game.clear_cache() + + def clear_centers(self): + """ Removes ownership of all supply centers """ + for center in self.centers: + self.game.update_hash(self.name, loc=center, is_center=True) + self.centers = [] + self.game.clear_cache() + + def is_dummy(self): + """ Indicates if the power is a dummy + :return: Boolean flag to indicate if the power is a dummy + """ + return self.controller.last_value() == strings.DUMMY + + def is_eliminated(self): + """ Returns a flag to show if player is eliminated + :return: If the current power is eliminated + """ + # Not eliminated if has units left + if self.units or self.centers or self.retreats: + return False + return True + + def clear_orders(self): + """ Clears the power's orders """ + self.reinit(include_flags=1) + + def moves_submitted(self): + """ Returns a boolean to indicate if moves has been submitted + :return: 1 if not in Movement phase, or orders submitted, or no more units lefts + """ + if self.game.phase_type != 'M': + return 1 + return self.orders or not self.units + + # ============================================================== + # Application/network methods (mainly used for connected games). + # ============================================================== + + def is_observer_power(self): + """ Return True if this power is an observer power. """ + return self.role == strings.OBSERVER_TYPE + + def is_omniscient_power(self): + """ Return True if this power is an omniscient power. """ + return self.role == strings.OMNISCIENT_TYPE + + def is_player_power(self): + """ Return True if this power is a player power. """ + return self.role == self.name + + def is_server_power(self): + """ Return True if this power is a server power. """ + return self.role == strings.SERVER_TYPE + + def is_controlled(self): + """ Return True if this power is controlled. """ + return self.controller.last_value() != strings.DUMMY + + def does_not_wait(self): + """ Return True if this power does not wait (ie. if we could already process orders of this power). """ + return self.order_is_set and not self.wait + + def update_controller(self, username, timestamp): + """ Update controller with given username and timestamp. """ + self.controller.put(timestamp, username) + + def set_controlled(self, username): + """ Control power with given username. Username may be None (meaning no controller). """ + if username is None or username == strings.DUMMY: + if self.controller.last_value() != strings.DUMMY: + self.controller.put(common.timestamp_microseconds(), strings.DUMMY) + self.tokens.clear() + self.wait = True + self.vote = strings.NEUTRAL + elif self.controller.last_value() == strings.DUMMY: + self.controller.put(common.timestamp_microseconds(), username) + self.wait = not self.game.real_time + elif self.controller.last_value() != username: + raise DiplomacyException('Power already controlled by someone else. Kick previous controller before.') + + def get_controller(self): + """ Return current power controller name ('dummy' if power is not controlled). """ + return self.controller.last_value() + + def get_controller_timestamp(self): + """ Return timestamp when current controller took control of this power. """ + return self.controller.last_key() + + def is_controlled_by(self, username): + """ Return True if this power is controlled by given username. """ + if username == constants.PRIVATE_BOT_USERNAME: + # Bot is connected if power is dummy and has some associated tokens. + return self.is_dummy() and bool(self.tokens) + return self.controller.last_value() == username + + # Server-only methods. + + def has_token(self, token): + """ Return True if this power has given token. """ + assert self.is_server_power() + return token in self.tokens + + def add_token(self, token): + """ Add given token to this power. """ + assert self.is_server_power() + self.tokens.add(token) + + def remove_tokens(self, tokens): + """ Remove sequence of tokens from this power. """ + assert self.is_server_power() + self.tokens.difference_update(tokens) diff --git a/diplomacy/engine/renderer.py b/diplomacy/engine/renderer.py new file mode 100644 index 0000000..8b5e10c --- /dev/null +++ b/diplomacy/engine/renderer.py @@ -0,0 +1,789 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +# -*- coding: utf-8 -*- +""" Renderer + - Contains the renderer object which is responsible for rendering a game state to svg +""" +import os +from xml.dom import minidom +from diplomacy import settings + +# Constants +LAYER_SC = 'SupplyCenterLayer' +LAYER_ORDER = 'OrderLayer' +LAYER_UNIT = 'UnitLayer' +LAYER_DISL = 'DislodgedUnitLayer' +ARMY = 'Army' +FLEET = 'Fleet' + +def _attr(node_element, attr_name): + """ Shorthand method to retrieve an XML attribute """ + return node_element.attributes[attr_name].value + +def _offset(str_float, offset): + """ Shorthand to add a offset to an attribute """ + return str(float(str_float) + offset) + + +class Renderer(): + """ Renderer object responsible for rendering a game state to svg """ + + def __init__(self, game): + """ Constructor + :param game: The instantiated game object to render + :type game: diplomacy.Game + """ + self.game = game + self.metadata = {} + self.xml_map = None + self.xml_map_path = os.path.join(settings.PACKAGE_DIR, 'maps', 'svg', self.game.map.name + '.svg') + + # Loading XML + if os.path.exists(self.xml_map_path): + self.xml_map = minidom.parse(self.xml_map_path).toxml() + self._load_metadata() + + def norm_order(self, order): + """ Normalizes the order format and split it into tokens + This is only used for **movement** orders (to make sure NO_CHECK games used the correct format) + Formats: + A PAR H + A PAR - BUR [VIA] + A PAR S BUR + A PAR S F BRE - PIC + F BRE C A PAR - LON + + :param order: The unformatted order (e.g. 'Paris - Burgundy') + :return: The tokens of the formatted order (e.g. ['A', 'PAR', '-', 'BUR']) + """ + return self.game._add_unit_types(self.game._expand_order(order.split())) # pylint: disable=protected-access + + def render(self, incl_orders=True, incl_abbrev=False, output_format='svg'): + """ Renders the current game and returns the XML representation + :param incl_orders: Optional. Flag to indicate we also want to render orders. + :param incl_abbrev: Optional. Flag to indicate we also want to display the provinces abbreviations. + :param output_format: The desired output format. + :return: The rendered image in the specified format. + """ + # pylint: disable=too-many-branches + if output_format not in ['svg']: + raise ValueError('Only "svg" format is current supported.') + if not self.game or not self.game.map or not self.xml_map: + return None + + # Parsing XML + xml_map = minidom.parseString(self.xml_map) + scs = self.game.map.scs[:] + + # Setting phase and note + nb_centers = [(power.name[:3], len(power.centers)) + for power in self.game.powers.values() + if not power.is_eliminated()] + nb_centers = sorted(nb_centers, key=lambda key: key[1], reverse=True) + nb_centers_per_power = ' '.join(['{}: {}'.format(name, centers) for name, centers in nb_centers]) + xml_map = self._set_current_phase(xml_map, self.game.get_current_phase()) + xml_map = self._set_note(xml_map, nb_centers_per_power, self.game.note) + + # Adding units, supply centers, and influence + for power in self.game.powers.values(): + for unit in power.units: + xml_map = self._add_unit(xml_map, unit, power.name, is_dislodged=False) + for unit in power.retreats: + xml_map = self._add_unit(xml_map, unit, power.name, is_dislodged=True) + for center in power.centers: + xml_map = self._add_supply_center(xml_map, center, power.name) + xml_map = self._set_influence(xml_map, center, power.name, has_supply_center=True) + scs.remove(center) + for loc in power.influence: + xml_map = self._set_influence(xml_map, loc, power.name, has_supply_center=False) + + # Orders + if incl_orders: + + # Regular orders (Normalized) + # A PAR H + # A PAR - BUR [VIA] + # A PAR S BUR + # A PAR S F BRE - PIC + # F BRE C A PAR - LON + for order_key in power.orders: + + # No_check order (Order, Invalid, Reorder) + # Otherwise regular order (unit is key, order without unit is value) + if order_key[0] in 'RIO': + order = power.orders[order_key] + else: + order = '{} {}'.format(order_key, power.orders[order_key]) + + # Normalizing and splitting in tokens + tokens = self.norm_order(order) + unit_loc = tokens[1] + + # Parsing based on order type + if not tokens or len(tokens) < 3: + continue + elif tokens[2] == 'H': + xml_map = self._issue_hold_order(xml_map, unit_loc, power.name) + elif tokens[2] == '-': + dest_loc = tokens[-1] if tokens[-1] != 'VIA' else tokens[-2] + xml_map = self._issue_move_order(xml_map, unit_loc, dest_loc, power.name) + elif tokens[2] == 'S': + dest_loc = tokens[-1] + if '-' in tokens: + src_loc = tokens[4] if tokens[3] == 'A' or tokens[3] == 'F' else tokens[3] + xml_map = self._issue_support_move_order(xml_map, unit_loc, src_loc, dest_loc, power.name) + else: + xml_map = self._issue_support_hold_order(xml_map, unit_loc, dest_loc, power.name) + elif tokens[2] == 'C': + src_loc = tokens[4] if tokens[3] == 'A' or tokens[3] == 'F' else tokens[3] + dest_loc = tokens[-1] + if src_loc != dest_loc and '-' in tokens: + xml_map = self._issue_convoy_order(xml_map, unit_loc, src_loc, dest_loc, power.name) + else: + raise RuntimeError('Unknown order: {}'.format(' '.join(tokens))) + + # Adjustment orders + # VOID xxx + # A PAR B + # A PAR D + # A PAR R BUR + # WAIVE + for order in power.adjust: + tokens = order.split() + if not tokens or tokens[0] == 'VOID' or tokens[-1] == 'WAIVE': + continue + elif tokens[-1] == 'B': + if len(tokens) < 3: + continue + xml_map = self._issue_build_order(xml_map, tokens[0], tokens[1], power.name) + elif tokens[-1] == 'D': + xml_map = self._issue_disband_order(xml_map, tokens[1]) + elif tokens[-2] == 'R': + src_loc = tokens[1] if tokens[0] == 'A' or tokens[0] == 'F' else tokens[0] + dest_loc = tokens[-1] + xml_map = self._issue_move_order(xml_map, src_loc, dest_loc, power.name) + else: + raise RuntimeError('Unknown order: {}'.format(order)) + + # Adding remaining supply centers + for center in scs: + xml_map = self._add_supply_center(xml_map, center, None) + + # Removing abbrev and mouse layer + svg_node = xml_map.getElementsByTagName('svg')[0] + for child_node in svg_node.childNodes: + if child_node.nodeName != 'g': + continue + if _attr(child_node, 'id') == 'BriefLabelLayer' and not incl_abbrev: + svg_node.removeChild(child_node) + elif _attr(child_node, 'id') == 'MouseLayer': + svg_node.removeChild(child_node) + + # Returning + return xml_map.toxml() + + def _load_metadata(self): + """ Loads meta-data embedded in the XML map and clears unused nodes """ + if not self.xml_map: + return + xml_map = minidom.parseString(self.xml_map) + + # Data + self.metadata = { + 'color': {}, + 'symbol_size': {}, + 'orders': {}, + 'coord': {} + } + + # Order drawings + for order_drawing in xml_map.getElementsByTagName('jdipNS:ORDERDRAWING'): + for child_node in order_drawing.childNodes: + + # Power Colors + if child_node.nodeName == 'jdipNS:POWERCOLORS': + for power_color in child_node.childNodes: + if power_color.nodeName == 'jdipNS:POWERCOLOR': + self.metadata['color'][_attr(power_color, 'power').upper()] = _attr(power_color, 'color') + + # Symbol size + elif child_node.nodeName == 'jdipNS:SYMBOLSIZE': + self.metadata['symbol_size'][_attr(child_node, 'name')] = (_attr(child_node, 'height'), + _attr(child_node, 'width')) + + # Order type + elif child_node.nodeName.startswith('jdipNS'): + order_type = child_node.nodeName.replace('jdipNS:', '') + self.metadata['orders'][order_type] = {} + for attr_name, attr_value in child_node.attributes.items(): + if ':' in attr_name: + continue + self.metadata['orders'][order_type][attr_name] = attr_value + + # Object coordinates + for province_data in xml_map.getElementsByTagName('jdipNS:PROVINCE_DATA'): + for child_node in province_data.childNodes: + + # Province + if child_node.nodeName == 'jdipNS:PROVINCE': + province = _attr(child_node, 'name').upper().replace('-', '/') + self.metadata['coord'][province] = {} + + for coord_node in child_node.childNodes: + if coord_node.nodeName == 'jdipNS:UNIT': + self.metadata['coord'][province]['unit'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y')) + elif coord_node.nodeName == 'jdipNS:DISLODGED_UNIT': + self.metadata['coord'][province]['disl'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y')) + elif coord_node.nodeName == 'jdipNS:SUPPLY_CENTER': + self.metadata['coord'][province]['sc'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y')) + + # Deleting + svg_node = xml_map.getElementsByTagName('svg')[0] + svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:DISPLAY')[0]) + svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:ORDERDRAWING')[0]) + svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:PROVINCE_DATA')[0]) + self.xml_map = xml_map.toxml() + + def _add_unit(self, xml_map, unit, power_name, is_dislodged): + """ Adds a unit to the map + :param xml_map: The xml map being generated + :param unit: The unit to add (e.g. 'A PAR') + :param power_name: The name of the power owning the unit (e.g. 'FRANCE') + :param is_dislodged: Boolean. Indicates if the unit is dislodged + :return: Nothing + """ + unit_type, loc = unit.split() + symbol = FLEET if unit_type == 'F' else ARMY + loc_x = _offset(self.metadata['coord'][loc][('unit', 'disl')[is_dislodged]][0], -11.5) + loc_y = _offset(self.metadata['coord'][loc][('unit', 'disl')[is_dislodged]][1], - 10.) + node = xml_map.createElement('use') + node.setAttribute('x', loc_x) + node.setAttribute('y', loc_y) + node.setAttribute('height', self.metadata['symbol_size'][symbol][0]) + node.setAttribute('width', self.metadata['symbol_size'][symbol][1]) + node.setAttribute('xlink:href', '#{}{}'.format(('', 'Dislodged')[is_dislodged], symbol)) + node.setAttribute('class', 'unit{}'.format(power_name.lower())) + + # Inserting + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' \ + and _attr(child_node, 'id') == ['UnitLayer', 'DislodgedUnitLayer'][is_dislodged]: + child_node.appendChild(node) + break + return xml_map + + def _add_supply_center(self, xml_map, loc, power_name): + """ Adds a supply center to the map + :param xml_map: The xml map being generated + :param loc: The province where to add the SC (e.g. 'PAR') + :param power_name: The name of the power owning the SC or None + :return: Nothing + """ + symbol = 'SupplyCenter' + loc_x = _offset(self.metadata['coord'][loc]['sc'][0], -8.5) + loc_y = _offset(self.metadata['coord'][loc]['sc'][1], -11.) + node = xml_map.createElement('use') + node.setAttribute('x', loc_x) + node.setAttribute('y', loc_y) + node.setAttribute('height', self.metadata['symbol_size'][symbol][0]) + node.setAttribute('width', self.metadata['symbol_size'][symbol][1]) + node.setAttribute('xlink:href', '#{}'.format(symbol)) + if power_name: + node.setAttribute('class', 'sc{}'.format(power_name.lower())) + else: + node.setAttribute('class', 'scnopower') + + # Inserting + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'SupplyCenterLayer': + child_node.appendChild(node) + break + return xml_map + + def _set_influence(self, xml_map, loc, power_name, has_supply_center=False): + """ Sets the influence on the map + :param xml_map: The xml map being generated + :param loc: The province being influenced (e.g. 'PAR') + :param power_name: The name of the power influencing the province + :param has_supply_center: Boolean flag to acknowledge we are modifying a loc with a SC + :return: Nothing + """ + loc = loc.upper()[:3] + if loc in self.game.map.scs and not has_supply_center: + return xml_map + if self.game.map.area_type(loc) not in ['LAND', 'COAST']: + return xml_map + + # Inserting + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'MapLayer': + for map_node in child_node.childNodes: + if map_node.nodeName == 'path' and _attr(map_node, 'id') == '_{}'.format(loc.lower()): + if power_name: + map_node.setAttribute('class', power_name.lower()) + else: + map_node.setAttribute('class', 'nopower') + return xml_map + + # Returning + return xml_map + + @staticmethod + def _set_current_phase(xml_map, current_phase): + """ Sets the phase text at the bottom right of the the map + :param xml_map: The xml map being generated + :param current_phase: The current phase (e.g. 'S1901M) + :return: Nothing + """ + current_phase = 'FINAL' if current_phase[0] == '?' else current_phase + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentPhase': + child_node.childNodes[0].nodeValue = current_phase + return xml_map + return xml_map + + @staticmethod + def _set_note(xml_map, note_1, note_2): + """ Sets a note at the top left of the map + :param xml_map: The xml map being generated + :param note_1: The text to display on the first line + :param note_2: The text to display on the second line + :return: Nothing + """ + note_1 = note_1 or ' ' + note_2 = note_2 or ' ' + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentNote': + child_node.childNodes[0].nodeValue = note_1 + if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentNote2': + child_node.childNodes[0].nodeValue = note_2 + return xml_map + + def _issue_hold_order(self, xml_map, loc, power_name): + """ Adds a hold order to the map + :param xml_map: The xml map being generated + :param loc: The province where the unit is holding (e.g. 'PAR') + :param power_name: The name of the power owning the unit + :return: Nothing + """ + # Calculating polygon coord + polygon_coord = [] + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 8.5) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 9.5) + for offset in [(13.8, -33.3), (33.3, -13.8), (33.3, 13.8), (13.8, 33.3), (-13.8, 33.3), (-33.3, 13.8), + (-33.3, -13.8), (-13.8, -33.3)]: + polygon_coord += [_offset(loc_x, offset[0]) + ',' + _offset(loc_y, offset[1])] + + # Building polygon + g_node = xml_map.createElement('g') + + poly_1 = xml_map.createElement('polygon') + poly_1.setAttribute('stroke-width', '10') + poly_1.setAttribute('class', 'varwidthshadow') + poly_1.setAttribute('points', ' '.join(polygon_coord)) + + poly_2 = xml_map.createElement('polygon') + poly_2.setAttribute('stroke-width', '6') + poly_2.setAttribute('class', 'varwidthorder') + poly_2.setAttribute('points', ' '.join(polygon_coord)) + poly_2.setAttribute('stroke', self.metadata['color'][power_name]) + + g_node.appendChild(poly_1) + g_node.appendChild(poly_2) + + # Inserting + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer': + for layer_node in child_node.childNodes: + if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer1': + layer_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_support_hold_order(self, xml_map, loc, dest_loc, power_name): + """ Issues a support hold order + :param xml_map: The xml map being generated + :param loc: The location of the unit sending support (e.g. 'BER') + :param dest_loc: The location where the unit is holding from (e.g. 'PAR') + :param power_name: The power name issuing the move order + :return: Nothing + """ + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10) + dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10) + dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10) + + # Adjusting destination + delta_x = float(dest_loc_x) - float(loc_x) + delta_y = float(dest_loc_y) - float(loc_y) + vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5 + dest_loc_x = str(round(float(loc_x) + (vector_length - 35.) / vector_length * delta_x, 2)) + dest_loc_y = str(round(float(loc_y) + (vector_length - 35.) / vector_length * delta_y, 2)) + + # Getting polygon coordinates + polygon_coord = [] + poly_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 8.5) + poly_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 9.5) + for offset in [(15.9, -38.3), (38.3, -15.9), (38.3, 15.9), (15.9, 38.3), (-15.9, 38.3), (-38.3, 15.9), + (-38.3, -15.9), (-15.9, -38.3)]: + polygon_coord += [_offset(poly_loc_x, offset[0]) + ',' + _offset(poly_loc_y, offset[1])] + + # Creating nodes + g_node = xml_map.createElement('g') + + shadow_line = xml_map.createElement('line') + shadow_line.setAttribute('x1', loc_x) + shadow_line.setAttribute('y1', loc_y) + shadow_line.setAttribute('x2', dest_loc_x) + shadow_line.setAttribute('y2', dest_loc_y) + shadow_line.setAttribute('class', 'shadowdash') + + support_line = xml_map.createElement('line') + support_line.setAttribute('x1', loc_x) + support_line.setAttribute('y1', loc_y) + support_line.setAttribute('x2', dest_loc_x) + support_line.setAttribute('y2', dest_loc_y) + support_line.setAttribute('class', 'supportorder') + support_line.setAttribute('stroke', self.metadata['color'][power_name]) + + shadow_poly = xml_map.createElement('polygon') + shadow_poly.setAttribute('class', 'shadowdash') + shadow_poly.setAttribute('points', ' '.join(polygon_coord)) + + support_poly = xml_map.createElement('polygon') + support_poly.setAttribute('class', 'supportorder') + support_poly.setAttribute('points', ' '.join(polygon_coord)) + support_poly.setAttribute('stroke', self.metadata['color'][power_name]) + + # Inserting + g_node.appendChild(shadow_line) + g_node.appendChild(support_line) + g_node.appendChild(shadow_poly) + g_node.appendChild(support_poly) + + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer': + for layer_node in child_node.childNodes: + if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2': + layer_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_move_order(self, xml_map, src_loc, dest_loc, power_name): + """ Issues a move order + :param xml_map: The xml map being generated + :param src_loc: The location where the unit is moving from (e.g. 'PAR') + :param dest_loc: The location where the unit is moving to (e.g. 'MAR') + :param power_name: The power name issuing the move order + :return: Nothing + """ + if self.game.get_current_phase()[-1] == 'R': + src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], -2.5) + src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], -2.5) + else: + src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10) + src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10) + dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10) + dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10) + + # Adjusting destination + delta_x = float(dest_loc_x) - float(src_loc_x) + delta_y = float(dest_loc_y) - float(src_loc_y) + vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5 + dest_loc_x = str(round(float(src_loc_x) + (vector_length - 30.) / vector_length * delta_x, 2)) + dest_loc_y = str(round(float(src_loc_y) + (vector_length - 30.) / vector_length * delta_y, 2)) + + # Creating nodes + g_node = xml_map.createElement('g') + + line_with_shadow = xml_map.createElement('line') + line_with_shadow.setAttribute('x1', src_loc_x) + line_with_shadow.setAttribute('y1', src_loc_y) + line_with_shadow.setAttribute('x2', dest_loc_x) + line_with_shadow.setAttribute('y2', dest_loc_y) + line_with_shadow.setAttribute('class', 'varwidthshadow') + line_with_shadow.setAttribute('stroke-width', '10') + + line_with_arrow = xml_map.createElement('line') + line_with_arrow.setAttribute('x1', src_loc_x) + line_with_arrow.setAttribute('y1', src_loc_y) + line_with_arrow.setAttribute('x2', dest_loc_x) + line_with_arrow.setAttribute('y2', dest_loc_y) + line_with_arrow.setAttribute('class', 'varwidthorder') + line_with_arrow.setAttribute('stroke', self.metadata['color'][power_name]) + line_with_arrow.setAttribute('stroke-width', '6') + line_with_arrow.setAttribute('marker-end', 'url(#arrow)') + + # Inserting + g_node.appendChild(line_with_shadow) + g_node.appendChild(line_with_arrow) + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer': + for layer_node in child_node.childNodes: + if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer1': + layer_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_support_move_order(self, xml_map, loc, src_loc, dest_loc, power_name): + """ Issues a support move order + :param xml_map: The xml map being generated + :param loc: The location of the unit sending support (e.g. 'BER') + :param src_loc: The location where the unit is moving from (e.g. 'PAR') + :param dest_loc: The location where the unit is moving to (e.g. 'MAR') + :param power_name: The power name issuing the move order + :return: Nothing + """ + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10) + src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10) + src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10) + dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10) + dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10) + + # Adjusting destination + delta_x = float(dest_loc_x) - float(src_loc_x) + delta_y = float(dest_loc_y) - float(src_loc_y) + vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5 + dest_loc_x = str(round(float(src_loc_x) + (vector_length - 30.) / vector_length * delta_x, 2)) + dest_loc_y = str(round(float(src_loc_y) + (vector_length - 30.) / vector_length * delta_y, 2)) + + # Creating nodes + g_node = xml_map.createElement('g') + + path_with_shadow = xml_map.createElement('path') + path_with_shadow.setAttribute('class', 'shadowdash') + path_with_shadow.setAttribute('d', 'M {},{} C {},{} {},{} {},{}'.format(loc_x, loc_y, + src_loc_x, src_loc_y, + src_loc_x, src_loc_y, + dest_loc_x, dest_loc_y)) + + path_with_arrow = xml_map.createElement('path') + path_with_arrow.setAttribute('class', 'supportorder') + path_with_arrow.setAttribute('stroke', self.metadata['color'][power_name]) + path_with_arrow.setAttribute('marker-end', 'url(#arrow)') + path_with_arrow.setAttribute('d', 'M {},{} C {},{} {},{} {},{}'.format(loc_x, loc_y, + src_loc_x, src_loc_y, + src_loc_x, src_loc_y, + dest_loc_x, dest_loc_y)) + + # Inserting + g_node.appendChild(path_with_shadow) + g_node.appendChild(path_with_arrow) + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer': + for layer_node in child_node.childNodes: + if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2': + layer_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_convoy_order(self, xml_map, loc, src_loc, dest_loc, power_name): + """ Issues a convoy order + :param xml_map: The xml map being generated + :param loc: The location of the unit convoying (e.g. 'BER') + :param src_loc: The location where the unit being convoyed is moving from (e.g. 'PAR') + :param dest_loc: The location where the unit being convoyed is moving to (e.g. 'MAR') + :param power_name: The power name issuing the convoy order + :return: Nothing + """ + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10) + src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10) + src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10) + dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10) + dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10) + + # Adjusting starting arrow (from convoy to start location) + # This is to avoid the end of the arrow conflicting with the convoy triangle + src_delta_x = float(src_loc_x) - float(loc_x) + src_delta_y = float(src_loc_y) - float(loc_y) + src_vector_length = (src_delta_x ** 2. + src_delta_y ** 2.) ** 0.5 + src_loc_x_1 = str(round(float(loc_x) + (src_vector_length - 30.) / src_vector_length * src_delta_x, 2)) + src_loc_y_1 = str(round(float(loc_y) + (src_vector_length - 30.) / src_vector_length * src_delta_y, 2)) + + # Adjusting destination arrow (from start location to destination location) + # This is to avoid the start of the arrow conflicting with the convoy triangle + dest_delta_x = float(src_loc_x) - float(dest_loc_x) + dest_delta_y = float(src_loc_y) - float(dest_loc_y) + dest_vector_length = (dest_delta_x ** 2. + dest_delta_y ** 2.) ** 0.5 + src_loc_x_2 = str(round(float(dest_loc_x) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_x, 2)) + src_loc_y_2 = str(round(float(dest_loc_y) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_y, 2)) + + # Adjusting destination arrow (from start location to destination location) + # This is to avoid the start of the arrow conflicting with the convoy triangle + dest_delta_x = float(dest_loc_x) - float(src_loc_x) + dest_delta_y = float(dest_loc_y) - float(src_loc_y) + dest_vector_length = (dest_delta_x ** 2. + dest_delta_y ** 2.) ** 0.5 + dest_loc_x = str(round(float(src_loc_x) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_x, 2)) + dest_loc_y = str(round(float(src_loc_y) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_y, 2)) + + # Getting convoy triangle coordinates + triangle_coord = [] + triangle_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10) + triangle_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10) + for offset in [(0, -38.3), (33.2, 19.1), (-33.2, 19.1)]: + triangle_coord += [_offset(triangle_loc_x, offset[0]) + ',' + _offset(triangle_loc_y, offset[1])] + + # Creating nodes + g_node = xml_map.createElement('g') + + src_shadow_line = xml_map.createElement('line') + src_shadow_line.setAttribute('x1', loc_x) + src_shadow_line.setAttribute('y1', loc_y) + src_shadow_line.setAttribute('x2', src_loc_x_1) + src_shadow_line.setAttribute('y2', src_loc_y_1) + src_shadow_line.setAttribute('class', 'shadowdash') + + dest_shadow_line = xml_map.createElement('line') + dest_shadow_line.setAttribute('x1', src_loc_x_2) + dest_shadow_line.setAttribute('y1', src_loc_y_2) + dest_shadow_line.setAttribute('x2', dest_loc_x) + dest_shadow_line.setAttribute('y2', dest_loc_y) + dest_shadow_line.setAttribute('class', 'shadowdash') + + src_convoy_line = xml_map.createElement('line') + src_convoy_line.setAttribute('x1', loc_x) + src_convoy_line.setAttribute('y1', loc_y) + src_convoy_line.setAttribute('x2', src_loc_x_1) + src_convoy_line.setAttribute('y2', src_loc_y_1) + src_convoy_line.setAttribute('class', 'convoyorder') + src_convoy_line.setAttribute('stroke', self.metadata['color'][power_name]) + + dest_convoy_line = xml_map.createElement('line') + dest_convoy_line.setAttribute('x1', src_loc_x_2) + dest_convoy_line.setAttribute('y1', src_loc_y_2) + dest_convoy_line.setAttribute('x2', dest_loc_x) + dest_convoy_line.setAttribute('y2', dest_loc_y) + dest_convoy_line.setAttribute('class', 'convoyorder') + dest_convoy_line.setAttribute('stroke', self.metadata['color'][power_name]) + dest_convoy_line.setAttribute('marker-end', 'url(#arrow)') + + shadow_poly = xml_map.createElement('polygon') + shadow_poly.setAttribute('class', 'shadowdash') + shadow_poly.setAttribute('points', ' '.join(triangle_coord)) + + convoy_poly = xml_map.createElement('polygon') + convoy_poly.setAttribute('class', 'convoyorder') + convoy_poly.setAttribute('points', ' '.join(triangle_coord)) + convoy_poly.setAttribute('stroke', self.metadata['color'][power_name]) + + # Inserting + g_node.appendChild(src_shadow_line) + g_node.appendChild(dest_shadow_line) + g_node.appendChild(src_convoy_line) + g_node.appendChild(dest_convoy_line) + g_node.appendChild(shadow_poly) + g_node.appendChild(convoy_poly) + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer': + for layer_node in child_node.childNodes: + if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2': + layer_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_build_order(self, xml_map, unit_type, loc, power_name): + """ Adds a build army/fleet order to the map + :param xml_map: The xml map being generated + :param unit_type: The unit type to build ('A' or 'F') + :param loc: The province where the army is to be built (e.g. 'PAR') + :param power_name: The name of the power building the unit + :return: Nothing + """ + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -11.5) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], - 10.) + build_loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -20.5) + build_loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -20.5) + + # Symbols + symbol = ARMY if unit_type == 'A' else FLEET + build_symbol = 'BuildUnit' + + # Creating nodes + g_node = xml_map.createElement('g') + + symbol_node = xml_map.createElement('use') + symbol_node.setAttribute('x', loc_x) + symbol_node.setAttribute('y', loc_y) + symbol_node.setAttribute('height', self.metadata['symbol_size'][symbol][0]) + symbol_node.setAttribute('width', self.metadata['symbol_size'][symbol][1]) + symbol_node.setAttribute('xlink:href', '#{}'.format(symbol)) + symbol_node.setAttribute('class', 'unit{}'.format(power_name.lower())) + + build_node = xml_map.createElement('use') + build_node.setAttribute('x', build_loc_x) + build_node.setAttribute('y', build_loc_y) + build_node.setAttribute('height', self.metadata['symbol_size'][build_symbol][0]) + build_node.setAttribute('width', self.metadata['symbol_size'][build_symbol][1]) + build_node.setAttribute('xlink:href', '#{}'.format(build_symbol)) + + # Inserting + g_node.appendChild(build_node) + g_node.appendChild(symbol_node) + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'HighestOrderLayer': + child_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map + + def _issue_disband_order(self, xml_map, loc): + """ Adds a disband order to the map + :param xml_map: The xml map being generated + :param loc: The province where the unit is disbanded (e.g. 'PAR') + :return: Nothing + """ + if self.game.get_current_phase()[-1] == 'R': + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -29.) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -27.5) + else: + loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -16.5) + loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -15.) + + # Symbols + symbol = 'RemoveUnit' + + # Creating nodes + g_node = xml_map.createElement('g') + symbol_node = xml_map.createElement('use') + symbol_node.setAttribute('x', loc_x) + symbol_node.setAttribute('y', loc_y) + symbol_node.setAttribute('height', self.metadata['symbol_size'][symbol][0]) + symbol_node.setAttribute('width', self.metadata['symbol_size'][symbol][1]) + symbol_node.setAttribute('xlink:href', '#{}'.format(symbol)) + + # Inserting + g_node.appendChild(symbol_node) + for child_node in xml_map.getElementsByTagName('svg')[0].childNodes: + if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'HighestOrderLayer': + child_node.appendChild(g_node) + return xml_map + + # Returning + return xml_map |