aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/engine
diff options
context:
space:
mode:
Diffstat (limited to 'diplomacy/engine')
-rw-r--r--diplomacy/engine/__init__.py16
-rw-r--r--diplomacy/engine/game.py4289
-rw-r--r--diplomacy/engine/map.py1361
-rw-r--r--diplomacy/engine/message.py115
-rw-r--r--diplomacy/engine/power.py392
-rw-r--r--diplomacy/engine/renderer.py789
6 files changed, 6962 insertions, 0 deletions
diff --git a/diplomacy/engine/__init__.py b/diplomacy/engine/__init__.py
new file mode 100644
index 0000000..4f2769f
--- /dev/null
+++ b/diplomacy/engine/__init__.py
@@ -0,0 +1,16 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
diff --git a/diplomacy/engine/game.py b/diplomacy/engine/game.py
new file mode 100644
index 0000000..73b2ff9
--- /dev/null
+++ b/diplomacy/engine/game.py
@@ -0,0 +1,4289 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+# -*- coding: utf-8 -*-
+""" Game
+ - Contains the game engine
+"""
+# pylint: disable=too-many-lines
+import os
+import sys
+import time
+import uuid
+import random
+from copy import deepcopy
+
+import numpy as np
+
+from diplomacy import settings
+import diplomacy.utils.errors as err
+from diplomacy.engine.map import Map
+from diplomacy.engine.message import Message, GLOBAL
+from diplomacy.engine.power import Power
+from diplomacy.engine.renderer import Renderer
+from diplomacy.utils import PriorityDict, common, exceptions, parsing, strings
+from diplomacy.utils.jsonable import Jsonable
+from diplomacy.utils.sorted_dict import SortedDict
+from diplomacy.utils.constants import OrderSettings
+from diplomacy.utils.game_phase_data import GamePhaseData, MESSAGES_TYPE
+
+# Constants
+UNDETERMINED, POWER, UNIT, LOCATION, COAST, ORDER, MOVE_SEP, OTHER = 0, 1, 2, 3, 4, 5, 6, 7
+
+class Game(Jsonable):
+ """
+ - combat - Dictionary of dictionaries containing the strength of every attack on a location (including units
+ who don't count toward dislodgment)
+ - Format: {loc: attack_strength: [ ['src loc', [support loc] ]}
+ e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } }
+ MUN is holding, being attack without support from RUH and being attacked with support from SIL
+ (S from BOH)
+ - command - Contains the list of finalized orders to be processed (same format as orders, but without .order)
+ e.g. {'A PAR': '- A MAR'}
+ - controlled_powers: for client games only. List of powers currently controlled by associated client user.
+ - convoy_paths - Contains the list of remaining convoys path for each convoyed unit to reach their destination
+ Note: This is used to see if there are still active convoy paths remaining.
+ Note: This also include the start and ending location
+ e.g. {'A PAR': [ ['PAR', 'ION','NAO', 'MAR], ['PAR', 'ION', 'MAR'] ], ... }
+ - convoy_paths_possible - Contains the list of possible convoy paths given the current fleet locations or None
+ e.g. [(START_LOC, {Fleets Req}, {possible dest}), ...]
+ - convoy_paths_dest - Contains a dictionary of possible paths to reach destination from start or None
+ e.g. {start_loc: {dest_loc_1: [{fleets}, {fleets}, {fleets}], dest_loc_2: [{fleets, fleets}]}
+ - deadline: integer: game deadline in seconds.
+ - dislodged - Contains a dictionary of dislodged units (and the site that dislodged them')
+ e.g. { 'A PAR': 'MAR' }
+ - error - Contains a list of errors that the game generated
+ e.g. ['NO MASTER SPECIFIED']
+ - game_id: String that contains the current game's ID
+ e.g. '123456'
+ - lost - Contains a dictionary of centers that have been lost during the term
+ e.g. {'PAR': 'FRANCE'} to indicate that PAR was lost by France (previous owner)
+ - map: Contains a reference to the current map (Map instance)
+ e.g. map = Map('standard')
+ - map_name: Contains a reference to the name of the map that was loaded
+ e.g. map_name = 'standard'
+ - messages (only for non-observer games): history of messages exchanged inside this game.
+ Sorted dict mapping message timestamps to message objects (instances of diplomacy.Message).
+ Format: {message.time_sent => message}
+ - message_history: history of messages through all played phases.
+ Sorted dict mapping a short phase name to a message dict
+ (with same format as field `message` describe above).
+ Format: {short phase name => {message.time_sent => message}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - meta_rules - Contains the rules that have been processed as directives
+ e.g. ['NO_PRESS']
+ - n_controls: integer: exact number of controlled powers allowed for this game.
+ If game start mode is not START_MASTER, then game starts as soon as this number of powers
+ are controlled.
+ - no_rules - Contains the list of rules that have been disabled (prefixed with '!')
+ e.g ['NO_PRESS']
+ - note - A note to display on the rendering
+ e.g. 'Winner: FRANCE'
+ - observer_level: for client games only. Highest observation level allowed for associated client user.
+ Either "master_type", "omniscient_type" or "observer_type".
+ - orders - Contains the list of current orders (not yet processed)
+ e.g. {'A PAR': '- A MAR'}
+ - ordered_units - Contains a dictionary of the units ordered by each power in the last phase
+ e.g. {'FRANCE': ['A PAR', 'A MAR'], 'ENGLAND': ... }
+ - order_history - Contains the history of orders from each player from the beginning of the game.
+ Sorted dict mapping a short phase name to a dictionary of orders
+ (powers names as keys, powers orders as values).
+ Format: {short phase name => {power name => [orders]}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - outcome - Contains the game outcome
+ e.g. [lastPhase, victor1, victor2, victor3]
+ - phase: String that contains a long representation of the current phase
+ e.g. 'SPRING 1901 MOVEMENT'
+ - phase_type: Indicates the current phase type
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjustment, '-' for non-playing phase)
+ e.g. 'M'
+ - popped - Contains a list of all retreaters who didn't make it
+ e.g. ['A PAR', 'A MAR']
+ - powers - Contains a dictionary mapping power names to power instances in the game
+ e.g. {'FRANCE': FrancePower, 'ENGLAND': EnglishPower, ...}
+ - registration_password: ** hashed ** version of password to be sent by a player to join this game.
+ - renderer - Contains the object in charge of rendering the map
+ e.g. Renderer()
+ - result - Contains the result of the action for each unit.
+ In Movement Phase, result can be 'no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted'
+ e.g. { 'A PAR': ['cut', 'void'] }
+ In Retreats phase, result can be 'bounce', 'disband', 'void'
+ e.g. { 'A PAR': ['cut', 'void'] }
+ In Adjustments phase, result can be 'void' or ''
+ e.g. { 'A PAR': ['', 'void'] } # e.g. to indicate a successful build, and a void build.
+ - result_history - Contains the history of orders results for all played phases.
+ Sorted dict mapping a short phase name to a dictionary of order results for this phase.
+ Dictionary of order results maps a unit to a list of results. See field result for more details.
+ Format: {short phase name => {unit => [results]}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - role: game type (observer, omniscient, player or server game).
+ Either a power name (for player game) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES.
+ - rules: Contains a list of active rules
+ e.g. ['NO_PRESS', ...]
+ - state_history: history of previous game states (returned by method get_state()) for this game.
+ Sorted dict mapping a short phase name to a game state.
+ Each game state is associated to a timestamp generated when state is created by method get_state().
+ State timestamp then represents the "end" time of the state, ie. time when this state was saved and
+ archived in state history.
+ Format: {short phase name => state}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - status: game status (forming, active, paused, completed or canceled).
+ Possible values in diplomacy.utils.strings.ALL_GAME_STATUSES.
+ - supports - Contains a dictionary of support for each unit
+ - Format: { 'unit': [nb_of_support, [list of supporting units]] }
+ e.g. { 'A PAR': [2, ['A MAR']] }
+ 2 support, but the Marseille support does NOT count toward dislodgment
+ - timestamp_created: timestamp in microseconds when game object was created on server side.
+ - victory - Indicates the number of SUPPLY [default] centers one power must control to win the game
+ - Format: [reqFirstYear, reqSecondYear, ..., reqAllFurtherYears]
+ e.g. [10,10,18] for 10 the 1st year, 10 the 2nd year, 18 year 3+
+ - win - Indicates the minimum number of centers required to win
+ e.g. 3
+ - zobrist_hash - Contains the zobrist hash representing the current state of this game
+ e.g. 12545212418541325
+ """
+ # pylint: disable=too-many-instance-attributes
+ __slots__ = ['victory', 'no_rules', 'meta_rules', 'phase', 'note', 'map', 'powers', 'outcome', 'error', 'popped',
+ 'messages', 'order_history', 'orders', 'ordered_units', 'phase_type', 'win', 'combat', 'command',
+ 'result', 'supports', 'dislodged', 'lost', 'convoy_paths', 'convoy_paths_possible',
+ 'convoy_paths_dest', 'zobrist_hash', 'renderer', 'game_id', 'map_name', 'role', 'rules',
+ 'message_history', 'state_history', 'result_history', 'status', 'timestamp_created', 'n_controls',
+ 'deadline', 'registration_password', 'observer_level', 'controlled_powers', '_phase_wrapper_type',
+ 'phase_abbr']
+ zobrist_tables = {}
+ rule_cache = ()
+ model = {
+ strings.CONTROLLED_POWERS: parsing.OptionalValueType(parsing.SequenceType(str)),
+ strings.DEADLINE: parsing.DefaultValueType(int, 300),
+ strings.ERROR: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.GAME_ID: parsing.OptionalValueType(str),
+ strings.MAP_NAME: parsing.DefaultValueType(str, 'standard'),
+ strings.MESSAGE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, MESSAGES_TYPE), {}),
+ strings.MESSAGES: parsing.DefaultValueType(MESSAGES_TYPE, []),
+ strings.META_RULES: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.N_CONTROLS: parsing.OptionalValueType(int),
+ strings.NO_RULES: parsing.DefaultValueType(parsing.SequenceType(str, set), []),
+ strings.NOTE: parsing.DefaultValueType(str, ''),
+ strings.OBSERVER_LEVEL: parsing.OptionalValueType(
+ parsing.EnumerationType((strings.MASTER_TYPE, strings.OMNISCIENT_TYPE, strings.OBSERVER_TYPE))),
+ strings.ORDER_HISTORY: parsing.DefaultValueType(
+ parsing.DictType(str, parsing.DictType(str, parsing.SequenceType(str))), {}),
+ strings.OUTCOME: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.PHASE: parsing.DefaultValueType(str, ''),
+ strings.PHASE_ABBR: parsing.DefaultValueType(str, ''),
+ strings.POWERS: parsing.DefaultValueType(parsing.DictType(str, parsing.JsonableClassType(Power)), {}),
+ strings.REGISTRATION_PASSWORD: parsing.OptionalValueType(str),
+ strings.RESULT_HISTORY: parsing.DefaultValueType(parsing.DictType(str, parsing.DictType(
+ str, parsing.SequenceType(parsing.EnumerationType(
+ ['no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted', 'disband', ''])))), {}),
+ strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE),
+ strings.RULES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=list), ()),
+ strings.STATE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, dict), {}),
+ strings.STATUS: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_GAME_STATUSES), strings.FORMING),
+ strings.TIMESTAMP_CREATED: parsing.OptionalValueType(int),
+ strings.VICTORY: parsing.DefaultValueType(parsing.SequenceType(int), []),
+ strings.WIN: parsing.DefaultValueType(int, 0),
+ strings.ZOBRIST_HASH: parsing.DefaultValueType(parsing.StringableType(np.int64), '0'),
+ }
+
+ def __init__(self, game_id=None, **kwargs):
+ """ Constructor """
+ self.victory = None
+ self.no_rules = set()
+ self.meta_rules = []
+ self.phase, self.note = '', ''
+ self.map = None # type: Map
+ self.powers = {}
+ self.outcome, self.error, self.popped = [], [], []
+ self.orders, self.ordered_units = {}, {}
+ self.phase_type = None
+ self.win = None
+ self.combat, self.command, self.result = {}, {}, {}
+ self.supports, self.dislodged, self.lost = {}, {}, {}
+ self.convoy_paths, self.convoy_paths_possible, self.convoy_paths_dest = {}, None, None
+ self.zobrist_hash = 0
+ self.renderer = None
+ self.game_id = None # type: str
+ self.map_name = None # type: str
+ self.messages = None # type: SortedDict
+ self.role = None # type: str
+ self.rules = []
+ self.state_history, self.order_history, self.result_history, self.message_history = {}, {}, {}, {}
+ self.status = None # type: str
+ self.timestamp_created = None # type: int
+ self.n_controls = None
+ self.deadline = 0
+ self.registration_password = None
+ self.observer_level = None
+ self.controlled_powers = None
+
+ # Remove rules from kwargs (if present), as we want to add them manually using self.add_rule().
+ rules = kwargs.pop(strings.RULES, None)
+
+ # Update rules with game ID.
+ kwargs[strings.GAME_ID] = game_id
+
+ # Initialize game with kwargs.
+ super(Game, self).__init__(**kwargs)
+
+ # Check settings.
+ if self.registration_password is not None and self.registration_password == '':
+ raise exceptions.DiplomacyException('Registration password must be None or non-empty string.')
+ if self.n_controls is not None and self.n_controls < 0:
+ raise exceptions.NaturalIntegerException('n_controls must be a natural integer.')
+ if self.deadline < 0:
+ raise exceptions.NaturalIntegerException('Deadline must be a natural integer.')
+ # Check rules.
+ if rules is None:
+ rules = ['SOLITAIRE', 'NO_PRESS', 'IGNORE_ERRORS', 'POWER_CHOICE']
+ # Set game rules.
+ for rule in rules:
+ self.add_rule(rule)
+ # Check settings about rule NO_DEADLINE.
+ if 'NO_DEADLINE' in self.rules:
+ self.deadline = 0
+ # Check settings about rule SOLITAIRE.
+ if 'SOLITAIRE' in self.rules:
+ self.n_controls = 0
+ elif self.n_controls == 0:
+ # If number of allowed players is 0, the game can only be solitaire.
+ self.add_rule('SOLITAIRE')
+
+ # Check timestamp_created.
+ if self.timestamp_created is None:
+ self.timestamp_created = common.timestamp_microseconds()
+
+ # Check game ID.
+ if self.game_id is None:
+ self.game_id = '%s/%s' % (self.timestamp_created, uuid.uuid4())
+
+ # Validating status
+ self._validate_status(reinit_powers=(self.timestamp_created is None))
+
+ if self.powers:
+ # Game loaded with powers.
+ # Associate loaded powers with this game.
+ for power in self.powers.values():
+ power.game = self
+ else:
+ # Begin game.
+ self._begin()
+
+ # Game loaded.
+
+ # Check map powers.
+ assert all(self.has_power(power_name) for power_name in self.map.powers)
+
+ # Check role and consistency between all power roles and game role.
+ if self.has_power(self.role):
+ # It's a power game. Each power must be a player power.
+ assert all(power.role == power.name for power in self.powers.values())
+ else:
+ # We should have a non-power game and each power must have same role as game role.
+ assert self.role in strings.ALL_ROLE_TYPES
+ assert all(power.role == self.role for power in self.powers.values())
+
+ # Wrap history fields into runtime sorted dictionaries.
+ # This is necessary to sort history fields by phase name.
+
+ self._phase_wrapper_type = common.str_cmp_class(self.map.compare_phases)
+
+ self.order_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.order_history.items()})
+ self.message_history = SortedDict(self._phase_wrapper_type, SortedDict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.message_history.items()})
+ self.state_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.state_history.items()})
+ self.result_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.result_history.items()})
+
+ def __str__(self):
+ """ Returns a string representation of the game instance """
+ show_map = self.map
+ show_result = self.outcome
+
+ text = ''
+ text += 'GAME %s%s%s' % (self.game_id, '\nPHASE ', self.phase)
+ text += '\nMAP %s' % self.map_name if show_map else ''
+ text += '\nRESULT %s' % ' '.join(self.outcome) if show_result else ''
+ text += '\nRULE '.join([''] + [rule for rule in self.rules if rule not in self.meta_rules])
+ text += '\nRULE !'.join([''] + [no_rule for no_rule in self.no_rules])
+ return text
+
+ def __deepcopy__(self, memo):
+ """ Fast deep copy implementation """
+ cls = self.__class__
+ result = cls.__new__(cls)
+
+ # Deep copying
+ for key in self._slots:
+ if key in ['map', 'renderer', 'powers']:
+ continue
+ setattr(result, key, deepcopy(getattr(self, key)))
+ setattr(result, 'map', self.map)
+ setattr(result, 'powers', {})
+ for power in self.powers.values():
+ result.powers[power.name] = deepcopy(power)
+ setattr(result.powers[power.name], 'game', result)
+ return result
+
+ # ====================================================================
+ # Public Interface
+ # ====================================================================
+
+ @property
+ def _slots(self):
+ """ Return an iterable of all attributes of this object.
+ Should be used in place of "self.__slots__" to be sure to retrieve all
+ attribute names from a derived class (including parent slots).
+ """
+ return (name for cls in type(self).__mro__ for name in getattr(cls, '__slots__', ()))
+
+ @property
+ def power(self):
+ """ (only for player games) Return client power associated to this game.
+ :return: a Power object.
+ :rtype: Power
+ """
+ return self.powers[self.role] if self.is_player_game() else None
+
+ @property
+ def is_game_done(self):
+ """ Returns a boolean flag that indicates if the game is done """
+ return self.phase == 'COMPLETED'
+
+ is_game_forming = property(lambda self: self.status == strings.FORMING)
+ is_game_active = property(lambda self: self.status == strings.ACTIVE)
+ is_game_paused = property(lambda self: self.status == strings.PAUSED)
+ is_game_canceled = property(lambda self: self.status == strings.CANCELED)
+ is_game_completed = property(lambda self: self.status == strings.COMPLETED)
+ current_short_phase = property(lambda self: self.map.phase_abbr(self.phase, self.phase))
+
+ civil_disorder = property(lambda self: 'CIVIL_DISORDER' in self.rules)
+ multiple_powers_per_player = property(lambda self: 'MULTIPLE_POWERS_PER_PLAYER' in self.rules)
+ no_observations = property(lambda self: 'NO_OBSERVATIONS' in self.rules)
+ no_press = property(lambda self: 'NO_PRESS' in self.rules)
+ power_choice = property(lambda self: 'POWER_CHOICE' in self.rules)
+ public_press = property(lambda self: 'PUBLIC_PRESS' in self.rules)
+ real_time = property(lambda self: 'REAL_TIME' in self.rules)
+ start_master = property(lambda self: 'START_MASTER' in self.rules)
+ solitaire = property(lambda self: 'SOLITAIRE' in self.rules)
+
+ # ==============================================================
+ # Application/network methods (mainly used for connected games).
+ # ==============================================================
+
+ def is_player_game(self):
+ """ Return True if this game is a player game. """
+ return self.has_power(self.role)
+
+ def is_observer_game(self):
+ """ Return True if this game is an observer game. """
+ return self.role == strings.OBSERVER_TYPE
+
+ def is_omniscient_game(self):
+ """ Return True if this game is an omniscient game. """
+ return self.role == strings.OMNISCIENT_TYPE
+
+ def is_server_game(self):
+ """ Return True if this game is a server game. """
+ return self.role == strings.SERVER_TYPE
+
+ def is_valid_password(self, registration_password):
+ """ Return True if given plain password matches registration password. """
+ if self.registration_password is None:
+ return registration_password is None
+ if registration_password is None:
+ return False
+ return common.is_valid_password(registration_password, self.registration_password)
+
+ def is_controlled(self, power_name):
+ """ Return True if given power name is currently controlled. """
+ return self.get_power(power_name).is_controlled()
+
+ def is_dummy(self, power_name):
+ """ Return True if given power name is not currently controlled. """
+ return not self.is_controlled(power_name)
+
+ def does_not_wait(self):
+ """ Return True if the game does not wait anything to process its current phase.
+ The game is not waiting is all **controlled** powers have defined orders and wait flag set to False.
+ If it's a solitaire game (with no controlled powers), all (dummy, not eliminated) powers must have defined
+ orders and wait flag set to False. By default, wait flag for a dummy power is True.
+ Note that an empty orders set is considered as a defined order as long as it was
+ explicitly set by the power controller.
+ """
+ if any(power.is_controlled() for power in self.powers.values()):
+ return all(power.does_not_wait() for power in self.powers.values() if power.is_controlled())
+ return all(power.is_eliminated() or power.does_not_wait() for power in self.powers.values())
+
+ def has_power(self, power_name):
+ """ Return True if this game has given power name. """
+ return power_name in self.map.powers
+
+ def has_expected_controls_count(self):
+ """ Return True if game has expected number of map powers to be controlled.
+ If True, the game can start (if not yet started).
+ """
+ return self.count_controlled_powers() == self.get_expected_controls_count()
+
+ def count_controlled_powers(self):
+ """ Return the number of controlled map powers. """
+ return sum(1 for power_name in self.get_map_power_names() if self.is_controlled(power_name))
+
+ def get_controlled_power_names(self, username):
+ """ Return the list of power names currently controlled by given user name. """
+ return [power.name for power in self.powers.values() if power.is_controlled_by(username)]
+
+ def get_expected_controls_count(self):
+ """ Return the number of map powers expected to be controlled in this game.
+ This number is either specified in settings or the number of map powers.
+ """
+ expected_count = self.n_controls
+ if expected_count is None:
+ expected_count = len(self.powers)
+ return expected_count
+
+ def get_map_power_names(self):
+ """ Return sequence of map power names. """
+ return self.powers.keys()
+
+ def get_dummy_power_names(self):
+ """ Return sequence of dummy power objects. """
+ return set(power_name for power_name in self.get_map_power_names() if self.is_dummy(power_name))
+
+ def get_controllers(self):
+ """ Return a dictionary mapping each power name to its current controller name."""
+ return {power.name: power.get_controller() for power in self.powers.values()}
+
+ def get_controllers_timestamps(self):
+ """ Return a dictionary mapping each power name to its controller timestamp. """
+ return {power.name: power.get_controller_timestamp() for power in self.powers.values()}
+
+ def get_random_power_name(self):
+ """ Return a random power name from remaining dummy power names.
+ Raise an exception if there are no dummy power names.
+ """
+ playable_power_names = list(self.get_dummy_power_names())
+ if not playable_power_names:
+ raise exceptions.RandomPowerException(1, len(playable_power_names))
+ playable_power_names.sort()
+ return playable_power_names[random.randint(0, len(playable_power_names) - 1)]
+
+ def get_latest_timestamp(self):
+ """ Return timestamp of latest data saved into this game (either current state, archived state or message).
+ :return: a timestamp
+ :rtype: int
+ """
+ timestamp = self.timestamp_created
+ if self.state_history:
+ timestamp = max(self.state_history.last_value()['timestamp'], timestamp)
+ if self.messages:
+ timestamp = max(self.messages.last_key(), timestamp)
+ return timestamp
+
+ @classmethod
+ def filter_messages(cls, messages, game_role, timestamp_from=None, timestamp_to=None):
+ """ Filter given messages based on given game role between given timestamps (bounds included).
+ See method diplomacy.utils.SortedDict.sub() about bound rules.
+ :param messages: a sorted dictionary of messages to filter.
+ :param game_role: game role requiring messages. Either a special power name
+ (PowerName.OBSERVER or PowerName.OMNISCIENT), a power name, or a list of power names.
+ :param timestamp_from: lower timestamp (included) for required messages.
+ :param timestamp_to: upper timestamp (included) for required messages.
+ :return: a dict of corresponding messages (empty if no corresponding messages found),
+ mapping messages timestamps to messages.
+ :type messages: diplomacy.utils.sorted_dict.SortedDict
+ """
+
+ # Observer can see global messages and system messages sent to observers.
+ if isinstance(game_role, str) and game_role == strings.OBSERVER_TYPE:
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)
+ if message.is_global() or message.for_observer()}
+
+ # Omniscient observer can see all messages.
+ if isinstance(game_role, str) and game_role == strings.OMNISCIENT_TYPE:
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)}
+
+ # Power can see global messages and all messages she sent or received.
+ if isinstance(game_role, str):
+ game_role = [game_role]
+ elif not isinstance(game_role, list):
+ game_role = list(game_role)
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)
+ if message.is_global() or message.recipient in game_role or message.sender in game_role}
+
+ def get_phase_history(self, from_phase=None, to_phase=None, game_role=None):
+ """ Return a list of game phase data from game history between given phases (bounds included).
+ Each GamePhaseData object contains game state, messages, orders and order results for a phase.
+ :param from_phase: either:
+ - a string: phase name
+ - an integer: index of phase in game history
+ - None (default): lowest phase stored in game history
+ :param to_phase: either:
+ - a string: phase name
+ - an integer: index of phase in game history
+ - None (default): latest phase stored in game history
+ :param game_role (optional): role of game for which phase history is retrieved.
+ If none, messages in game history will not be filtered.
+ """
+ if isinstance(from_phase, int):
+ from_phase = self.state_history.key_from_index(from_phase)
+ elif isinstance(from_phase, str):
+ from_phase = self._phase_wrapper_type(from_phase)
+ if isinstance(to_phase, int):
+ to_phase = self.state_history.key_from_index(to_phase)
+ elif isinstance(to_phase, str):
+ to_phase = self._phase_wrapper_type(to_phase)
+ phases = self.state_history.sub_keys(from_phase, to_phase)
+ states = self.state_history.sub(from_phase, to_phase)
+ orders = self.order_history.sub(from_phase, to_phase)
+ messages = self.message_history.sub(from_phase, to_phase)
+ results = self.result_history.sub(from_phase, to_phase)
+ if game_role:
+ messages = [self.filter_messages(msg_dict, game_role) for msg_dict in messages]
+ assert len(phases) == len(states) == len(orders) == len(messages) == len(results), (
+ len(phases), len(states), len(orders), len(messages), len(results))
+ return [GamePhaseData(name=str(phases[i]),
+ state=states[i],
+ orders=orders[i],
+ messages=messages[i],
+ results=results[i])
+ for i in range(len(phases))]
+
+ def get_phase_from_history(self, short_phase_name, game_role=None):
+ """ Return a game phase data corresponding to given phase from phase history. """
+ return self.get_phase_history(short_phase_name, short_phase_name, game_role)[0]
+
+ def phase_history_from_timestamp(self, timestamp):
+ """ Return list of game phase data from game history for which state timestamp >= given timestamp. """
+ earliest_phase = ''
+ for state in self.state_history.reversed_values():
+ if state['timestamp'] < timestamp:
+ break
+ earliest_phase = state['name']
+ return self.get_phase_history(from_phase=earliest_phase) if earliest_phase else []
+
+ def extend_phase_history(self, game_phase_data):
+ """ Add data from a game phase to game history.
+ :param game_phase_data: a GamePhaseData object.
+ :type game_phase_data: GamePhaseData
+ """
+ phase = self._phase_wrapper_type(game_phase_data.name)
+ assert phase not in self.state_history
+ assert phase not in self.message_history
+ assert phase not in self.order_history
+ assert phase not in self.result_history
+ self.state_history.put(phase, game_phase_data.state)
+ self.message_history.put(phase, game_phase_data.messages)
+ self.order_history.put(phase, game_phase_data.orders)
+ self.result_history.put(phase, game_phase_data.results)
+
+ def set_status(self, status):
+ """ Set game status with given status (should be in diplomacy.utils.strings.ALL_GAME_STATUSES). """
+ assert status in strings.ALL_GAME_STATUSES
+ self.status = status
+
+ def draw(self, winners=None):
+ """ Force a draw for this game, set status as COMPLETED and finish the game.
+ :param winners: (optional) either None (all powers remaining to map are considered winners) or a sequence
+ of required power names to be considered as winners.
+ :return: a couple (previous state, current state) with game state before the draw and game state after
+ the draw.
+ """
+ if winners is None:
+ # Draw with all powers which still have units in map.
+ winners = [power.name for power in self.powers.values() if power.units]
+
+ # No orders will be processed when drawing, so clear current orders.
+ self.clear_orders()
+
+ # Collect data about current phase before drawing.
+ previous_phase = self._phase_wrapper_type(self.current_short_phase)
+ previous_orders = self.get_orders()
+ previous_messages = self.messages.copy()
+ previous_state = self.get_state()
+
+ # Finish the game.
+ self._finish(winners)
+
+ # Then clear game and save previous phase.
+ self.clear_vote()
+ self.clear_orders()
+ self.messages.clear()
+ self.order_history.put(previous_phase, previous_orders)
+ self.message_history.put(previous_phase, previous_messages)
+ self.state_history.put(previous_phase, previous_state)
+ self.result_history.put(previous_phase, {})
+
+ # There are no expected results for orders, as there are no orders processed.
+
+ previous_phase_data = GamePhaseData(name=str(previous_phase),
+ state=previous_state,
+ orders=previous_orders,
+ messages=previous_messages,
+ results={})
+ current_phase_data = GamePhaseData(name=self.current_short_phase,
+ state=self.get_state(),
+ orders={},
+ messages={},
+ results={})
+
+ return previous_phase_data, current_phase_data
+
+ def set_controlled(self, power_name, username):
+ """ Control power with given username (may be None to set dummy power).
+ See method diplomacy.Power#set_controlled.
+ """
+ self.get_power(power_name).set_controlled(username)
+
+ def update_dummy_powers(self, dummy_power_names):
+ """ Force all power associated to given dummy power names to be uncontrolled.
+ :param dummy_power_names: Sequence of required dummy power names.
+ """
+ for dummy_power_name in dummy_power_names:
+ if self.has_power(dummy_power_name):
+ self.set_controlled(dummy_power_name, None)
+
+ def update_powers_controllers(self, powers_controllers, timestamps):
+ """ Update powers controllers.
+ :param powers_controllers: a dictionary mapping a power name to a controller name.
+ :param timestamps: a dictionary mapping a power name to timestamp when related controller
+ (in powers_controllers) was associated to power.
+ :type powers_controllers: dict
+ """
+ for power_name, controller in powers_controllers.items():
+ self.get_power(power_name).update_controller(controller, timestamps[power_name])
+
+ def new_power_message(self, recipient, body):
+ """ Create a undated (without timestamp) power message to be sent from a power to another via server.
+ Server will answer with timestamp, and message will be updated
+ and added to local game messages.
+ :param recipient: recipient power name (string).
+ :param body: message body (string).
+ :return: a new GameMessage object.
+ :rtype: GameMessage
+ """
+ assert self.is_player_game()
+ if not self.has_power(recipient):
+ raise exceptions.MapPowerException(recipient)
+ return Message(phase=self.current_short_phase, sender=self.role, recipient=recipient, message=body)
+
+ def new_global_message(self, body):
+ """ Create an undated (without timestamp) global message to be sent from a power via server.
+ Server will answer with timestamp, and message will be updated and added to local game messages.
+ :param body: message body (string).
+ :return: a new GameMessage object.
+ :rtype: Message
+ """
+ assert self.is_player_game()
+ return Message(phase=self.current_short_phase, sender=self.role, recipient=GLOBAL, message=body)
+
+ def add_message(self, message):
+ """ Add message to current game data.
+ Only a server game can add a message with no timestamp:
+ game will auto-generate a timestamp for the message.
+ :param message: a GameMessage object to add.
+ :return: message timestamp.
+ :rtype: int
+ """
+ assert isinstance(message, Message)
+ if self.is_player_game():
+ assert message.is_global() or self.power.name in (message.sender, message.recipient)
+
+ if message.time_sent is None:
+ # This instance must be a server game.
+ # Message should be a new message matching current game phase.
+ # There should not be any more recent message in message history (as we are adding a new message).
+ # We must generate a timestamp for this message.
+ assert self.is_server_game()
+ if message.phase != self.current_short_phase:
+ raise exceptions.GamePhaseException(self.current_short_phase, message.phase)
+ assert not self.messages or common.timestamp_microseconds() >= self.messages.last_key()
+ time.sleep(1e-6)
+ message.time_sent = common.timestamp_microseconds()
+
+ self.messages.put(message.time_sent, message)
+ return message.time_sent
+
+ # Vote methods. For server and omniscient games only.
+ # Observer game should not see votes.
+ # Power game should know only vote of related power (votes for all other power should be 'neutral' in a power game).
+
+ def has_draw_vote(self):
+ """ Return True if all controlled non-eliminated powers have voted YES to draw game at current phase. """
+ assert self.is_server_game() or self.is_omniscient_game()
+ return all(
+ power.vote == strings.YES
+ for power in self.powers.values()
+ if not power.is_eliminated()
+ )
+
+ def count_voted(self):
+ """ Return the count of controlled powers who already voted for a draw for current phase. """
+ assert self.is_server_game() or self.is_omniscient_game()
+ return sum(1 for power in self.powers.values()
+ if not power.is_eliminated() and power.vote != strings.NEUTRAL)
+
+ def clear_vote(self):
+ """ Clear current vote. """
+ for power in self.powers.values():
+ power.vote = strings.NEUTRAL
+
+ # ==============
+ # Basic methods.
+ # ==============
+ def get_units(self, power_name=None):
+ """ Retrieves the list of units for a power or for all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of units (e.g. ['A PAR', 'A MAR']) if a power name is provided
+ or a dictionary of powers with their units if None is provided (e.g. {'FRANCE': [...], ...}
+
+ Note: Dislodged units will appear with a leading asterisk (e.g. '*A PAR')
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+ if power_name is not None:
+ return power.units[:] + ['*{}'.format(unit) for unit in power.retreats]
+ if power_name is None:
+ units = {}
+ for power in self.powers.values():
+ units[power.name] = self.get_units(power.name)
+ return units
+ return []
+
+ def get_centers(self, power_name=None):
+ """ Retrieves the list of owned supply centers for a power or for all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of supply centers (e.g. ['PAR', 'MAR']) if a power name is provided
+ or a dictionary of powers with their supply centers if None is provided
+ (e.g. {'FRANCE': [...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+ if power_name is not None:
+ return power.centers[:]
+ if power_name is None:
+ centers = {}
+ for power in self.powers.values():
+ centers[power.name] = self.get_centers(power.name)
+ return centers
+ return []
+
+ def get_orders(self, power_name=None):
+ """ Retrieves the orders submitted by a specific power, or by all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of orders (e.g. ['A PAR H', 'A MAR - BUR']) if a power name is provided
+ or a dictionary of powers with their orders if None is provided
+ (e.g. {'FRANCE': ['A PAR H', 'A MAR - BUR', ...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+
+ # Getting orders for a particular power
+ # Skipping VOID and WAIVE orders in Adjustment/Retreats phase
+ if power_name is not None:
+ if self.get_current_phase()[-1] == 'M':
+ if 'NO_CHECK' in self.rules:
+ power_orders = [power.orders[order] for order in power.orders if power.orders[order]]
+ else:
+ power_orders = ['{} {}'.format(unit, unit_order) for unit, unit_order in power.orders.items()]
+ else:
+ power_orders = [order for order in power.adjust
+ if order and order != 'WAIVE' and order.split()[0] != 'VOID']
+ return power_orders
+
+ # Recursively calling itself to get all powers
+ elif power_name is None:
+ orders = {}
+ for power in self.powers.values():
+ orders[power.name] = self.get_orders(power.name)
+ return orders
+ return []
+
+ def get_orderable_locations(self, power_name=None):
+ """ Find the location requiring an order for a power (or for all powers)
+ :param power_name: Optionally, the name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of orderable locations (e.g. ['PAR', 'MAR']) if a power name is provided
+ or a dictionary of powers with their orderable locations if None is not provided
+ (e.g. {'FRANCE': [...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+
+ # Single power
+ if power_name is not None:
+ current_phase_type = self.get_current_phase()[-1]
+
+ # Adjustment
+ if current_phase_type == 'A':
+ build_count = len(power.centers) - len(power.units)
+
+ # Building - All unoccupied homes
+ if build_count > 0:
+ orderable_locs = self._build_sites(power)
+
+ # Nothing can be built.
+ elif build_count == 0:
+ orderable_locs = []
+
+ # Disbanding - All units location
+ else:
+ orderable_locs = [unit[2:5] for unit in power.units]
+
+ # Retreating
+ elif current_phase_type == 'R':
+ orderable_locs = [unit[2:5] for unit in power.retreats]
+
+ # Movement
+ else:
+ orderable_locs = [unit[2:5] for unit in power.units]
+
+ # Returning and sorting for deterministic output
+ return sorted(orderable_locs)
+
+ # All powers
+ else:
+ return {power.name: self.get_orderable_locations(power.name) for power in self.powers.values()}
+
+ def get_order_status(self, power_name=None, unit=None):
+ """ Returns a list or a dict representing the order status ('', 'no convoy', 'bounce', 'void', 'cut',
+ 'dislodged', 'disrupted') for orders submitted in the last phase
+ :param power_name: Optional. If provided (e.g. 'FRANCE') will only return the order status of that
+ power's orders
+ :param unit: Optional. If provided (e.g. 'A PAR') will only return that specific unit order status.
+ :param phase_type: Optional. Returns the results of a specific phase type (e.g. 'M', 'R', or 'A')
+ :return: If unit is provided a list (e.g. [] or ['void', 'dislodged'])
+ If power is provided a dict (e.g. {'A PAR': ['void'], 'A MAR': []})
+ Otherwise a 2-level dict (e.g. {'FRANCE: {'A PAR': ['void'], 'A MAR': []}, 'ENGLAND': {}, ... }
+ """
+ # Specific location, returning string
+ if unit is not None:
+ result_dict = self.result_history.last_value() if self.result_history else {}
+ return result_dict[unit][:] if unit in result_dict else []
+
+ # Specific power, returning dictionary
+ if power_name is not None:
+ power_name = power_name.upper()
+ if power_name is not None:
+ order_status = {}
+ if self.state_history:
+ state_history = self.state_history.last_value()
+ for ordered_unit in state_history['units'][power_name]:
+ ordered_unit = ordered_unit.replace('*', '')
+ order_status[ordered_unit] = self.get_order_status(power_name, ordered_unit)
+ return order_status
+
+ # All powers
+ if power_name is None:
+ order_status = {}
+ for power in self.powers.values():
+ order_status[power.name] = self.get_order_status(power.name)
+ return order_status
+ return {}
+
+ def get_power(self, power_name):
+ """ Retrieves a power instance from given power name.
+ :param power_name: name of power instance to retrieve. Power name must be as given
+ in map file.
+ :return: the power instance, or None if power name is not found.
+ :rtype: Power
+ """
+ return self.powers.get(power_name, None)
+
+ def set_units(self, power_name, units, reset=False):
+ """ Sets units directly on the map
+ :param power_name: The name of the power who will own the units (e.g. 'FRANCE')
+ :param units: An unit (e.g. 'A PAR') or a list of units (e.g. ['A PAR', 'A MAR']) to set
+ Note units starting with a '*' will be set as dislodged
+ :param reset: Boolean. If, clear all units of the power before setting them
+ :return: Nothing
+ """
+ power_name = power_name.upper()
+ if not isinstance(units, list):
+ units = [units]
+ if power_name not in self.powers:
+ return
+
+ # Clearing old units if reset is true
+ if reset and power_name in self.powers:
+ self.powers[power_name].clear_units()
+
+ regular_units = [unit for unit in units if unit[0] != '*']
+ dislodged_units = [unit[1:] for unit in units if unit[0] == '*']
+ influence = [unit[2:5] for unit in regular_units + dislodged_units]
+
+ # Removing units that are already there
+ for power in self.powers.values():
+ for unit in regular_units:
+ unit_loc = unit[2:5]
+ for unit_to_remove in [p_unit for p_unit in power.units if p_unit[2:5] == unit_loc]:
+ self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:])
+ power.units.remove(unit_to_remove)
+ for unit in dislodged_units:
+ unit_loc = unit[2:5]
+ for unit_to_remove in [p_unit for p_unit in power.retreats if p_unit[2:5] == unit_loc]:
+ self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:], is_dislodged=True)
+ del power.retreats[unit_to_remove]
+ for loc in influence:
+ if loc in power.influence:
+ power.influence.remove(loc)
+
+ # Retrieving the target power
+ power = self.get_power(power_name)
+
+ # Re-adding normal units to the new power
+ for unit in regular_units:
+ word = unit.upper().split()
+ if len(word) != 2:
+ continue
+ unit_type, unit_loc = word
+ if unit_type in ('A', 'F') \
+ and unit_loc in [loc.upper() for loc in self.map.locs] \
+ and self.map.is_valid_unit(unit):
+ if power:
+ self.update_hash(power_name, unit_type=unit_type, loc=unit_loc)
+ power.units.append(unit)
+ power.influence.append(unit[2:5])
+ else:
+ self.error += [err.MAP_INVALID_UNIT % unit]
+
+ # Re-adding dislodged units to the new power
+ for unit in dislodged_units:
+ word = unit.upper().split()
+ if len(word) != 2:
+ continue
+ unit_type, unit_loc = word
+ if unit_type in ('A', 'F') and unit_loc in [loc.upper() for loc in self.map.locs]:
+ abuts = [abut.upper() for abut in self.map.abut_list(unit_loc, incl_no_coast=True)
+ if self._abuts(unit_type, unit_loc, '-', abut.upper())]
+ if power:
+ self.update_hash(power_name, unit_type=unit_type, loc=unit_loc, is_dislodged=True)
+ power.retreats[unit] = abuts
+
+ # Clearing cache
+ self.clear_cache()
+
+ def set_centers(self, power_name, centers, reset=False):
+ """ Transfers supply centers ownership
+ :param power_name: The name of the power who will control the supply centers (e.g. 'FRANCE')
+ :param centers: A loc (e.g. 'PAR') or a list of locations (e.g. ['PAR', 'MAR']) to transfer
+ :param reset: Boolean. If, removes ownership of all power's SC before transferring ownership of the new SC
+ :return: Nothing
+ """
+ power_name = power_name.upper()
+ if not isinstance(centers, list):
+ centers = [centers]
+ if power_name not in self.powers:
+ return
+
+ # Clearing old centers if reset is true
+ if reset and power_name in self.powers:
+ self.powers[power_name].clear_centers()
+
+ # Removing centers that are already controlled by another power
+ for power in self.powers.values():
+ for center in centers:
+ if center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers.remove(center)
+
+ # Transferring center to power_name
+ power = self.get_power(power_name)
+ if power:
+ for center in centers:
+ if center in self.map.scs:
+ self.update_hash(power_name, loc=center, is_center=True)
+ power.centers += [center]
+
+ # Clearing cache
+ self.clear_cache()
+
+ def set_orders(self, power_name, orders, expand=True, replace=True):
+ """ Sets the current orders for a power
+ :param power_name: The name of the power (e.g. 'FRANCE')
+ :param orders: The list of orders (e.g. ['A MAR - PAR', 'A PAR - BER', ...])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ power_name = power_name.upper()
+
+ if not self.has_power(power_name):
+ return
+
+ if self.is_player_game() and self.role != power_name:
+ return
+
+ power = self.get_power(power_name)
+ if power:
+ if not isinstance(orders, list):
+ orders = [orders]
+
+ # Remove any empty string from orders.
+ orders = [order for order in orders if order]
+
+ # Setting orders depending on phase type
+ if self.phase_type == 'R':
+ self._update_retreat_orders(power, orders, expand=expand, replace=replace)
+ elif self.phase_type == 'A':
+ self._update_adjust_orders(power, orders, expand=expand, replace=replace)
+ else:
+ self._update_orders(power, orders, expand=expand, replace=replace)
+ power.order_is_set = (OrderSettings.ORDER_SET
+ if self.get_orders(power.name)
+ else OrderSettings.ORDER_SET_EMPTY)
+
+ def set_wait(self, power_name, wait):
+ """ Set wait flag for a power.
+ :param power_name: name of power to set wait flag.
+ :param wait: wait flag (boolean).
+ """
+ power_name = power_name.upper()
+
+ if not self.has_power(power_name):
+ return
+
+ power = self.get_power(power_name.upper()) # type: Power
+ power.wait = wait
+
+ def clear_units(self, power_name=None):
+ """ Clear the power's units
+ :param power_name: Optional. The name of the power whose units will be cleared (e.g. 'FRANCE'),
+ otherwise all units on the map will be cleared
+ :return: Nothing
+ """
+ for power in self.powers.values():
+ if power_name is None or power.name == power_name:
+ power.clear_units()
+ self.clear_cache()
+
+ def clear_centers(self, power_name=None):
+ """ Removes ownership of supply centers
+ :param power_name: Optional. The name of the power whose centers will be cleared (e.g. 'FRANCE'),
+ otherwise all centers on the map will lose ownership.
+ :return: Nothing
+ """
+ for power in self.powers.values():
+ if power_name is None or power.name == power_name:
+ power.clear_centers()
+ self.clear_cache()
+
+ def clear_orders(self, power_name=None):
+ """ Clears the power's orders
+ :param power_name: Optional. The name of the power to clear (e.g. 'FRANCE') or will clear orders for
+ all powers if None.
+ :return: Nothing
+ """
+ if power_name is not None:
+ power = self.get_power(power_name.upper())
+ power.clear_orders()
+ else:
+ for power in self.powers.values():
+ power.clear_orders()
+
+ def clear_cache(self):
+ """ Clears all caches """
+ self.convoy_paths_possible, self.convoy_paths_dest = None, None
+
+ def get_current_phase(self):
+ """ Returns the current phase (format 'S1901M' or 'FORMING' or 'COMPLETED' """
+ return self._phase_abbr()
+
+ def set_current_phase(self, new_phase):
+ """ Changes the phase to the specified new phase (e.g. 'S1901M') """
+ if new_phase in ('FORMING', 'COMPLETED'):
+ self.phase = new_phase
+ self.phase_type = None
+ else:
+ self.phase = self.map.phase_long(new_phase)
+ self.phase_type = self.phase.split()[-1][0]
+
+ def render(self, incl_orders=True, incl_abbrev=False, output_format='svg'):
+ """ Renders the current game and returns its image representation
+ :param incl_orders: Optional. Flag to indicate we also want to render orders.
+ :param incl_abbrev: Optional. Flag to indicate we also want to display the provinces abbreviations.
+ :param output_format: The desired output format.
+ :return: The rendered image in the specified format.
+ """
+ if not self.renderer:
+ self.renderer = Renderer(self)
+ return self.renderer.render(incl_orders=incl_orders, incl_abbrev=incl_abbrev, output_format=output_format)
+
+ def add_rule(self, rule):
+ """ Adds a rule to the current rule list
+ :param rule: Name of rule to add (e.g. 'NO_PRESS')
+ :return: Nothing
+ """
+ if not self.__class__.rule_cache:
+ self._load_rules()
+ valid_rules = {valid_rule for valid_rule in self.__class__.rule_cache[0]}
+
+ if rule not in valid_rules or rule in self.no_rules:
+ return
+
+ forbidden_rules = self.__class__.rule_cache[0].get(rule, {}).get('!', [])
+ rules_to_add = self.__class__.rule_cache[0].get(rule, {}).get('+', [])
+ rules_to_remove = self.__class__.rule_cache[0].get(rule, {}).get('-', [])
+
+ # Making sure we don't already have a forbidden rule
+ for forbidden in forbidden_rules:
+ if forbidden in self.rules:
+ self.error += [err.GAME_FORBIDDEN_RULE % (forbidden, rule)]
+ return
+ if forbidden not in self.no_rules:
+ self.no_rules.add(forbidden)
+
+ # Adding rules
+ for rule_to_add in rules_to_add:
+ if rule_to_add not in self.rules:
+ self.rules.append(rule_to_add)
+
+ # Removing rules
+ for rule_to_remove in rules_to_remove:
+ if rule_to_remove in self.rules:
+ self.rules.remove(rule_to_remove)
+
+ # Adding main rule
+ if rule not in self.rules:
+ self.rules.append(rule)
+
+ def remove_rule(self, rule):
+ """ Removes a rule from the current rule list
+ :param rule: Name of rule to remove (e.g. 'NO_PRESS')
+ :return: Nothing
+ """
+ if rule in self.rules:
+ self.rules.remove(rule)
+
+ def load_map(self, reinit_powers=True):
+ """ Load a map and process directives
+ :param reinit_powers: Boolean. If true, empty powers dict.
+ :return: Nothing, but stores the map in self.map
+ """
+ # Create a map, and check for errors
+ self.map = Map(self.map_name)
+ if self.map_name != self.map.name:
+ raise RuntimeError('Invalid Map loaded. Expected %s - Got %s' % (self.map_name, self.map.name))
+
+ # Adding map rules
+ for rule in self.map.rules:
+ self.add_rule(rule)
+
+ # Build Zobrist tables
+ self._build_hash_table()
+
+ self.error += self.map.error
+
+ # Sets the current phase to the long version
+ if self.phase and ' ' not in self.phase and self.phase not in ('FORMING', 'COMPLETED'):
+ self.phase = self.map.phase_long(self.phase)
+
+ # Have the Game process all lines in the map file that were in DIRECTIVES clauses (this includes any RULE lines)
+ # Do this for all directives given without a variant and for those specific for this Game's variant.
+ if self.phase == 'FORMING':
+ return
+
+ # Resetting powers
+ if reinit_powers:
+ self.powers = {}
+
+ def process(self):
+ """ Processes the current phase of the game.
+ :return: game phase data with data before processing.
+ """
+ previous_phase = self._phase_wrapper_type(self.current_short_phase)
+ previous_orders = self.get_orders()
+ previous_messages = self.messages.copy()
+ previous_state = self.get_state()
+
+ if self.error:
+ if 'IGNORE_ERRORS' not in self.rules:
+ print('The following errors were encountered and were cleared before processing.')
+ for error in self.error:
+ print('-- %s' % error)
+ print('-' * 32)
+ self.error = []
+ self._process()
+
+ # result_history should have been updated with orders results for processed (previous) phase.
+
+ self.clear_vote()
+ self.clear_orders()
+ self.messages.clear()
+ self.order_history.put(previous_phase, previous_orders)
+ self.message_history.put(previous_phase, previous_messages)
+ self.state_history.put(previous_phase, previous_state)
+ return GamePhaseData(name=str(previous_phase),
+ state=previous_state,
+ orders=previous_orders,
+ messages=previous_messages,
+ results=self.result_history[previous_phase])
+
+ def rebuild_hash(self):
+ """ Completely recalculate the Zobrist hash
+ :return: The updated hash value
+ """
+ self.zobrist_hash = 0
+ if self.map is None:
+ return 0
+
+ # Recalculating for each power
+ for power in self.powers.values():
+ for unit in power.units:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ for center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ for home in power.homes:
+ self.update_hash(power.name, loc=home, is_home=True)
+
+ # Clearing cache
+ self.clear_cache()
+
+ # Returning the new hash
+ return self.get_hash()
+
+ def get_hash(self):
+ """ Returns the zobrist hash for the current game """
+ # Needs to be a string, otherwise json.dumps overflows
+ return str(self.zobrist_hash)
+
+ def update_hash(self, power, unit_type='', loc='', is_dislodged=False, is_center=False, is_home=False):
+ """ Updates the zobrist hash for the current game
+ :param power: The name of the power owning the unit, supply center or home
+ :param unit_type: Contains the unit type of the unit being added or remove from the board ('A' or 'F')
+ :param loc: Contains the location of the unit, supply center, of home being added or remove
+ :param is_dislodged: Indicates that the unit being added/removed is dislodged
+ :param is_center: Indicates that the location being added/removed is a supply center
+ :param is_home: Indicates that the location being added/removed is a home
+ :return: Nothing
+ """
+ if self.map is None:
+ return
+ zobrist = self.__class__.zobrist_tables[self.map_name]
+ loc = loc[:3].upper() if is_center or is_home else loc.upper()
+ power = power.upper()
+
+ power_ix = zobrist['map_powers'].index(power)
+ loc_ix = zobrist['map_locs'].index(loc)
+ unit_type_ix = ['A', 'F'].index(unit_type) if unit_type in ['A', 'F'] else -1
+
+ # Dislodged
+ if is_dislodged:
+ self.zobrist_hash ^= zobrist['dis_unit_type'][unit_type_ix, loc_ix]
+ self.zobrist_hash ^= zobrist['dis_units'][power_ix, loc_ix]
+
+ # Supply Center
+ elif is_center:
+ self.zobrist_hash ^= zobrist['centers'][power_ix, loc_ix]
+
+ # Home
+ elif is_home:
+ self.zobrist_hash ^= zobrist['homes'][power_ix, loc_ix]
+
+ # Regular unit
+ else:
+ self.zobrist_hash ^= zobrist['unit_type'][unit_type_ix, loc_ix]
+ self.zobrist_hash ^= zobrist['units'][power_ix, loc_ix]
+
+ def get_phase_data(self):
+ """ Return a GamePhaseData object representing current game. """
+ # Associate each power name to power orders, or None if order ist not set for the power.
+ # This is done to make distinction between voluntary empty orders ([]) and unset orders (None).
+ current_orders = {power.name: (self.get_orders(power.name) if power.order_is_set else None)
+ for power in self.powers.values()}
+ # Game does not have results for current orders (until orders are processed and game phase is updated).
+ return GamePhaseData(name=self.current_short_phase,
+ state=self.get_state(),
+ orders=current_orders,
+ messages=self.messages.copy(),
+ results={})
+
+ def set_phase_data(self, phase_data, clear_history=True):
+ """ Set game from phase data.
+ :param phase_data: either a GamePhaseData or a list of GamePhaseData.
+ If phase_data is a GamePhaseData, it will be treated as a list of GamePhaseData with 1 element.
+ Last phase data in given list will be used to set current game internal state.
+ Previous phase data in given list will replace current game history.
+ :param clear_history: Indicate if we must clear game history fields before update.
+ """
+ if not phase_data:
+ return
+ if isinstance(phase_data, GamePhaseData):
+ phase_data = [phase_data]
+ elif not isinstance(phase_data, list):
+ phase_data = list(phase_data)
+
+ if clear_history:
+ self._clear_history()
+
+ for game_phase_data in phase_data[:-1]: # type: GamePhaseData
+ self.extend_phase_history(game_phase_data)
+
+ current_phase_data = phase_data[-1] # type: GamePhaseData
+ self.set_state(current_phase_data.state, clear_history=False)
+ for power_name, power_orders in current_phase_data.orders.items():
+ if power_orders is not None:
+ self.set_orders(power_name, power_orders)
+ self.messages = current_phase_data.messages.copy()
+ # We ignore 'results' for current phase data.
+
+ def get_state(self):
+ """ Gets the internal saved state of the game.
+ This state is intended to represent current game view
+ (powers states, orders results for previous phase, and few more info).
+ See field message_history to get messages from previous phases.
+ See field order_history to get orders from previous phases.
+ To get a complete state of all data in this game object, consider using method Game.to_dict().
+
+ :param make_copy: Boolean. If true, a deep copy of the game state is returned, otherwise the attributes are
+ returned directly.
+ :return: The internal saved state (dict) of the game
+ """
+ state = {}
+ state['timestamp'] = common.timestamp_microseconds()
+ state['zobrist_hash'] = self.get_hash()
+ state['note'] = self.note
+ state['name'] = self._phase_abbr()
+ state['units'] = {}
+ state['centers'] = {}
+ state['homes'] = {}
+ state['influence'] = {}
+ state['civil_disorder'] = {}
+ state['builds'] = {}
+
+ # Setting powers data: units, centers, homes, influence and civil disorder.
+ for power in self.powers.values():
+ state['units'][power.name] = list(power.units) + ['*{}'.format(d) for d in power.retreats]
+ state['centers'][power.name] = list(power.centers)
+ state['homes'][power.name] = list(power.homes)
+ state['influence'][power.name] = list(power.influence)
+ state['civil_disorder'][power.name] = power.civil_disorder
+ # Setting build
+ state['builds'][power.name] = {}
+ if self.phase_type != 'A':
+ state['builds'][power.name]['count'] = 0
+ else:
+ state['builds'][power.name]['count'] = len(power.centers) - len(power.units)
+ state['builds'][power.name]['homes'] = []
+ if state['builds'][power.name].get('count', 0) > 0:
+ build_sites = self._build_sites(power)
+ state['builds'][power.name]['count'] = min(len(build_sites), state['builds'][power.name]['count'])
+ state['builds'][power.name]['homes'] = build_sites
+
+ # Returning state
+ return state
+
+ def set_state(self, state, clear_history=True):
+ """ Sets the game from a saved internal state
+ :param state: The saved state (dict)
+ :param clear_history: Boolean. If true, all game histories are cleared.
+ :return: Nothing
+ """
+ if clear_history:
+ self._clear_history()
+
+ if 'map' in state and self.map.name != state['map']:
+ raise RuntimeError('Inconsistent state map (state: %s, game: %s)' % (state['map'], self.map.name))
+ if 'rules' in state:
+ self.rules = []
+ for rule in state['rules']:
+ self.add_rule(rule)
+
+ if 'note' in state:
+ self.note = state['note']
+ if 'name' in state and state['name']:
+ self.set_current_phase(state['name'])
+ if 'units' in state:
+ for power_name, units in state['units'].items():
+ self.set_units(power_name, units, reset=True)
+ if 'centers' in state:
+ for power_name, centers in state['centers'].items():
+ self.set_centers(power_name, centers, reset=True)
+ for power in self.powers.values():
+ if 'homes' in state and power.name in state['homes']:
+ power.homes = list(state['homes'][power.name])
+ else:
+ power.homes = list(self.map.homes[power.name])
+ if 'influence' in state:
+ for power_name, influence in state['influence'].items():
+ power = self.get_power(power_name)
+ power.influence = deepcopy(influence)
+ if 'civil_disorder' in state:
+ for power_name, civil_disorder in state['civil_disorder'].items():
+ power = self.get_power(power_name)
+ power.civil_disorder = civil_disorder
+
+ # Rebuilding hash and returning
+ self.rebuild_hash()
+ self._build_list_possible_convoys()
+
+ def get_all_possible_orders(self, loc=None):
+ """ Computes a list of all possible orders for a unit in a given location
+ :param loc: Optional. The location where to get a list of orders (must include coasts)
+ If not provided, returns a list of all possible orders for all locations
+ :return: A list of orders for the unit, if there is a unit at location, or a list of possible
+ orders for all locations if no locations are provided.
+ """
+ # pylint: disable=too-many-branches
+ # No locations, building a dict with all locations
+ if not loc:
+ all_possible_orders = {}
+ for map_loc in self.map.locs:
+ map_loc = map_loc.upper()
+ all_possible_orders[map_loc] = self.get_all_possible_orders(map_loc)
+ return all_possible_orders
+
+ def remove_duplicates(list_with_dup):
+ """ Shorthand functions to remove duplicates """
+ seen = set()
+ return [item for item in list_with_dup if not (item in seen or seen.add(item))]
+
+ # Otherwise finding the possible orders at that specific location
+ possible_orders = []
+ is_dislodged = False
+ unit = None
+ unit_power = None
+
+ # If there are coasts possible, recursively adding the coasts first, then adding the loc orders
+ if '/' not in loc:
+ for loc_with_coast in [coast for coast in self.map.find_coasts(loc) if '/' in coast]:
+ possible_orders += self.get_all_possible_orders(loc_with_coast)
+
+ # Determining if there is a unit at loc
+ # Dislodged unit have precedence over regular unit in Retreat phase
+ for power in self.powers.values():
+ dislodged = [u for u in power.retreats if u[2:] == loc.upper()]
+ regular = [u for u in power.units if u[2:] == loc.upper()]
+ if dislodged:
+ is_dislodged = True
+ unit = dislodged[0]
+ unit_power = power
+ break
+ elif regular and not is_dislodged:
+ unit = regular[0]
+ unit_power = power
+ if self.phase_type != 'R':
+ break
+
+ # No unit found, checking if location is a home
+ if unit is None:
+ if self.phase_type != 'A':
+ return remove_duplicates(possible_orders)
+ for power in self.powers.values():
+ if loc[:3] in power.homes and 'BUILD_ANY' not in self.rules:
+ unit_power = power
+ break
+ if loc[:3] in power.centers and 'BUILD_ANY' in self.rules:
+ unit_power = power
+ break
+
+ # Not a home, and no units
+ if not unit_power:
+ return remove_duplicates(possible_orders)
+
+ # Determining if we can build or need to remove units
+ build_count = 0 if self.phase_type != 'A' else len(unit_power.centers) - len(unit_power.units)
+
+ # Determining unit type and unit location
+ unit_type = unit[0] if unit else ''
+ unit_loc = unit[2:] if unit else ''
+
+ # Movement phase
+ if self.phase_type == 'M':
+ # Computing coasts for dest
+ dest_1_hops = [l.upper() for l in self.map.abut_list(unit_loc, incl_no_coast=True)]
+ dest_with_coasts = [self.map.find_coasts(dest) for dest in dest_1_hops]
+ dest_with_coasts = {val for sublist in dest_with_coasts for val in sublist}
+
+ # Hold
+ possible_orders += ['{} H'.format(unit)]
+
+ # Move (Regular) and Support (Hold)
+ for dest in dest_with_coasts:
+ if self._abuts(unit_type, unit_loc, '-', dest):
+ possible_orders += ['{} - {}'.format(unit, dest)]
+ if self._abuts(unit_type, unit_loc, 'S', dest):
+ if self._unit_owner('A {}'.format(dest)):
+ possible_orders += ['{} S A {}'.format(unit, dest)]
+ elif self._unit_owner('F {}'.format(dest)):
+ possible_orders += ['{} S F {}'.format(unit, dest)]
+
+ # Move Via Convoy
+ for dest in self._get_convoy_destinations(unit_type, unit_loc):
+ possible_orders += ['{} - {} VIA'.format(unit, dest)]
+
+ # Support (Move)
+ for dest in dest_with_coasts:
+
+ # Computing src of move (both from adjacent provinces and possible convoys)
+ # We can't support a unit that needs us to convoy it to its destination
+ abut_srcs = self.map.abut_list(dest, incl_no_coast=True)
+ convoy_srcs = self._get_convoy_destinations('A', dest, exclude_convoy_locs=[unit_loc])
+
+ # Computing coasts for source
+ src_with_coasts = [self.map.find_coasts(src) for src in abut_srcs + convoy_srcs]
+ src_with_coasts = {val for sublist in src_with_coasts for val in sublist}
+
+ for src in src_with_coasts:
+
+ # Checking if there is a unit on the src location
+ if self._unit_owner('A {}'.format(src)):
+ src_unit_type = 'A'
+ elif self._unit_owner('F {}'.format(src)):
+ src_unit_type = 'F'
+ else:
+ continue
+
+ # Checking if src unit can move to dest (through adj or convoy), and that we can support it
+ # Only armies can move through convoy
+ if src[:3] != unit_loc[:3] \
+ and self._abuts(unit_type, unit_loc, 'S', dest) \
+ and ((src in convoy_srcs and src_unit_type == 'A')
+ or self._abuts(src_unit_type, src, '-', dest)):
+
+ # Adding with coast
+ possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest)]
+
+ # Adding without coasts
+ if '/' in dest:
+ possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest[:3])]
+
+ # Convoy
+ if unit_type == 'F':
+ convoy_srcs = self._get_convoy_destinations(unit_type, unit_loc, unit_is_convoyer=True)
+ for src in convoy_srcs:
+
+ # Checking if there is a unit on the src location
+ if unit_type == 'F' and self._unit_owner('A {}'.format(src)):
+ src_unit_type = 'A'
+ else:
+ continue
+
+ # Checking where the src unit can actually go
+ convoy_dests = self._get_convoy_destinations(src_unit_type, src, unit_is_convoyer=False)
+
+ # Adding them as possible moves
+ for dest in convoy_dests:
+ if self._has_convoy_path(src_unit_type, src, dest, convoying_loc=unit_loc):
+ possible_orders += ['{} C {} {} - {}'.format(unit, src_unit_type, src, dest)]
+
+ # Retreat phase
+ if self.phase_type == 'R':
+
+ # Disband
+ if is_dislodged:
+ possible_orders += ['{} D'.format(unit)]
+
+ # Retreat
+ if is_dislodged:
+ retreat_locs = unit_power.retreats[unit]
+ for dest in retreat_locs:
+ dest = dest.upper()
+ if not self._unit_owner('A {}'.format(dest[:3]), coast_required=0) \
+ and not self._unit_owner('F {}'.format(dest[:3]), coast_required=0):
+ possible_orders += ['{} R {}'.format(unit, dest)]
+
+ # Adjustment Phase
+ if self.phase_type == 'A':
+ build_sites = self._build_sites(unit_power)
+
+ # Disband
+ if build_count < 0 and unit:
+ possible_orders += ['{} D'.format(unit)]
+
+ # Build Army / Fleet
+ if build_count > 0 \
+ and loc[:3] in build_sites \
+ and not self._unit_owner('A ' + loc[:3], coast_required=0) \
+ and not self._unit_owner('F ' + loc[:3], coast_required=0):
+ if self.map.is_valid_unit('A {}'.format(loc)):
+ possible_orders += ['A {} B'.format(loc)]
+ if self.map.is_valid_unit('F {}'.format(loc)):
+ possible_orders += ['F {} B'.format(loc)]
+
+ # Waive
+ if build_count > 0:
+ possible_orders += ['WAIVE']
+
+ # Removing duplicate
+ return remove_duplicates(possible_orders)
+
+ # ====================================================================
+ # Private Interface - CONVOYS Methods
+ # ====================================================================
+ def _build_list_possible_convoys(self):
+ """ Regenerates the list of possible convoy paths given the current fleet locations """
+ # Already generated
+ if self.convoy_paths_possible is not None:
+ return
+ self.convoy_paths_possible = []
+ self.convoy_paths_dest = {}
+
+ # Finding fleets on water
+ convoying_locs = []
+ for power in self.powers.values():
+ for unit in power.units:
+ if unit[0] == 'F' and self.map.area_type(unit[2:]) in ['WATER', 'PORT']:
+ convoying_locs += [unit[2:]]
+ convoying_locs = set(convoying_locs)
+
+ # Finding all possible convoy paths
+ for nb_fleets in range(1, len(convoying_locs) + 1):
+ for start, fleets, dests in self.map.convoy_paths[nb_fleets]:
+ if fleets.issubset(convoying_locs):
+ self.convoy_paths_possible += [(start, fleets, dests)]
+
+ # Marking path to dest
+ self.convoy_paths_dest.setdefault(start, {})
+ for dest in dests:
+ self.convoy_paths_dest[start].setdefault(dest, [])
+ self.convoy_paths_dest[start][dest] += [fleets]
+
+ def _is_convoyer(self, army, loc):
+ """ Detects if there is a convoyer at thru location for army/fleet (e.g. can an army be convoyed through PAR)
+ :param army: Boolean to indicate if unit being convoyed is army (1) or fleet (0)
+ :param loc: Location we are checking (e.g. 'STP/SC')
+ :return: Boolean to indicate if unit can be convoyed through location
+ """
+ # Armies can't convoy fleet, so if unit being convoyed is not an army, convoy not possible
+ if not army:
+ return False
+
+ # Army can convoy through water, all units can convoy through port
+ area_type = self.map.area_type(loc)
+ area_type_cond = ((area_type == 'WATER') == army or area_type == 'PORT')
+
+ # Making sure there is a valid unit on thru location to perform convoy
+ unit_type_cond = self._unit_owner('F %s' % loc, coast_required=0)
+ return area_type_cond and unit_type_cond
+
+ def _is_moving_via_convoy(self, unit):
+ """ Determines if a unit is moving via a convoy or through land
+ :param unit: The name of the unit (e.g. 'A PAR')
+ :return: A boolean (True, False) to indicate if the unit is moving via convoy
+ """
+ # Not moving or no paths
+ if unit not in self.command or self.command[unit][0] != '-':
+ return False
+ if unit not in self.convoy_paths or not self.convoy_paths[unit]:
+ return False
+
+ # Otherwise, convoying since there is still an active valid path
+ return True
+
+ def _has_convoy_path(self, unit, start, end, convoying_loc=None):
+ """ Determines if there is a convoy path for unit
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :param convoying_loc: Optional. If set, the convoying location must be in one of the paths
+ :return: A boolean flag to indicate if the convoy is possible (if all units cooperate)
+ """
+ if unit != 'A':
+ return False
+
+ # Checking in table if there is a valid path and optionally if the convoying loc is in the path
+ self._build_list_possible_convoys()
+ active_paths = self.convoy_paths_dest.get(start, {}).get(end, [])
+ return active_paths and (convoying_loc is None or [1 for path in active_paths if convoying_loc in path])
+
+ def _get_convoying_units_for_path(self, unit, start, end):
+ """ Returns a list of units who have submitted orders to convoy 'unit' from 'start' to 'end'
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :return: A list of convoying units (e.g. ['F NAO', 'F MAO']) having current orders to convoy path
+ """
+ convoying_units = []
+ army = unit != 'F'
+ expected_order = 'C %s %s - %s' % (unit, start[:3], end[:3])
+ for unit_loc, unit_order in list(self.command.items()):
+ if unit_order == expected_order and self._is_convoyer(army, unit_loc[2:]):
+ convoying_units += [unit_loc]
+ return convoying_units
+
+ def _get_convoy_destinations(self, unit, start, unit_is_convoyer=False, exclude_convoy_locs=None):
+ """ Returns a list of possible convoy destinations for a unit
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param unit_is_convoyer: Boolean flag. If true, list all the dests that an unit being convoyed by unit
+ could reach
+ :param exclude_convoy_locs: Optional. A list of convoying location that needs to be excluded from all paths.
+ :return: A list of convoying destinations (e.g. ['PAR', 'MAR']) that can be reached from start
+ """
+ if unit == 'A' and unit_is_convoyer:
+ return []
+ if unit == 'F' and not unit_is_convoyer:
+ return []
+
+ # Building cache
+ self._build_list_possible_convoys()
+
+ # If we are moving via convoy, we just read the destinations from the table
+ if not unit_is_convoyer:
+ if not exclude_convoy_locs:
+ return list(self.convoy_paths_dest.get(start, {}).keys())
+
+ # We need to loop to make sure there is a path without the excluded convoyer
+ dests = []
+ for dest, paths in self.convoy_paths_dest.get(start, {}).items():
+ for path in paths:
+ if not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in path]:
+ dests += [dest]
+ break
+ return dests
+
+ # If we are convoying, we need to loop through the possible convoy paths
+ valid_dests = set([])
+ for _, fleets, dests in self.convoy_paths_possible:
+ if start in fleets and (exclude_convoy_locs is None
+ or not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in fleets]):
+ valid_dests |= dests
+ return list(valid_dests)
+
+ def _get_convoy_paths(self, unit_type, start, end, via, convoying_units):
+ """ Return a list of all possible convoy paths (using convoying units) from start to end
+ :param unit_type: The unit type BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :param via: Boolean flag (0 or 1) to indicate if we want only paths with a local convoyer, or also paths
+ including only foreign convoyers
+ :param convoying_units: The list of units who can convoy the unit
+ :return: A list of paths from start to end using convoying_units
+ """
+ if unit_type != 'A' or not convoying_units:
+ return []
+
+ # Building cache and finding possible paths with convoying units
+ # Adding start and end location to every path
+ self._build_list_possible_convoys()
+ fleets = {loc[2:] for loc in convoying_units}
+ paths = [path for path in self.convoy_paths_dest.get(start, {}).get(end, set([])) if path.issubset(fleets)]
+ paths = [[start] + list(path) + [end] for path in paths]
+ paths.sort(key=len)
+
+ # No paths found
+ if not paths:
+ return []
+
+ # We have intent to convoy, so we can use all paths
+ if via:
+ return paths
+
+ # Assuming intent if end is not reachable from start (i.e. a convoy is required)
+ if not self._abuts(unit_type, start, 'S', end):
+ return paths
+
+ # Otherwise, detecting if we intended to convoy
+ unit_owner = self._unit_owner('%s %s' % (unit_type, start), coast_required=0)
+ for convoyer in convoying_units:
+ convoy_owner = self._unit_owner(convoyer, coast_required=1)
+
+ # We have intent if one of the power's fleet issued a convoyed order
+ # and there was a path using that fleet to move from start to end
+ if unit_owner == convoy_owner and \
+ self._has_convoy_path(unit_type, start, end, convoying_loc=convoyer[2:]):
+ return paths
+
+ # We could not detect intent
+ return []
+
+ def _get_distance_to_home(self, unit_type, start, homes):
+ """ Calculate the distance from unit to one of its homes
+ Armies can move over water (4.D.8 choice d)
+ :param unit_type: The unit type to calculate distance (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param homes: The list of homes (first one reached calculates the distance)
+ :return: The minimum distance from unit to one of the homes
+ """
+ visited = []
+ if not homes:
+ return 99999
+
+ # Modified Djikstra
+ to_check = PriorityDict()
+ to_check[start] = 0
+ while to_check:
+ distance, current = to_check.smallest()
+ del to_check[current]
+
+ # Found smallest distance
+ if current[:3] in homes:
+ return distance
+
+ # Marking visited
+ if current in visited:
+ continue
+ visited += [current]
+
+ # Finding neighbors and updating distance
+ for loc in self.map.abut_list(current, incl_no_coast=True):
+ loc = loc.upper()
+ if loc in visited:
+ continue
+
+ # Calculating distance for armies over LAND/WATER/COAST and for Fleet over WATER/COAST
+ if unit_type == 'A' or self._abuts(unit_type, current, '-', loc):
+ loc_distance = to_check[loc] if loc in to_check else 99999
+ to_check[loc] = min(distance + 1, loc_distance)
+
+ # Could not find destination
+ return 99999
+
+ # ====================================================================
+ # Private Interface - ORDER Validation Methods
+ # ====================================================================
+ def _valid_order(self, power, unit, order, report=1):
+ """ Determines if an order is valid
+ :param power: The power submitting the order
+ :param unit: The unit being affected by the order (e.g. 'A PAR')
+ :param order: The actual order (e.g. 'H' or 'S A MAR')
+ :param report: Boolean to report errors in self.errors
+ :return: One of the following:
+ None - The order is NOT valid at all
+ -1 - It is NOT valid, BUT it does not get reported because it may be used to signal support
+ 0 - It is valid, BUT some unit mentioned does not exist
+ 1 - It is completed valid
+ """
+ # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
+ # No order
+ if not order:
+ return None
+ word = order.split()
+ owner = self._unit_owner(unit)
+ rules = self.rules
+
+ # No order
+ if not word:
+ return None
+
+ status = 1 if owner is not None else 0
+ unit_type = unit[0]
+ unit_loc = unit[2:]
+ order_type = word[0]
+
+ # Make sure the unit exists (or if the player is in a game in which he can't necessarily know) could exist.
+ # Also make sure any mentioned (supported or conveyed) unit could exists and could reach the listed destination
+ if not self.map.is_valid_unit(unit):
+ if report:
+ self.error.append(err.GAME_ORDER_TO_INVALID_UNIT % unit)
+ return None
+
+ # Support / Convoy - 'S A/F XXX - YYY'
+ if order_type in ('S', 'C') and word[1:]:
+ if word[1] in ('A', 'F'):
+ alter, other = word[1:3]
+ else:
+ alter, other = '?', word[1]
+
+ # Checks if A/F XXX is a valid unit for loc (source)
+ other = alter + ' ' + other
+ if not self.map.is_valid_unit(other, no_coast_ok=1):
+ if report:
+ self.error.append(err.GAME_ORDER_INCLUDES_INVALID_UNIT % other)
+ return None
+
+ # S [A/F] XXX - YYY
+ # Checks if A/F YYY is a valid unit for loc (dest)
+ if len(word) == 5 - (alter == '?'):
+ other = alter + ' ' + word[-1]
+ if not self.map.is_valid_unit(other, no_coast_ok=1):
+ if report:
+ self.error.append(err.GAME_ORDER_INCLUDES_INVALID_DEST % other)
+ return None
+
+ # Check if unit exists
+ # Status - 1 if unit has owner, 0 otherwise (Non-existent unit)
+ if not status:
+ if report:
+ self.error.append(err.GAME_ORDER_NON_EXISTENT_UNIT % unit)
+ return None
+ if power is not owner:
+ if report:
+ self.error.append(err.GAME_ORDER_TO_FOREIGN_UNIT % unit)
+ return None
+
+ # Validate that anything in a SHUT location is only ordered to HOLD
+ if self.map.area_type(unit_loc) == 'SHUT' and order_type != 'H':
+ if report:
+ self.error.append(err.GAME_UNIT_MAY_ONLY_HOLD % unit)
+ return None
+
+ # Validate support and convoy orders
+ # Triggers error if Army trying to convoys
+ if order_type == 'C' and (unit_type != 'F' or (self.map.area_type(unit_loc) not in ('WATER', 'PORT'))):
+ if report:
+ self.error.append(err.GAME_CONVOY_IMPROPER_UNIT % (unit, order))
+ return None
+
+ # -------------------------------------------------------------
+ # SUPPORT OR CONVOY ORDER
+ if order_type in ('C', 'S'):
+
+ # Add the unit type (or '?') if not specified.
+ # Note that the unit type is NOT added to the actual order -- just used during checking.
+ order_text = 'CONVOY' if order_type == 'C' else 'SUPPORT'
+ if len(word) > 1 and word[1] not in ('A', 'F'):
+ terrain = self.map.area_type(word[1])
+ if order_type == 'C':
+ word[1:1] = ['AF'[unit_type == 'A']] # Convoying the opposite unit type A-F and F-A
+ elif terrain == 'WATER':
+ word[1:1] = ['F']
+ elif terrain == 'LAND':
+ word[1:1] = ['A']
+ elif terrain: # Other terrain, trying to determine if XXX exist
+ its_unit_type = [unit_type for unit_type in 'AF' if self._unit_owner(unit_type + ' ' + word[1])]
+ if its_unit_type:
+ word[1:1] = its_unit_type
+ else:
+ if report:
+ self.error.append(err.GAME_INVALID_ORDER_NON_EXISTENT_UNIT % (order_text, unit, order))
+ return None
+ else:
+ if report:
+ self.error.append(err.GAME_INVALID_ORDER_RECIPIENT % (order_text, unit, order))
+ return None
+
+ # Make sure we have enough to work with
+ # e.g. syntax S A XXX - YYY or at least S XXX YYY
+ if len(word) < 3:
+ if report:
+ self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order))
+ return None
+
+ # Check that the recipient of the support or convoy exists
+ rcvr, dest = ' '.join(word[1:3]), word[2]
+ if not self._unit_owner(rcvr, 0):
+ if report:
+ self.error.append(err.GAME_ORDER_RECIPIENT_DOES_NOT_EXIST % (order_text, unit, order))
+ return None
+
+ # Check that the recipient is not the same unit as the supporter
+ if unit_loc == dest:
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_SUPPORT_ITSELF % (unit, order))
+ return None
+
+ # Only units on coasts can be convoyed, or invalid units convoying
+ if order_type == 'C' \
+ and (word[1] != 'AF'[unit_type == 'A'] or self.map.area_type(dest) not in ('COAST', 'PORT')):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_BE_CONVOYED % (unit, order))
+ return None
+
+ # Handle orders of the form C U xxx - xxx and S U xxx - xxx
+ if len(word) == 5:
+ if word[3] != '-':
+ if report:
+ self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order))
+ return None
+ dest = word[4]
+
+ # Coast is specified in the move, but ignored in the support and convoy order
+ # DATC 6.B.4
+ if '/' in dest:
+ dest = dest[:dest.find('/')]
+
+ # Making sure the dest is COAST,PORT and that the convoyed order can land there
+ if order_type == 'C':
+ if not (self.map.area_type(dest) in ('COAST', 'PORT')
+ and self.map.is_valid_unit(word[1] + ' ' + dest, unit[0] < 'F')):
+ if report:
+ self.error.append(err.GAME_BAD_CONVOY_DESTINATION % (unit, order))
+ return None
+
+ # Checking that support can reach destination...
+ elif (not self._abuts(word[1], word[2], order_type, dest)
+ and (rcvr[0] == 'F'
+ or not self._has_convoy_path(word[1], word[2][:3], dest))):
+ if report:
+ self.error.append(err.GAME_SUPPORTED_UNIT_CANT_REACH_DESTINATION % (unit, order))
+ return None
+
+ # Make sure that a convoy order was formatted as above
+ elif order_type == 'C':
+ if report:
+ self.error.append(err.GAME_IMPROPER_CONVOY_ORDER % (unit, order))
+ return None
+
+ # Make sure a support order was either as above or as S U xxx or as S U xxx H
+ elif len(word) != 3 and (len(word) != 4 or word[-1] != 'H'):
+ if report:
+ self.error.append(err.GAME_IMPROPER_SUPPORT_ORDER % (unit, order))
+ return None
+
+ # Make sure the support destination can be reached...
+ if order_type == 'S':
+ if not self._abuts(unit_type, unit_loc, order_type, dest):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_PROVIDE_SUPPORT_TO_DEST % (unit, order))
+ return None
+
+ # ... or that the fleet can perform the described convoy
+ elif not self._has_convoy_path(rcvr[0], rcvr[2:5], dest, convoying_loc=unit_loc):
+ if report:
+ self.error.append(err.GAME_IMPOSSIBLE_CONVOY_ORDER % (unit, order))
+ return None
+
+ # -------------------------------------------------------------
+ # MOVE order
+ elif order_type == '-':
+ # Expected format '- xxx' or '- xxx - yyy - zzz'
+ if (len(word) & 1 and word[-1] != 'VIA') or (len(word[:-1]) & 1 and word[-1] == 'VIA'):
+ if report:
+ self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order))
+ return None
+
+ # Only a convoying army can give a path
+ if len(word) > 2 and unit_type != 'A' and self.map.area_type(unit_loc) not in ('COAST', 'PORT'):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_CONVOY % (unit, order))
+ return None
+
+ # Step through every "- xxx" in the order and ensure the unit can get there at every step
+ src = unit_loc
+ order_type = 'C-'[len(word) == 2 or (len(word) == 3 and word[-1] == 'VIA')]
+ visit = []
+
+ # Checking that unit is not returning back where it started ...
+ if word[-1] == unit_loc and order_type < 'C':
+ if report:
+ self.error.append(err.GAME_MOVING_UNIT_CANT_RETURN % (unit, order))
+ return None
+
+ # For a multi-step convoy
+ if order_type == 'C':
+
+ # Checking that destination is a COAST or PORT ...
+ if self.map.area_type(word[-1]) not in ('COAST', 'PORT'):
+ if report:
+ self.error.append(err.GAME_CONVOYING_UNIT_MUST_REACH_COST % (unit, order))
+ return None
+
+ # Making sure that army is not having a specific coast as destination ...
+ if unit_type == 'A' and '/' in word[-1]:
+ if report:
+ self.error.append(err.GAME_ARMY_CANT_CONVOY_TO_COAST % (unit, order))
+ return None
+
+ # Making sure that the syntax is '- xxx - yyy - zzz'
+ offset = 1 if word[-1] == 'VIA' else 0
+ if [1 for x in range(0, len(word) - offset, 2) if word[x] != '-']:
+ if report:
+ self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order))
+ return None
+
+ # For every location touched
+ ride = word[1:len(word) - offset:2]
+ for num, to_loc in enumerate(ride):
+
+ # Checking that ride is not visited twice ...
+ if to_loc in visit and 'CONVOY_BACK' not in rules:
+ if report:
+ self.error.append(err.GAME_CONVOY_UNIT_USED_TWICE % (unit, order))
+ return None
+ visit += [to_loc]
+
+ # Making sure the last 2 locations touch, and that A/F can convoy through them
+ # pylint: disable=too-many-boolean-expressions
+ if (not self._abuts(unit_type, src, order_type, to_loc)
+ and not self._has_convoy_path(unit_type, unit_loc, to_loc)
+ and (len(word) == 2
+ or unit_type == 'A' and (not self._abuts('F', to_loc, 'S', src))
+ or (unit_type == 'F'
+ and (to_loc[:3].upper() not in
+ [abut[:3].upper() for abut in self.map.abut_list(src[:3])])))):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_MOVE_INTO_DEST % (unit, order))
+ return None
+
+ # If VIA flag set, make sure there is at least a possible path
+ if word[-1] == 'VIA' and not self._has_convoy_path(unit_type, unit_loc, to_loc):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_MOVE_VIA_CONVOY_INTO_DEST % (unit, order))
+ return None
+
+ # If we are at an intermediary location
+ if num < len(ride) - 1:
+
+ # Trying to portage convoy fleet through water
+ # or trying to convoy army through LAND or COAST
+ if (unit_type == 'F'
+ and ((unit_type == 'A' and self.map.area_type(to_loc) not in ('WATER', 'PORT'))
+ or unit_type + self.map.area_type(to_loc) == 'FWATER')):
+ if report:
+ self.error.append(err.GAME_BAD_CONVOY_MOVE_ORDER % (unit, order))
+ return None
+
+ # Making sure there is a unit there to convoy ...
+ if not self._unit_owner('AF'[unit_type == 'A'] + ' ' + to_loc):
+ if report:
+ self.error.append(err.GAME_CONVOY_THROUGH_NON_EXISTENT_UNIT % (unit, order))
+ return None
+
+ # Portaging fleets must finish the turn on a coastal location listed in upper-case
+ elif (num
+ and unit_type == 'F'
+ and (to_loc not in self.map.loc_abut or self.map.area_type(to_loc) not in ('COAST', 'PORT'))):
+ if report:
+ self.error.append(err.GAME_IMPOSSIBLE_CONVOY % (unit, order))
+ return None
+ src = to_loc
+
+ # -------------------------------------------------------------
+ # HOLD order
+ elif order_type == 'H':
+ if len(word) != 1:
+ if report:
+ self.error.append(err.GAME_INVALID_HOLD_ORDER % (unit, order))
+ return None
+
+ else:
+ if report:
+ self.error.append(err.GAME_UNRECOGNIZED_ORDER_TYPE % (unit, order))
+ return None
+
+ # All done
+ return status
+
+ def _expand_order(self, word):
+ """ Detects errors in order, convert to short version, and expand the default coast if necessary
+ :param word: The words (e.g. order.split()) for an order
+ (e.g. ['England:', 'Army', 'Rumania', 'SUPPORT', 'German', 'Army', 'Bulgaria']).
+ :return: The compacted and expanded order (e.g. ['A', 'RUM', 'S', 'A', 'BUL'])
+ """
+ if not word:
+ return word
+
+ result = self.map.compact(' '.join(word))
+ result = self.map.vet(self.map.rearrange(result), 1)
+
+ # Removing errors (Negative values)
+ final, order = [], ''
+ for result_ix, (token, token_type) in enumerate(result):
+ if token_type < 1:
+ if token_type == -1 * POWER:
+ self.error.append(err.GAME_UNKNOWN_POWER % token)
+ continue
+ elif token_type == -1 * UNIT:
+ self.error.append(err.GAME_UNKNOWN_UNIT_TYPE % token)
+ continue
+ elif token_type == -1 * LOCATION:
+ self.error.append(err.GAME_UNKNOWN_LOCATION % token)
+ elif token_type == -1 * COAST:
+ token_without_coast = token.split('/')[0]
+ if token_without_coast in self.map.aliases.values():
+ self.error.append(err.GAME_UNKNOWN_COAST % token)
+ result[result_ix] = token_without_coast, -1 * LOCATION
+ else:
+ self.error.append(err.GAME_UNKNOWN_LOCATION % token)
+ elif token_type == -1 * ORDER:
+ self.error.append(err.GAME_UNKNOWN_ORDER_TYPE % token)
+ continue
+ else:
+ self.error.append(err.GAME_UNRECOGNIZED_ORDER_DATA % token)
+ continue
+ token_type = -1 * token_type
+
+ # Remove power names. Checking ownership of the unit might be better
+ if token_type == POWER:
+ continue
+
+ # Remove the "H" from any order having the form "u xxx S xxx H"
+ # Otherwise storing order
+ elif token_type == ORDER:
+ if order == 'S' and token == 'H':
+ continue
+ order += token
+
+ # Treat each move order the same. Eventually we'd want to distinguish between them
+ elif token_type == MOVE_SEP:
+ result[result_ix] = '-', token_type
+ order += '-'
+
+ elif token_type == OTHER:
+ order = ''
+
+ # Spot ambiguous place names and coasts in support and convoy orders
+ if 'NO_CHECK' in self.rules:
+ if token_type == LOCATION and token in self.map.unclear:
+ self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token)
+ if token_type == COAST and token.split('/')[0] in self.map.unclear:
+ self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token)
+
+ final += [token]
+
+ # Default any fleet move's coastal destination, then we're done
+ return self.map.default_coast(final)
+
+ def _expand_coast(self, word):
+ """ Makes sure the correct coast is specified (if any) is specified.
+ For Fleets: Adjust to correct coast if wrong coast is specified
+ For Armies: Removes coast if coast is specified
+ (e.g. if F is on SPA/SC but the order is F SPA/NC - LYO, the coast will be added or corrected)
+ :param word: A list of tokens (e.g. ['F', 'GRE', '-', 'BUL'])
+ :return: The updated list of tokens (e.g. ['F', 'GRE', '-', 'BUL/SC'])
+ """
+ if not word:
+ return word
+
+ unit_type = word[0]
+ loc = word[1]
+ loc_without_coast = loc[:loc.find('/')] if '/' in loc else loc
+
+ # For armies: Removing coast if specified
+ if unit_type == 'A':
+ if '/' in loc:
+ word[1] = loc_without_coast
+ if len(word) == 4 and '/' in word[3]:
+ word[3] = word[3][:word[3].find('/')]
+
+ # For fleets: If there is a unit in the country, but not on the specified coast, we need to correct the coast
+ elif self._unit_owner('%s %s' % (unit_type, loc), coast_required=1) is None \
+ and self._unit_owner('%s %s' % (unit_type, loc_without_coast), coast_required=0) is not None:
+
+ # Finding the correct coast
+ for loc in [l for l in self.map.locs if l[:3] == loc_without_coast]:
+ if self._unit_owner('%s %s' % (word[0], loc), coast_required=1) is not None:
+ word[1] = loc
+ break
+
+ # Removing cost if unit is supporting an army moving to coast
+ # F WES S A MAR - SPA/SC -> F WES S A MAR - SPA
+ if len(word) == 7 and '/' in word[-1] and word[2] == 'S' and word[3] == 'A':
+ dest = word[-1]
+ word[-1] = dest[:dest.find('/')]
+
+ # Adjusting the coast if a fleet is supporting a move to the wrong coast
+ # F WES S F GAS - SPA/SC -> F WES S F GAS - SPA/NC
+ if len(word) == 7 and word[0] == 'F' and word[2] == 'S' and word[3] == 'F' and '/' in word[-1]:
+ word = word[:3] + self.map.default_coast(word[3:6] + [word[6][:3]])
+
+ # Returning with coasts fixed
+ return word
+
+ def _add_unit_types(self, item):
+ """ Adds any missing "A" and "F" designations and (current) coastal locations for fleets.
+ :param item: The words for expand_order() (e.g. ['A', 'RUM', 'S', 'BUL'])
+ :return: The list of items with A/F and coasts added (e.g. ['A', 'RUM', 'S', 'A', 'BUL'])
+ """
+ # dependent is set when A/F is expected afterwards (so at start and after C/S)
+ # had_type indicates that A/F was found
+ word, dependent, had_type = [], 1, 0
+ for token in item:
+ if not dependent:
+ dependent = token in 'CS'
+ elif token in 'AF':
+ had_type = 1
+ elif token in ('RETREAT', 'DISBAND', 'BUILD', 'REMOVE'):
+ pass
+ else:
+ try:
+ # We have a location
+ # Try to find an active or retreating unit at current location
+ unit = [unit for power in self.powers.values()
+ for unit in (power.units, power.retreats.keys())[self.phase_type == 'R']
+ if unit[2:].startswith(token)][0]
+
+ # If A/F is missing, add it
+ if not had_type:
+ word += [unit[0]]
+
+ # Trying to detect if coast is specified in retrieved unit location
+ # If yes, update the token, so it incorporates coastal information
+ if self.map.is_valid_unit(word[-1] + unit[1:]):
+ token = unit[2:]
+ except IndexError:
+ pass
+ dependent = had_type = 0
+
+ # Add token to final list
+ word += [token]
+ return word
+
+ def _add_coasts(self):
+ """ This method adds the matching coast to orders supporting or (portage) convoying a fleet to
+ a multi-coast province.
+ :return: Nothing
+ """
+ # converting to unique format
+ orders = {}
+ for unit, order in self.orders.items():
+ orders[unit] = order
+
+ # Add coasts to support and (portage) convoy orders for fleets moving to a specific coast
+ for unit, order in orders.items():
+ # Only rewriting 'S F XXX - YYY' and 'C F XXX - YYY'
+ if order[:3] not in ('S F', 'C F'):
+ continue
+ word = order.split()
+
+ # rcvr is the unit receiving the support or convoy (e.g. F XXX in S F XXX - BER)
+ # Making sure rcvr has also submitted orders (e.g. F XXX - YYY)
+ rcvr = ' '.join(word[1:3])
+ try:
+ rcvr = [x for x in orders if x.startswith(rcvr)][0]
+ except IndexError:
+ # No orders found
+ continue
+
+ # Updating order to include rcvr full starting position (with coasts)
+ orders[unit] = ' '.join([order[0], rcvr] + word[3:]).strip()
+
+ # Checking if coast is specified in destination position
+ if '-' in order:
+ # his -> '- dest/coast'
+ # updating order if coast is specified in his dest, but not ours
+ his = ' '.join(orders.get(rcvr, '').split()[-2:])
+ if his[0] == '-' and his.split('/')[0] == ' '.join(word[3:]):
+ orders[unit] = order[:2] + rcvr + ' ' + his
+
+ # Updating game.orders object
+ self.orders[unit] = orders[unit]
+
+ def _validate_status(self, reinit_powers=True):
+ """ Validates the status of the game object"""
+ # Loading map and setting victory condition
+ if not self.map:
+ self.load_map(reinit_powers=reinit_powers)
+ self.victory = self.map.victory
+
+ # By default, 50% +1 of the scs
+ # Or for victory homes, half the average number of home centers belonging to other powers plus one
+ if not self.victory:
+ self.victory = [len(self.map.scs) // 2 + 1]
+
+ # Ensure game phase was set
+ if not self.phase:
+ self.phase = self.map.phase
+ apart = self.phase.split()
+ if len(apart) == 3:
+ if '%s %s' % (apart[0], apart[2]) not in self.map.seq:
+ self.error += [err.GAME_BAD_PHASE_NOT_IN_FLOW]
+ self.phase_type = apart[2][0]
+ else:
+ self.phase_type = '-'
+
+ # Validate the BEGIN phase (if one was given)
+ if self.phase == 'FORMING':
+ apart = self.map.phase.split()
+ try:
+ int(apart[1])
+ del apart[1]
+ if ' '.join(apart) not in self.map.seq:
+ raise Exception()
+ except ValueError:
+ self.error += [err.GAME_BAD_BEGIN_PHASE]
+
+ # Set victory condition
+ if self.phase not in ('FORMING', 'COMPLETED'):
+ try:
+ year = abs(int(self.phase.split()[1]) - self.map.first_year)
+ win = self.victory[:]
+ self.win = win[min(year, len(win) - 1)]
+ except ValueError:
+ self.error += [err.GAME_BAD_YEAR_GAME_PHASE]
+
+ # Initialize power data
+ for power in self.powers.values():
+
+ # Initialize homes if needed
+ if power.homes is None:
+ power.homes = []
+ for home in self.map.homes.get(power.name, []):
+ self.update_hash(power.name, loc=home, is_home=True)
+ power.homes.append(home)
+
+ # ====================================================================
+ # Private Interface - Generic methods
+ # ====================================================================
+ def _load_rules(self):
+ """ Loads the list of rules and their forced (+) and denied (!) corresponding rules
+ :return: A tuple of dictionaries: rules, forced, and denied
+ rules = {'NO_CHECK':
+ { 'group': '3 Movement Order',
+ 'variant': 'standard',
+ '!': ['RULE_1', 'RULE_2'],
+ '+': ['RULE_3'] } }
+ forced = {'payola': 'RULE_4'}
+ denied = {'payola': 'RULE_5'}
+ """
+ if self.__class__.rule_cache:
+ return self.__class__.rule_cache
+ group = variant = ''
+ data, forced, denied = {}, {}, {}
+ file_path = os.path.join(settings.PACKAGE_DIR, 'README_RULES.txt')
+
+ if not os.path.exists(file_path):
+ self.error.append(err.GAME_UNABLE_TO_FIND_RULES)
+ return data, forced, denied
+
+ with open(file_path, 'r', encoding='utf-8') as file:
+ for line in file:
+ word = line.strip().split()
+
+ # Rules are in the format <!-- RULE NAME !RULE_1 +RULE_2 -->
+ # Where ! indicates a denied rule, and + indicates a forced rule
+ if word[:2] == ['<!--', 'RULE'] and word[-1][-1] == '>':
+
+ # <!-- RULE GROUP 6 Secrecy -->
+ # group would be '6 Secrecy'
+ if word[2] == 'GROUP':
+ group = ' '.join(word[3:-1])
+
+ # <!-- RULE VARIANT standard -->
+ elif word[2] == 'VARIANT':
+ variant = word[3]
+ forced[variant] = [x[1:] for x in word[4:-1] if x[0] == '+']
+ denied[variant] = [x[1:] for x in word[4:-1] if x[0] == '!']
+
+ # <!-- RULE NAME !RULE_1 +RULE_2 -->
+ elif word[2] != 'END':
+ rule = word[2]
+ if rule not in data:
+ data[rule] = {'group': group, 'variant': variant}
+ for control in word[3:-1]:
+ if control[0] in '-=+!':
+ data[rule].setdefault(control[0], []).append(control[1:])
+
+ self.__class__.rule_cache = (data, forced, denied)
+ return data, forced, denied
+
+ def _build_hash_table(self):
+ """ Builds the Zobrist hash tables """
+ if not self.map or self.map_name in self.__class__.zobrist_tables:
+ return
+
+ # Finding powers and locations
+ map_powers = sorted([power_name for power_name in self.map.powers])
+ map_locs = sorted([loc.upper() for loc in self.map.locs if self.map.area_type(loc) != 'SHUT'])
+ nb_powers = len(map_powers)
+ nb_locs = len(map_locs)
+
+ # Generating a standardized seed
+ np_state = np.random.get_state()
+ np.random.seed(12345 + sum([ord(x) * 7 ** ix for ix, x in enumerate(self.map_name)]) % 2 ** 32)
+ self.__class__.zobrist_tables[self.map_name] = {
+ 'unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]),
+ 'units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'dis_unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]),
+ 'dis_units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'centers': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'homes': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'map_powers': map_powers,
+ 'map_locs': map_locs
+ }
+ np.random.set_state(np_state)
+
+ # ====================================================================
+ # Private Interface - PROCESSING and phase change methods
+ # ====================================================================
+ def _begin(self):
+ """ Called to begin the game and move to the start phase
+ :return: Nothing
+ """
+ self._move_to_start_phase()
+ self.note = ''
+ self.win = self.victory[0]
+ # Create dummy power objects for non-loaded powers.
+ for power_name in self.map.powers:
+ if power_name not in self.powers:
+ self.powers[power_name] = Power(self, power_name, role=self.role)
+ # Initialize all powers.
+ for starter in self.powers.values():
+ # Starter having type won't be initialized.
+ starter.initialize(self)
+
+ def _process(self):
+ """ Processes the current phase of the game """
+ # Convert all raw movement phase "ORDER"s in a NO_CHECK game to standard orders before calling
+ # Game.process(). All "INVALID" and "REORDER" orders are left raw -- the Game.move_results() method
+ # knows how to detect and report them
+ if 'NO_CHECK' in self.rules and self.phase_type == 'M':
+ for power in self.powers.values():
+ orders, power.orders, civil_disorder = power.orders, {}, power.civil_disorder
+ for status, order in orders.items():
+ if status[:5] != 'ORDER':
+ power.orders[status] = order
+ elif order:
+ self._add_order(power, order.split())
+ power.civil_disorder = civil_disorder
+
+ # Processing the game
+ if self.phase_type == 'M':
+ self._determine_orders()
+ self._add_coasts()
+
+ # Resolving orders
+ self._resolve()
+
+ def _advance_phase(self):
+ """ Advance the game to the next phase (skipping phases with no actions)
+ :return: A list of lines to put in the results
+ """
+
+ # Save results for current phase.
+ # NB: result_history is updated here, neither in process() nor in draw(),
+ # unlike order_history, message_history and state_history.
+ self.result_history.put(self._phase_wrapper_type(self.current_short_phase), self.result)
+ self.result = {}
+
+ # For each possible phase
+ for _ in self.map.seq:
+
+ # If game is not yet started, or completed can't advance
+ if self.phase in (None, 'FORMING', 'COMPLETED'):
+ break
+
+ # Finding next phase and setting variables
+ self.phase = self._find_next_phase()
+ self.phase_type = self.phase.split()[-1][0]
+
+ # Check phase determines if we need to process phase (0) or can skip it (1)
+ if not self._check_phase():
+ break
+ else:
+ raise Exception("FailedToAdvancePhase")
+
+ # Rebuilding the convoy cache
+ self._build_list_possible_convoys()
+
+ # Returning
+ return []
+
+ def _move_to_start_phase(self):
+ """ Moves to the map's start phase
+ :return: Nothing, but sets the self.phase and self.phase_type settings
+ """
+ # Retrieve the beginning phase and phase type from the map
+ self.phase = self.map.phase
+ self.phase_type = self.phase.split()[-1][0]
+
+ def _find_next_phase(self, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately after the current phase
+ :param phase_type: The type of phase we are looking for
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the next phase (e.g. FALL 1905 MOVEMENT)
+ """
+ return self.map.find_next_phase(self.phase, phase_type, skip)
+
+ def _find_previous_phase(self, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately before the current phase
+ :param phase_type: The type of phase we are looking for
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the previous phase (e.g. SPRING 1905 MOVEMENT)
+ """
+ return self.map.find_previous_phase(self.phase, phase_type, skip)
+
+ def _get_start_phase(self):
+ """ Returns the name of the start phase"""
+ cur_phase, cur_phase_type = self.phase, self.phase_type
+ self._move_to_start_phase()
+ phase = self.phase
+ self.phase, self.phase_type = cur_phase, cur_phase_type
+ return phase
+
+ def _check_phase(self):
+ """ Checks if we need to process a phase, or if we can skip it if there are no actions
+ :return: Boolean (0 or 1) - 0 if we need to process phase, 1 if we can skip it
+ """
+ # pylint: disable=too-many-return-statements
+ # Beginning / End of game - Can't skip
+ if self.phase in (None, 'FORMING', 'COMPLETED'):
+ return 0
+
+ # When changing phases, clearing all caches
+ self.clear_cache()
+
+ # Movement phase - Always need to process
+ if self.phase_type == 'M':
+ return 0
+
+ # Retreats phase
+ if self.phase_type == 'R':
+ # We need to process if there are retreats
+ if [1 for x in self.powers.values() if x.retreats]:
+ return 0
+
+ # Otherwise, clearing flags and skipping phase
+ for power in self.powers.values():
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.retreats, power.adjust, power.civil_disorder = {}, [], 0
+ self.result = {}
+ if 'DONT_SKIP_PHASES' in self.rules:
+ return 0
+ return 1
+
+ # Adjustments phase
+ if self.phase_type == 'A':
+ # Capturing supply centers
+ self._capture_centers()
+
+ # If completed, can't skip
+ if self.phase == 'COMPLETED':
+ return 0
+
+ # If we have units to remove or to build, we need to process
+ for power in self.powers.values():
+ units, centers = len(power.units), len(power.centers)
+ if [x for x in power.centers if x in power.homes]:
+ centers += (0 + min(0, len([0 for x in power.units if x[2:5] in power.homes])))
+ if units > centers or (units < centers and self._build_limit(power)):
+ return 0
+
+ # Otherwise, skipping
+ self.result = {}
+ if 'DONT_SKIP_PHASES' in self.rules:
+ return 0
+ return 1
+
+ # Other phases. We need to process manually.
+ return 0
+
+ def _post_move_update(self):
+ """ Deletes orders and removes CD flag after moves """
+ for power in self.powers.values():
+ power.orders, power.civil_disorder = {}, 0
+
+ def _build_sites(self, power):
+ """ Returns a list of sites where power can build units
+ :param power: The power instance to check
+ :return: A list of build sites
+ """
+
+ # Retrieving the list of homes (build sites) for the power, and the list of active powers
+ homes = power.homes
+
+ # Can build on any of his centers
+ # -- BUILD_ANY: Powers may build new units at any owned supply center, not simply at their home supply centers.
+ if 'BUILD_ANY' in self.rules:
+ homes = power.centers
+
+ # Updating homes to only include homes if they are unoccupied,
+ homes = [h for h in homes if h in power.centers and h not in
+ [u[2:5] for p in self.powers.values() for u in p.units]]
+ return homes
+
+ def _build_limit(self, power, sites=None):
+ """ Determines the maximum number of builds a power can do in an adjustment phase
+ Note: This function assumes that one unit can be built per build sites + alternative sites
+ The actual maximum build limit would be less if units are built on alternative sites.
+ :param power: The power instance to check
+ :param sites: The power's build sites (or None to compute them)
+ :return: An integer representing the maximum number of simultaneous builds
+ """
+ # Computing build_sites if not provided
+ if sites is None:
+ sites = self._build_sites(power)
+
+ # Returning number of sites
+ return len(sites)
+
+ def _calculate_victory_score(self):
+ """ Calculates the score to determine win for each power
+ :return: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2})
+ """
+ score = {}
+
+ # Score is the number of supply centers owned
+ for power in self.powers.values():
+ score[power] = len([sc for sc in power.centers])
+ return score
+
+ def _determine_win(self, last_year):
+ """ Determine if we have a win.
+ :param last_year: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2})
+ (from the previous year)
+ :return: Nothing
+ """
+ victors, this_year = [], self._calculate_victory_score()
+ year_centers = [this_year[x] for x in self.powers.values()]
+
+ # Determining win
+ for power in self.powers.values():
+ centers = this_year[power]
+
+ # 1) you must have enough centers to win
+ if (centers >= self.win
+
+ # 2) and you must grow or, if "HOLD_WIN", must have had a win
+ and (centers > last_year[power], last_year[power] >= self.win)['HOLD_WIN' in self.rules]
+
+ # 3) and you must be alone in the lead (not required in case of SHARED_VICTORY)
+ and ('SHARED_VICTORY' in self.rules
+ or (centers, year_centers.count(centers)) == (max(year_centers), 1))):
+ victors += [power]
+
+ # We have a winner!
+ if victors:
+ self._finish([victor.name for victor in victors])
+
+ # DRAW if 100 years
+ elif int(self.phase.split()[1]) - self.map.first_year + 1 == 100:
+ self.draw()
+
+ def _capture_centers(self):
+ """ In Adjustment Phase, proceed with the capture of occupied supply centers
+ :return: Nothing
+ """
+ victory_score_prev_year = self._calculate_victory_score()
+
+ # If no power owns centers, initialize them
+ if not [1 for x in self.powers.values() if x.centers]:
+ for power in self.powers.values():
+ for center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers = []
+ for center in power.homes:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers.append(center)
+
+ # Remember the current center count for the various powers, for use in victory condition check,
+ # then go through and see if any centers have been taken over
+ unowned = self.map.scs[:]
+ for power in self.powers.values():
+ for center in power.centers:
+ if center in unowned:
+ unowned.remove(center)
+
+ # Keep track of scs lost
+ self.lost = {}
+ for power in list(self.powers.values()) + [None]:
+ # Centers before takover
+ if power:
+ centers = power.centers
+ else:
+ centers = unowned
+
+ # For each center, check if we took ownership
+ for center in centers[:]:
+ for owner in self.powers.values():
+
+ # 1) If center is unowned, or 2) owned by someone else and that we have a unit on it
+ # Proceed with transfer, and record lost
+ if (not power or owner is not power) and center in [x[2:5] for x in owner.units]:
+ self._transfer_center(power, owner, center)
+ if not power:
+ unowned.remove(center)
+ else:
+ self.lost[center] = power
+ break
+
+ # Determining if we have a winner
+ self._determine_win(victory_score_prev_year)
+
+ def _transfer_center(self, from_power, to_power, center):
+ """ Transfers a supply center from a power to another
+ :param from_power: The power instance from whom the supply center is transfered
+ :param to_power: The power instance to whom the supply center is transferred
+ :param center: The supply center location (e.g. 'PAR')
+ :return: Nothing
+ """
+ if from_power:
+ self.update_hash(from_power.name, loc=center, is_center=True)
+ from_power.centers.remove(center)
+ if center not in to_power.centers:
+ self.update_hash(to_power.name, loc=center, is_center=True)
+ to_power.centers += [center]
+
+ def _finish(self, victors):
+ """ Indicates that a game is finished and has been won by 'victors'
+ :param victors: The list of victors (e.g. ['FRANCE', 'GERMANY'])
+ :return: Nothing
+ """
+ # Setting outcome, and end date. Clearing orders and saving.
+ self.outcome = [self._phase_abbr()] + victors
+ self.note = 'Victory by: ' + ', '.join([vic[:3] for vic in victors])
+ self.phase = 'COMPLETED'
+ self.set_status(strings.COMPLETED)
+ for power in self.powers.values():
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.retreats, power.adjust, power.civil_disorder = {}, [], 0
+
+ def _phase_abbr(self, phase=None):
+ """ Constructs a 5 character representation (S1901M) from a phase (SPRING 1901 MOVEMENT)
+ :param phase: The full phase (e.g. SPRING 1901 MOVEMENT)
+ :return: A 5 character representation of the phase
+ """
+ return self.map.phase_abbr(phase or self.phase)
+
+ # ====================================================================
+ # Private Interface - ORDER Submission methods
+ # ====================================================================
+ def _add_order(self, power, word, expand=True, replace=True):
+ """ Adds an order for a power
+ :param power: The power instance issuing the order
+ :param word: The order (e.g. ['A', 'PAR', '-', 'MAR'])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ if not word:
+ return None
+
+ raw_word = word
+
+ if expand:
+ # Check that the order is valid. If not, self.error will say why.
+ word = self._expand_order(word)
+ word = self._expand_coast(word)
+ word = self._add_unit_types(word)
+ word = self.map.default_coast(word)
+
+ # Last word is '?' - Removing it
+ if word and len(word[-1]) == 1 and not word[-1].isalpha():
+ word = word[:-1]
+ if len(word) < 2:
+ return self.error.append(err.STD_GAME_BAD_ORDER % ' '.join(word))
+
+ # Checking if we can order unit
+ unit, order = ' '.join(word[:2]), ' '.join(word[2:])
+ owner = self._unit_owner(unit)
+ if not owner or owner is not power:
+ self.error += [err.STD_GAME_UNORDERABLE_UNIT % ' '.join(word)]
+
+ # Validating order
+ elif order:
+ valid = self._valid_order(power, unit, order)
+
+ # Valid order. But is it to a unit already ordered? This is okay in a NO_CHECK game, and
+ # we HOLD the unit. If not, pack it back into the power's order list.
+ if valid is not None:
+ power.civil_disorder = 0
+ if valid == -1:
+ order += ' ?'
+ if unit not in power.orders or (replace and 'NO_CHECK' not in self.rules):
+ power.orders[unit] = order
+ elif 'NO_CHECK' in self.rules:
+ count = len(power.orders)
+ if power.orders[unit] not in ('H', order):
+ power.orders['REORDER %d' % count] = power.orders[unit]
+ count += 1
+ power.orders[unit] = 'H'
+ power.orders['REORDER %d' % count] = ' '.join(word)
+ else:
+ self.error += [err.STD_GAME_UNIT_REORDERED % unit]
+
+ # Invalid order in NO_CHECK game
+ elif 'NO_CHECK' in self.rules:
+ count = len(power.orders)
+ power.orders['INVALID %d' % count] = ' '.join(raw_word)
+
+ # Returning nothing
+ return None
+
+ def _update_orders(self, power, orders, expand=True, replace=True):
+ """ Updates the orders of a power
+ :param power: The power instance (or None if updating multiple instances)
+ :param orders: The updated list of orders
+ e.g. ['A MAR - PAR', 'A PAR - BER', ...]
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ cur_power, had_orders, has_orders, powers = power, [], [], []
+
+ # For each order
+ for line in orders:
+
+ word = line.strip().split()
+ who = cur_power
+
+ if not word:
+ continue
+
+ # Checking if the power can order
+ if not hasattr(who, 'orders'):
+ return self.error.append('%s HAS NO UNITS OF ITS OWN TO ORDER' % who.name)
+
+ # NMR = No Moves Received (NMR or CLEAR command)
+ nmr = (len(word) == 1
+ and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR'))
+ if who not in powers:
+
+ # Empty orders before sticking any new orders in it.
+ had_orders += [who.orders]
+ powers += [who]
+ if nmr:
+ continue
+
+ # If CLEAR or NMR, clear orders
+ elif nmr:
+ who.orders = {}
+ has_orders = [x for x in has_orders if x is not who]
+ continue
+
+ # Adds orders
+ if 'NO_CHECK' in self.rules:
+ data = self._expand_order(word)
+ if len(data) < 3 and (len(data) == 1 or data[1] != 'H'):
+ self.error.append(err.STD_GAME_BAD_ORDER % line.upper())
+ continue
+
+ # Voiding previous order on same unit
+ if replace:
+ for order in who.orders:
+ order_parts = who.orders[order].split()
+ if len(order_parts) >= 2 and order_parts[1][:3] == word[1][:3]:
+ who.orders[order] = ''
+
+ # Adding new order
+ who.orders['ORDER %d' % (len(who.orders) + 1)] = ' '.join(word)
+ else:
+ self._add_order(who, word, expand=expand, replace=replace)
+ if who.orders and who not in has_orders:
+ has_orders += [who]
+
+ # Make sure the player can update his orders
+ if not powers:
+ return 1
+ if self.error:
+ return self.error
+
+ # Clear CD flag, even if orders were cleared
+ for who in powers:
+ who.civil_disorder = 0
+
+ # Returning nothing
+ return None
+
+ def _add_retreat_orders(self, power, orders, expand=True, replace=True):
+ """ Adds a retreat order (Retreats Phase)
+ :param power: The power instance who is submitting orders (or None if power is in the orders)
+ :param orders: The list of adjustment orders
+ (format can be [Country: order], [Country, order, order], or [order,order])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ # No orders, returning
+ if not orders:
+ power.adjust, power.civil_disorder = [], 0
+ return
+
+ # Processing each order
+ adjust, retreated = [], []
+ for order in orders:
+ word = order.split()
+ if not word or len(word) < 2:
+ continue
+
+ # Expanding and adding unit types
+ if expand:
+ word = self._expand_order([order])
+ word = self._add_unit_types(word)
+
+ # Add 'R' has order type for Retreat, 'D' for Disband
+ if word[0] == 'R' and len(word) > 3:
+ del word[0]
+ if word[0] in 'RD':
+ word = word[1:] + word[:1]
+
+ # Checking if unit can retreat
+ unit = ' '.join(word[:2])
+ try:
+ unit = [r_unit for r_unit in power.retreats if r_unit == unit or r_unit.startswith(unit + '/')][0]
+ except IndexError:
+ adjust += ['VOID ' + order]
+ self.error.append(err.GAME_UNIT_NOT_IN_RETREAT % unit)
+ continue
+
+ # Checking if unit already retreated
+ if unit in retreated:
+ adjust += ['VOID ' + order]
+ self.error.append(err.GAME_TWO_ORDERS_FOR_RETREATING_UNIT % unit)
+ continue
+ word[1] = unit[2:]
+
+ # Adding Disband for retreats with no destination
+ if len(word) == 3 and word[2] in 'RD':
+ word[2] = 'D'
+
+ # Checking if retreat destination is valid
+ elif len(word) == 4 and word[2] in 'R-':
+ word[2] = 'R'
+ if word[3] not in power.retreats[unit]\
+ or self._unit_owner('A {}'.format(word[3][:3]), coast_required=0) \
+ or self._unit_owner('F {}'.format(word[3][:3]), coast_required=0):
+ self.error.append(err.GAME_INVALID_RETREAT_DEST % ' '.join(word))
+ adjust += ['VOID ' + order]
+ continue
+
+ # Invalid retreat order - Voiding
+ else:
+ self.error.append(err.GAME_BAD_RETREAT_ORDER % ' '.join(word))
+ adjust += ['VOID ' + order]
+ continue
+
+ # Adding retreat order and marking unit as retreated
+ retreated += [unit]
+ adjust += [' '.join(word)]
+
+ # Replacing previous orders
+ if replace:
+ for order in adjust:
+ word = order.split()
+ if len(word) >= 2 and word[0] != 'VOID':
+ power.adjust = [adj_order for adj_order in power.adjust if adj_order.split()[1:2] != word[1:2]]
+
+ # Otherwise, marking re-orders as invalid
+ else:
+ ordered_locs = [adj_order.split()[1] for adj_order in power.adjust]
+ for order in adjust[:]:
+ word = order.split()
+ if len(word) >= 2 and word[1] in ordered_locs:
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])]
+ adjust.remove(order)
+
+ # Finalizing orders
+ power.adjust += adjust
+ power.civil_disorder = 0
+
+ def _update_retreat_orders(self, power, orders, expand=True, replace=True):
+ """ Updates order for Retreats phase
+ :param power: The power instance submitting the orders
+ :param orders: The updated orders
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: List of processing errors
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ for who, adj in self._distribute_orders(power, orders):
+ self._add_retreat_orders(who, adj, expand=expand, replace=replace)
+ return self.error
+
+ def _add_adjust_orders(self, power, orders, expand=True, replace=True):
+ """ Adds an adjustment order (Adjustment Phase)
+ :param power: The power instance who is submitting orders (or None if power is in the orders)
+ :param orders: The list of adjustment orders (format can be [Country: order],
+ [Country, order, order], or [order,order])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ # pylint: disable=too-many-branches
+ # No orders submitted, returning
+ if not orders:
+ power.adjust, power.civil_disorder = [], 0
+ return
+
+ # Calculating if the power can build or remove units
+ adjust, places = [], []
+ need, sites = len(power.centers) - len(power.units), []
+ order_type = 'D' if need < 0 else 'B'
+
+ # If we can build, calculating list of possible build locations
+ if need > 0:
+ sites = self._build_sites(power)
+ need = min(need, self._build_limit(power, sites))
+
+ # Processing each order
+ for order in orders:
+ order = order.strip()
+
+ if order == 'WAIVE':
+ # Check WAIVE order immediately and continue to next loop step.
+ if need >= 0:
+ adjust += [order]
+ else:
+ adjust += ['VOID ' + order]
+ self.error += ['WAIVE NOT ALLOWED FOR DISBAND']
+ continue
+
+ if not order or len(order.split()) < 2:
+ continue
+ word = self._expand_order([order]) if expand else order.split()
+
+ # Checking if unit can Build/Disband, otherwise voiding order
+ if word[-1] == order_type:
+ pass
+ elif word[-1] in 'BD':
+ adjust += ['VOID ' + order]
+ self.error += ['ORDER NOT ALLOWED: ' + order]
+ continue
+
+ # Adding unit type
+ if word[-1] == 'D' and expand:
+ word = self._add_unit_types(word)
+
+ # Checking for 'Disband'
+ order = ' '.join(word)
+ if word[-1] == 'D':
+ if len(word) == 3:
+ unit = ' '.join(word[:2])
+
+ # Invalid unit, voiding order
+ if unit not in power.units:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_NO_SUCH_UNIT % unit]
+
+ # Order to remove unit
+ elif order not in adjust:
+ adjust += [order]
+
+ # Invalid order, voiding
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % unit]
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order]
+
+ # Checking for BUILD
+ elif len(word) == 3:
+ site = word[1][:3]
+
+ # Invalid build site
+ if site not in sites:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_INVALID_BUILD_SITE % order]
+
+ # Site already used
+ elif site in places:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_MULT_BUILDS_IN_SITE % order]
+
+ # Unit can't be built there
+ elif not self.map.is_valid_unit(' '.join(word[:2])):
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_INVALID_BUILD_ORDER % order]
+
+ # Valid build sites
+ else:
+ adjust += [order]
+ places += [site]
+
+ # Otherwise, unknown order - Voiding
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order]
+
+ # NB: We skip WAIVE orders when checking for replacements.
+ # We will check them later.
+
+ # Replacing previous orders
+ if replace:
+ for order in adjust:
+ word = order.split()
+ if len(word) >= 2 and word[0] != 'VOID':
+ power.adjust = [adj_order for adj_order in power.adjust
+ if adj_order == 'WAIVE' or adj_order.split()[1] != word[1]]
+
+ # Otherwise, marking re-orders as invalid
+ else:
+ ordered_locs = [adj_order.split()[1] for adj_order in power.adjust if adj_order != 'WAIVE']
+ for order in adjust[:]:
+ word = order.split()
+ if len(word) >= 2 and word[1] in ordered_locs:
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])]
+ adjust.remove(order)
+
+ # Finalizing orders
+ power.adjust += adjust
+ power.civil_disorder = 0
+
+ # We check WAIVE orders in power.adjust after updating power.adjust,
+ # as WAIVE orders depend on variable `need`, whom computation is relative to power
+ # (ie. not relative to orders being currently adjusted).
+
+ # Removing extra waive orders
+ while 0 < need < len(power.adjust):
+ if 'WAIVE' in power.adjust:
+ power.adjust.remove('WAIVE')
+ else:
+ break
+
+ # Adding missing waive orders
+ if 'WAIVE' in power.adjust or power.is_dummy():
+ power.adjust.extend(['WAIVE'] * (need - len(power.adjust)))
+
+ def _update_adjust_orders(self, power, orders, expand=True, replace=True):
+ """ Updates order for Adjustment phase
+ :param power: The power instance submitting the orders
+ :param orders: The updated orders
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: List of processing errors
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ for who, adj in self._distribute_orders(power, orders):
+ self._add_adjust_orders(who, adj, expand=expand, replace=replace)
+ return self.error
+
+ def _determine_orders(self):
+ """ Builds the self.orders dictionary (i.e. makes sure all orders are legitimate). """
+ self.orders = {}
+
+ # Determine the orders to be issued to each unit, based on unit ownership
+ for power in self.powers.values():
+ for unit, order in power.orders.items():
+ if power is self._unit_owner(unit):
+ self.orders[unit] = order
+
+ # In NO_CHECK games, ensure that orders to other player's units are reported as invalid
+ # if no proxy was given
+ if 'NO_CHECK' in self.rules:
+ for power in self.powers.values():
+ for unit, order in power.orders.items():
+ if unit[0] not in 'RI' and power is not self._unit_owner(unit):
+ order = unit + ' ' + order
+ power.orders['INVALID %d' % len(power.orders)] = order
+
+ def _default_orders(self, power):
+ """ Issues default orders for a power (HOLD)
+ :param power: The power instance
+ :return: Nothing
+ """
+ # Power has no units
+ if not power.units:
+ return
+
+ # Power has not submitted all his orders, checking if we default to HOLD
+ if not [x for x in power.units if self.orders.get(x)]:
+ power.civil_disorder = 1
+ for unit in power.units:
+ self.orders.setdefault(unit, 'H')
+
+ @classmethod
+ def _distribute_orders(cls, power, orders, clear=True):
+ """ For controlling powers, distribute orders to controlled powers
+ :param power: The power instance submitting the orders
+ :param orders: The list of orders submitted
+ :param clear: Boolean flag to indicate to clear order if NMR/CLEAR submitted
+ :return: A list of tuple with (controlled power instance, list of orders for that controlled power instance)
+ """
+ powers, distributor = [power], {power.name: []}
+ cur_power = power
+
+ # For each order submitted
+ for order in orders:
+ # Don't distribute blank order
+ word = order.strip().split()
+ if not word:
+ continue
+
+ who = cur_power
+
+ # Clearing orders for power
+ if (clear
+ and len(word) == 1
+ and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR')):
+ distributor[who.name] = []
+ # Otherwise, distributing order
+ else:
+ distributor[who.name] += [' '.join(word)]
+
+ # Returning a list of tuples with the power instance and their respective orders.
+ return [(x, distributor[x.name]) for x in powers]
+
+ # ====================================================================
+ # Private Interface - ADJUDICATION Methods
+ # ====================================================================
+ def _abuts(self, unit_type, unit_loc, order_type, other_loc):
+ """ Determines if a order for unit_type from unit_loc to other_loc is adjacent (Support and convoy only)
+ :param unit_type: The type of unit ('A' or 'F')
+ :param unit_loc: The location of the unit ('BUR', 'BUL/EC')
+ :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move)
+ :param other_loc: The location of the other unit
+ :return: 1 if the locations are adjacent for the move, 0 otherwise
+ """
+ # Check if the map says the adjacency is good
+ if not self.map.abuts(unit_type, unit_loc, order_type, other_loc):
+ return 0
+ return 1
+
+ def _unit_owner(self, unit, coast_required=1):
+ """ Finds the power who owns a unit
+ :param unit: The name of the unit to find (e.g. 'A PAR')
+ :param coast_required: Indicates that the coast is in the unit
+ (if 0, you can search for 'F STP' for example, but if 1, you must specify 'F STP/SC')
+ :return: The power instance who owns the unit or None
+ """
+ # If coast_required is 0 and unit does not contain a '/'
+ # return the owner if we find a unit that starts with unit
+ # Don't count the unit if it needs to retreat (i.e. it has been dislodged)
+ for owner in self.powers.values():
+ if unit in owner.units:
+ return owner
+ if not coast_required and '/' not in unit and [1 for x in owner.units if x.find(unit) == 0]:
+ return owner
+ return None
+
+ def _occupant(self, site, any_coast=0):
+ """ Finds the occupant of a site
+ :param site: The site name (e.g. "STP")
+ :param any_coast: Boolean to indicate to return unit on any coast
+ :return: The unit (e.g. "A STP", "F STP/NC") occupying the site, None otherwise
+ """
+ if any_coast:
+ site = site[:3]
+ for power in self.powers.values():
+ for unit in power.units:
+ if unit[2:].startswith(site):
+ return unit
+ return None
+
+ def _strengths(self):
+ """ This function sets self.combat to a dictionary of dictionaries, specifying each potential destination
+ for every piece, with the strengths of each unit's attempt to get (or stay) there, and with the givers
+ of supports that DON'T country dislodgement. (i.e. supports given by the power owning the occupying unit).
+ :return: Nothing, but sets self.combat
+ """
+ # For example, the following orders, all by the same power:
+ # A MUN H, A SIL - MUN, A BOH S A SIL - MUN, A RUH - MUN would result in:
+ # e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } }
+ # MUN is holding, being attack without support from RUH and being attacked with support from SIL (S from BOH)
+ self.combat = {}
+
+ # For each order
+ for unit, order in self.command.items():
+ word = order.split()
+
+ # Strength of a non-move or failed move is 1 + support
+ if word[0] != '-' or self.result[unit]:
+ place, strength = unit[2:5], 1
+
+ # Strength of move depends on * and ~ in adjacency list
+ else:
+ offset = 1 if word[-1] == 'VIA' else 0
+ place = word[-1 - offset][:3]
+ strength = 1
+
+ # Adds the list of supporting units
+ # Only adding the support that DOES NOT count toward dislodgment
+ self.combat \
+ .setdefault(place, {}) \
+ .setdefault(strength + self.supports[unit][0], []) \
+ .append([unit, self.supports[unit][1]])
+
+ def _detect_paradox(self, starting_node, paradox_action, paradox_last_words):
+ """ Paradox detection algorithm. Start at starting node and move chain to see if node if performing
+ paradox action
+ :param starting_node: The location (e.g. PAR) where to start the paradox chain
+ :param paradox_action: The action that would cause a paradox in the chain (e.g. 'S')
+ :param paradox_last_words: The last words to detect in a order to cause a paradox (e.g. ['F', 'NTH'])
+ :return: Boolean (1 or 0) to indicate if a paradox action was detected in the chain
+ """
+ visited_units = []
+ current_node = starting_node
+ current_unit = self._occupant(current_node)
+ while current_unit is not None and current_unit not in visited_units:
+ visited_units += [current_unit]
+ current_order = self.command.get(current_unit, 'H')
+
+ # Action and last words detected
+ if (current_order[0] == paradox_action
+ and current_order.split()[-1 * len(paradox_last_words):] == paradox_last_words):
+ return True
+
+ # Continuing chain only if order is Support or Convoy
+ if current_order.split()[0] not in 'SC':
+ break
+ current_node = current_order.split()[-1]
+ current_unit = self._occupant(current_node)
+
+ # No paradox detected
+ return False
+
+ def _check_disruptions(self, may_convoy, result, coresult=None):
+ """ Determines convoy disruptions.
+ :param may_convoy: Contains the dictionary of all convoys that have a chance to succeed
+ (e.g. {'A PAR': ['BER', 'MUN']}
+ :param result: Result to set for the unit if the convoying fleet would be dislodged
+ (e.g. 'maybe', 'no convoy')
+ :param coresult: Result to set for the convoyer if the convoying fleet would be dislodged (e.g. 'dislodged')
+ :return: Nothing
+ """
+ for unit, word in may_convoy.items():
+
+ # Removing '-'
+ word = [w for w in word if w != '-']
+
+ # Checking order of unit at dest
+ offset = 1 if self.command.get(unit, []).split()[-1] == 'VIA' else 0
+ convoy_dest = self.command.get(unit, 'H').split()[-1 - offset]
+ unit_at_dest = self._occupant(convoy_dest)
+ order_unit_at_dest = self.command.get(unit_at_dest, 'H')
+
+ # Looping over all areas where convoys will take place (including destination)
+ for place in word:
+ area, convoyer = place[:3], 'AF'[unit[0] == 'A'] + ' ' + place
+ strongest = self.combat[area][max(self.combat[area])]
+
+ # Checking if the convoy is under attack
+ for strong_unit in strongest:
+ if self._unit_owner(convoyer) != self._unit_owner(strong_unit[0]):
+ break
+ else:
+ continue
+
+ # Paradox Detection #1
+ # [1st and 2nd order] Checking that we are not attacking a chain, with the last unit supporting
+ # the convoy
+ paradox = self._detect_paradox(convoy_dest, 'S', ['S', 'F', area])
+
+ # Checking if the convoy can withstand the attack and there is not active paradox
+ if convoyer in [x[0] for x in strongest] and not paradox:
+ continue
+
+ # For a beleaguered garrison, checking if the destination is attacking / supporting an attack
+ # against convoy
+ if len(strongest) >= 2 and not paradox:
+ if order_unit_at_dest.split()[0] not in '-S' or order_unit_at_dest.split()[-1][:3] != area:
+ continue
+
+ # Removing paths using place
+ self.convoy_paths.setdefault(unit, [])
+ for path in self.convoy_paths[unit]:
+ if place in path:
+ self.convoy_paths[unit].remove(path)
+
+ # Paradox Detection #2 - Can convoyed unit use land route to cut support necessary to attack convoy
+ paradox = False
+ if self._abuts(unit[0], unit[2:], '-', convoy_dest):
+ paradox = self._detect_paradox(convoy_dest, 'S', ['-', area])
+
+ # Setting the result if there is no convoy paths left, and
+ # 1) there is no land route (or there is a paradox through the land route)
+ # or 2) the unit specified 'VIA' and doesn't want to try the land route (4.A.3)
+ if not self.convoy_paths[unit] and (paradox
+ or not self._abuts(unit[0], unit[2:], '-', convoy_dest)
+ or (self._abuts(unit[0], unit[2:], '-', convoy_dest)
+ and self.command[unit].split()[-1] == 'VIA')):
+ self.result[unit] = [result]
+
+ # Setting the result for a would-be dislodged fleet
+ if coresult:
+ self.result[convoyer] = [coresult]
+
+ def _boing(self, unit):
+ """ Mark a unit bounced, and update the combat table to show the unit as
+ having strength one at its current location
+ :param unit: The unit to bounce (e.g. 'A PAR')
+ :return: 1
+ """
+ self.result[unit] += ['bounce']
+ self.combat \
+ .setdefault(unit[2:5], {}) \
+ .setdefault(1, []) \
+ .append([unit, []])
+ return 1
+
+ def _bounce(self):
+ """ This methods marks all units that can't get where they're going as bounced.
+ It loops to handle bounce-chains.
+ """
+ # pylint: disable=too-many-nested-blocks
+ bounced = 1
+ while bounced:
+ bounced = 0
+
+ # STEP 6. MARK (non-convoyed) PLACE-SWAP BOUNCERS
+ for unit, order in self.command.items():
+ word = order.split()
+ if self.result[unit] or word[0] != '-' or self._is_moving_via_convoy(unit):
+ continue
+ crawl_ok, site = False, '- ' + unit[2:]
+ swap = self._occupant(word[1], any_coast=not crawl_ok)
+ if self._is_moving_via_convoy(swap):
+ continue
+ if not (crawl_ok and swap and swap[0] == unit[0] == 'F'):
+ site = site.split('/')[0]
+ if not (self.command.get(swap, '').find(site) or self.result[swap]):
+ my_strength = self.supports[unit][0] - len(self.supports[unit][1])
+ his_strength = self.supports[swap][0] - len(self.supports[swap][1])
+ our_strength = (self._unit_owner(unit) is self._unit_owner(swap)
+ or self.supports[unit][0] == self.supports[swap][0])
+ if our_strength or my_strength <= his_strength:
+ self._boing(unit)
+ if our_strength or his_strength <= my_strength:
+ self._boing(swap)
+
+ # Marking support used for self-dislodgement as void
+ for supporting_unit in self.supports[unit][1]:
+ self.result[supporting_unit] += ['void']
+ for supporting_unit in self.supports[swap][1]:
+ self.result[supporting_unit] += ['void']
+ bounced = 1
+ if bounced:
+ continue
+ # No (more) swap-bouncers
+
+ # STEP 7. MARK OUTGUNNED BOUNCERS
+ for place, conflicts in list(self.combat.items()):
+ strength = sorted(conflicts.keys())
+ for key in strength:
+ if key != strength[-1] or len(conflicts[key]) != 1:
+ for unit, no_help in conflicts[key]:
+ if not self.result[unit] and self.command[unit][0] == '-':
+ bounced = self._boing(unit)
+ if bounced:
+ continue
+ # No (more) outgunned bouncers
+
+ # STEP 8. MARK SELF-DISLODGE BOUNCERS
+ for place, conflicts in list(self.combat.items()):
+ strength = sorted(conflicts.keys())
+ if len(conflicts[strength[-1]]) != 1:
+ continue
+ strongest = conflicts[strength[-1]][0][0]
+ if self.command[strongest][0] != '-' or self.result[strongest]:
+ continue
+ no_help = len(conflicts[strength[-1]][0][1])
+ guy = self._occupant(place)
+ if guy:
+ owner = self._unit_owner(guy)
+ if ((self.command[guy][0] != '-' or self.result[guy])
+ and (owner is self._unit_owner(strongest)
+ or (len(strength) > 1 and strength[-1] - no_help <= strength[-2]))):
+ bounced = self._boing(strongest)
+ for supporting_unit in conflicts[strength[-1]][0][1]:
+ if 'void' not in self.result[supporting_unit]:
+ self.result[supporting_unit] += ['void']
+
+ # No (more) self-dislodge bouncers
+
+ def _cut_support(self, unit, direct=0):
+ """ See if the order made by the unit cuts a support. If so, cut it.
+ :param unit: The unit who is attacking (and cutting support)
+ :param direct: Boolean Flag - If set, the order must not only be a move, but also a non-convoyed move.
+ :return: Nothing
+ """
+ order = self.command[unit]
+ word = order.split()
+ if word[0] != '-' or (direct and self._is_moving_via_convoy(unit)):
+ return
+ dest = word[-1] if word[-1] != 'VIA' else word[-2]
+ other_unit = self._occupant(dest, any_coast=1)
+ coord = self.command.get(other_unit, 'no unit at dest').split()
+ support_target = 'F ' + coord[-1][:3]
+
+ # pylint: disable=too-many-boolean-expressions
+ if (coord[0] == 'S'
+ and 'cut' not in self.result[other_unit]
+ and 'void' not in self.result[other_unit]
+
+ # EXCEPTION A: CANNOT CUT SUPPORT YOU YOURSELF ARE GIVING
+ and (self._unit_owner(unit) is not self._unit_owner(other_unit))
+
+ # EXCEPTION B: CANNOT CUT SUPPORT FOR A MOVE AGAINST YOUR LOCATION
+ and coord[-1][:3] != unit[2:5]
+
+ # EXCEPTION C: OR (IF CONVOYED) FOR OR AGAINST ANY CONVOYING FLEET
+ and (not self._is_moving_via_convoy(unit)
+ or self.command.get(support_target, 'H')[0] != 'C'
+ or 'void' in self.result.get(support_target, [])
+ # EXCEPTION TO EXCEPTION C: IF THERE IS A ALTERNATIVE CONVOY ROUTE
+ or [1 for path in self.convoy_paths[unit] if support_target[2:] not in path])):
+
+ # Okay, the support is cut.
+ self.result[other_unit] += ['cut']
+ affected = ' '.join(coord[1:3]) # Unit being supported
+ self.supports[affected][0] -= 1
+ if other_unit in self.supports[affected][1]:
+ self.supports[affected][1].remove(other_unit)
+
+ def _no_effect(self, unit, site):
+ """ Removes a unit from the combat list of an attack site
+ :param unit: The unit attacking the site (e.g. ['A PAR', []])
+ :param site: The site being attacked (e.g. 'MAR')
+ :return: Nothing
+ """
+ sups = [strength for strength, attack_unit in self.combat[site].items() if unit in attack_unit][0]
+ self.combat[site][sups].remove(unit)
+ if not self.combat[site][sups]:
+ del self.combat[site][sups]
+ if not self.combat[site]:
+ del self.combat[site]
+
+ def _unbounce(self, site):
+ """ Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger.
+ :param site: The site being attacked
+ :return: Nothing
+ """
+ # Detecting if there is only one attack winning at site
+ most = max(self.combat[site])
+ if len(self.combat[site][most]) > 1:
+ return None
+
+ # Unbouncing the winner of the attack at site
+ unbouncer = self.combat[site][most][0][0]
+ if 'bounce' in self.result[unbouncer]:
+ self.result[unbouncer].remove('bounce')
+ if unbouncer in self.dislodged:
+ del self.dislodged[unbouncer]
+ return self.result[unbouncer].remove('dislodged')
+
+ next_site = unbouncer[2:5]
+ self._no_effect([unbouncer, []], next_site)
+ if next_site in self.combat:
+ self._unbounce(next_site)
+ return None
+
+ def _resolve_moves(self):
+ """ Resolves the list of orders """
+ # pylint: disable=too-many-statements,too-many-branches
+
+ # -----------------------------------------------------------
+ # STEP 0: DECLARE ALL RESULTS AS YET UNKNOWN
+ self.result, self.supports, self.convoy_paths, may_convoy = {}, {}, {}, {}
+
+ # Fill self.command from the self.orders dictionary
+ # Fill self.ordered_units from the powers.units list
+ # Default order is to hold
+ self.command = {}
+ self.ordered_units = {}
+ for power in self.powers.values():
+ self.ordered_units[power.name] = [unit for unit in power.units if unit in self.orders]
+ for unit in power.units:
+ self.command[unit] = self.orders.get(unit, 'H')
+ if 'NO_CHECK' in self.rules:
+ for order in [order for key, order in power.orders.items() if key.startswith('INVALID')]:
+ unit = ' '.join(order.split()[:2])
+ self.ordered_units[power.name] += [unit]
+ self.command[unit] = 'H'
+ self.result[unit] = ['void']
+ self._default_orders(power)
+
+ for unit in self.command:
+ self.result.setdefault(unit, [])
+ self.supports.setdefault(unit, [0, []])
+
+ # -----------------------------------------------------------
+ # STEP 1A. CANCEL ALL INVALID ORDERS GIVEN TO UNITS ATTEMPTING TO MOVE BY CONVOY
+ for unit, order in list(self.command.items()):
+ word = order.split()
+ if word[0] != '-':
+ continue
+
+ # Full convoy path has been specified (e.g. 'A PAR - MAR - NAO - MAO - LON')
+ offset = 1 if word[-1] == 'VIA' else 0
+ if len(word) - offset > 2:
+ for convoyer in range(1, len(word) - 1, 2):
+ convoy_order = self.command.get('AF'[unit[0] == 'A'] + ' ' + word[convoyer])
+ if convoy_order not in ['C %s - ' % x + word[-1] for x in (unit, unit[2:])]:
+ if convoy_order:
+ self.result[unit] += ['no convoy']
+ else:
+ self.command[unit] = 'H'
+ break
+ # List the valid convoys
+ else:
+ may_convoy[unit] = order.split()
+ self.convoy_paths[unit] = [[unit[2:]] + word[1::2]]
+
+ # Only src and dest provided
+ else:
+ def flatten(nested_list):
+ """ Flattens a sublist """
+ return [list_item for sublist in nested_list for list_item in sublist]
+
+ has_via_convoy_flag = 1 if word[-1] == 'VIA' else 0
+ convoying_units = self._get_convoying_units_for_path(unit[0], unit[2:], word[1])
+ possible_paths = self._get_convoy_paths(unit[0],
+ unit[2:],
+ word[1],
+ has_via_convoy_flag,
+ convoying_units)
+
+ # No convoy path - Removing VIA and checking if adjacent
+ if not possible_paths:
+ if has_via_convoy_flag:
+ self.command[unit] = ' '.join(word[:-1])
+ if not self._abuts(unit[0], unit[2:], 'S', word[1]):
+ self.result[unit] += ['no convoy']
+
+ # There is a convoy path, remembering the convoyers
+ else:
+ self.convoy_paths[unit] = possible_paths
+ may_convoy.setdefault(unit, [])
+ for convoyer in convoying_units:
+ if convoyer[2:] in flatten(possible_paths) and convoyer[2:] not in may_convoy[unit]:
+ may_convoy[unit] += [convoyer[2:]]
+
+ # Marking all convoys that are not in any path
+ invalid_convoys = convoying_units[:]
+ all_path_locs = list(set(flatten(possible_paths)))
+ for convoy in convoying_units:
+ if convoy[2:] in all_path_locs:
+ invalid_convoys.remove(convoy)
+ for convoy in invalid_convoys:
+ self.result[convoy] = ['no convoy']
+
+ # -----------------------------------------------------------
+ # STEP 1B. CANCEL ALL INVALID CONVOY ORDERS
+ for unit, order in self.command.items():
+ if order[0] != 'C':
+ continue
+ # word = ['C', 'PAR', 'MAR'] -> ['C', 'A', 'PAR', 'MAR']
+ word, mover_type = order.split(), 'AF'[unit[0] == 'A']
+ if word[1] != mover_type:
+ word[1:1] = [mover_type]
+ mover = '%s %s' % (mover_type, word[2])
+ if self._unit_owner(mover):
+ convoyer = may_convoy.get(mover, [])
+ offset = 1 if self.command.get(mover, '').split()[-1] == 'VIA' else 0
+ mover_dest = self.command.get(mover, '').split()[-1 - offset]
+ if unit[2:] not in convoyer or word[-1] != mover_dest:
+ self.result[unit] += ['void']
+ else:
+ self.command[unit] = 'H'
+
+ # -----------------------------------------------------------
+ # STEP 2. CANCEL INCONSISTENT SUPPORT ORDERS AND COUNT OTHERS
+ for unit, order in self.command.items():
+ if order[0] != 'S':
+ continue
+ word, signal = order.split(), 0
+
+ # Remove any trailing "H" from a support-in-place order.
+ if word[-1] == 'H':
+ del word[-1]
+ self.command[unit] = ' '.join(word)
+
+ # Stick the proper unit type (A or F) into the order;
+ # All supports will have it from here on
+ where = 1 + (word[1] in 'AF')
+ guy = self._occupant(word[where])
+
+ # See if there is a unit to receive the support
+ if not guy:
+ self.command[unit] = 'H'
+ if not signal:
+ self.result[unit] += ['void']
+ continue
+ word[1:where + 1] = guy.split()
+ self.command[unit] = ' '.join(word)
+
+ # See if the unit's order matches the supported order
+ if signal:
+ continue
+ coord = self.command[guy].split()
+
+ # 1) Void if support is for hold and guy is moving
+ # 2) Void if support is for move and guy isn't going where support is given
+ # 3) Void if support is give, but move over convoy failed
+ offset = 1 if coord[-1] == 'VIA' else 0
+ if ((len(word) < 5 and coord[0] == '-')
+ or (len(word) > 4 and (coord[0], coord[-1 - offset]) != ('-', word[4]))
+ or 'no convoy' in self.result[guy]):
+ self.result[unit] += ['void']
+ continue
+
+ # Okay, the support is valid
+ self.supports[guy][0] += 1
+
+ # If the unit is owned by the owner of the piece being attacked, add the unit to those
+ # whose supports are not counted toward dislodgment.
+ if coord[0] != '-':
+ continue
+ owner = self._unit_owner(unit)
+ other = self._unit_owner(self._occupant(coord[-1], any_coast=1))
+ if owner is other:
+ self.supports[guy][1] += [unit]
+
+ # -----------------------------------------------------------
+ # STEP 3. LET DIRECT (NON-CONVOYED) ATTACKS CUT SUPPORTS
+ for unit in self.command:
+ if not self.result[unit]:
+ self._cut_support(unit, direct=1)
+
+ # -----------------------------------------------------------
+ # STEPS 4 AND 5. DETERMINE CONVOY DISRUPTIONS
+ cut, cutters = 1, []
+ while cut:
+ cut = 0
+ self._strengths()
+
+ # STEP 4. CUT SUPPORTS MADE BY (non-maybe) CONVOYED ATTACKS
+ self._check_disruptions(may_convoy, 'maybe')
+ for unit in may_convoy:
+ if self.result[unit] or unit in cutters:
+ continue
+ self._cut_support(unit)
+ cutters += [unit]
+ cut = 1
+ if cut:
+ continue
+
+ # STEP 5. LOCATE NOW-DEFINITE CONVOY DISRUPTIONS, VOID SUPPORTS
+ # THESE CONVOYERS WERE GIVEN, AND ALLOW CONVOYING UNITS TO CUT SUPPORT
+ self._check_disruptions(may_convoy, 'no convoy', 'disrupted')
+ for unit in may_convoy:
+ if 'no convoy' in self.result[unit]:
+ for sup, help_unit in self.command.items():
+ if not (help_unit.find('S %s' % unit) or self.result[sup]):
+ self.result[sup] = ['no convoy']
+ if not (help_unit.find('C %s' % unit) or self.result[sup]):
+ self.result[sup] = ['no convoy']
+ self.supports[unit] = [0, []]
+ elif 'maybe' in self.result[unit] and unit not in cutters:
+ self.result[unit], cut = [], 1
+ self._cut_support(unit)
+ cutters += [unit]
+
+ # Recalculate strengths now that some are reduced by cuts
+ self._strengths()
+
+ # Mark bounces, then dislodges, and if any dislodges caused a cut
+ # loop over this whole kaboodle again
+ self.dislodged, cut = {}, 1
+ while cut: # pylint: disable=too-many-nested-blocks
+ # -----------------------------------------------------------
+ # STEPS 6-8. MARK BOUNCERS
+ self._bounce()
+
+ # STEP 9. MARK SUPPORTS CUT BY DISLODGES
+ cut = 0
+ for unit, order in self.command.items():
+ if order[0] != '-' or self.result[unit]:
+ continue
+ attack_order = order.split()
+ offset = 1 if attack_order[-1] == 'VIA' else 0
+ victim = self._occupant(attack_order[-1 - offset], any_coast=1)
+ if victim and self.command[victim][0] == 'S' and not self.result[victim]:
+ word = self.command[victim].split()
+ supported, sup_site = self._occupant(word[2]), word[-1][:3]
+
+ # This next line is the key. Convoyed attacks can dislodge, but even when doing so, they cannot cut
+ # supports offered for or against a convoying fleet
+ # (They can cut supports directed against the original position of the army, though.)
+ if len(attack_order) > 2 and sup_site != unit[2:5]:
+ continue
+ self.result[victim] += ['cut']
+ cut = 1
+ for sups in self.combat.get(sup_site, {}):
+ for guy, no_help in self.combat[sup_site][sups]:
+ if guy != supported:
+ continue
+ self.combat[sup_site][sups].remove([guy, no_help])
+ if not self.combat[sup_site][sups]:
+ del self.combat[sup_site][sups]
+ sups -= 1
+ if victim in no_help:
+ no_help.remove(victim)
+ self.combat[sup_site].setdefault(sups, []).append([guy, no_help])
+ break
+ else:
+ continue
+ break
+
+ # -----------------------------------------------------------
+ # STEP 10. MARK DISLODGEMENTS AND UNBOUNCE ALL MOVES THAT LEAD TO DISLODGING UNITS
+ for unit, order in self.command.items():
+ if order[0] != '-' or self.result[unit]:
+ continue
+ site = unit[2:5]
+ offset = 1 if order.split()[-1] == 'VIA' else 0
+ loser = self._occupant(order.split()[-1 - offset], any_coast=1)
+ if loser and (self.command[loser][0] != '-' or self.result[loser]):
+ self.result[loser] = [res for res in self.result[loser] if res != 'disrupted'] + ['dislodged']
+ self.dislodged[loser] = site
+
+ # Check for a dislodged swapper (attacker and dislodged units must not be convoyed.)
+ # If found, remove the swapper from the combat list of the attacker's space
+ head_to_head_battle = not self._is_moving_via_convoy(unit) and not self._is_moving_via_convoy(loser)
+ if self.command[loser][2:5] == site and head_to_head_battle:
+ for sups, items in self.combat.get(site, {}).items():
+ item = [x for x in items if x[0] == loser]
+ if item:
+ self._no_effect(item[0], site)
+ break
+
+ # Marking support for self-dislodgement as void
+ for supporting_unit in self.supports[unit][1]:
+ self.result[supporting_unit] += ['void']
+
+ # Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger.
+ if site in self.combat:
+ self._unbounce(site)
+
+ # Done :-)
+
+ def _move_results(self):
+ """ Resolves moves (Movement phase) and returns a list of messages explaining what happened
+ :return: A list of lines for the results file explaining what happened during the phase
+ """
+ # Resolving moves
+ self._resolve_moves()
+
+ # Determine any retreats
+ for power in self.powers.values():
+ for unit in [u for u in power.units if u in self.dislodged]:
+ if unit not in power.retreats:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ power.retreats.setdefault(unit, [])
+ attacker_site, site = self.dislodged[unit], unit[2:]
+ attacker = self._occupant(attacker_site)
+ if self.map.loc_abut.get(site):
+ pushee = site
+ else:
+ pushee = site.lower()
+ for abut in self.map.loc_abut[pushee]:
+ abut = abut.upper()
+ where = abut[:3]
+ if ((self._abuts(unit[0], site, '-', abut) or self._abuts(unit[0], site, '-', where))
+ and (not self.combat.get(where)
+ and where != attacker_site or self._is_moving_via_convoy(attacker))):
+
+ # Armies cannot retreat to specific coasts
+ if unit[0] == 'F':
+ power.retreats[unit] += [abut]
+ elif where not in power.retreats[unit]:
+ power.retreats[unit] += [where]
+
+ # List all possible retreats
+ destroyed, self.popped = {}, []
+ if self.dislodged:
+ for power in self.powers.values():
+ for unit in [u for u in power.units if u in self.dislodged]:
+
+ # Removing unit
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(unit)
+ to_where = power.retreats.get(unit)
+
+ # Describing what it can do
+ if to_where:
+ pass
+ else:
+ destroyed[unit] = power
+ self.popped += [unit]
+
+ # Now (finally) actually move the units that succeeded in moving
+ for power in self.powers.values():
+ for unit in power.units[:]:
+ if self.command[unit][0] == '-' and not self.result[unit]:
+ offset = 1 if self.command[unit].split()[-1] == 'VIA' else 0
+
+ # Removing
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(unit)
+
+ # Adding
+ new_unit = unit[:2] + self.command[unit].split()[-1 - offset]
+ self.update_hash(power.name, unit_type=new_unit[0], loc=new_unit[2:])
+ power.units += [new_unit]
+
+ # Setting influence
+ for influence_power in self.powers.values():
+ if new_unit[2:5] in influence_power.influence:
+ influence_power.influence.remove(new_unit[2:5])
+ power.influence.append(new_unit[2:5])
+
+ # If units were destroyed, other units may go out of sight
+ if destroyed:
+ for unit, power in destroyed.items():
+ if unit in power.retreats:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ del power.retreats[unit]
+
+ # All finished
+ self._post_move_update()
+ return []
+
+ def _other_results(self):
+ """ Resolves moves (Retreat and Adjustment phase) and returns a list of messages explaining what happened
+ :return: A list of lines for the results file explaining what happened during the phase
+ """
+ # pylint: disable=too-many-statements,too-many-branches,too-many-nested-blocks
+ self.command = {}
+ self.ordered_units = {}
+ conflicts = {}
+
+ # Adjustments
+ if self.phase_type == 'A':
+ self.result = {}
+
+ # Emptying the results for the Adjustments Phase
+ for power in self.powers.values():
+ self.ordered_units.setdefault(power.name, [])
+ for order in power.adjust[:]:
+
+ # Void order - Marking it as such in results
+ if order.split()[0] == 'VOID':
+ word = order.split()[1:]
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Valid order - Marking as unprocessed
+ else:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, [])
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # CIVIL DISORDER
+ for power in self.powers.values():
+ diff = len(power.units) - len(power.centers)
+
+ # Detecting missing orders
+ for order in power.adjust[:]:
+ if diff == 0:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ # Looking for builds
+ elif diff < 0:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ if word[-1] == 'B':
+ diff += 1
+ else:
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ # Looking for removes
+ else:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ if word[-1] == 'D':
+ diff -= 1
+ else:
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ if not diff:
+ continue
+
+ power.civil_disorder = 1
+
+ # Need to remove units
+ if diff > 0:
+ fleets = PriorityDict()
+ armies = PriorityDict()
+
+ # Calculating distance to home
+ for unit in power.units:
+ distance = self._get_distance_to_home(unit[0], unit[2:], power.homes)
+ if unit[0] == 'F':
+ fleets[unit] = -1 * distance
+ else:
+ armies[unit] = -1 * distance
+
+ # Removing units
+ for unit in range(diff):
+ goner_distance, goner = 99999, None
+
+ # Removing units with largest distance (using fleets if they are equal)
+ # (using alpha name if multiple units)
+ if fleets:
+ goner_distance, goner = fleets.smallest()
+ if armies and armies.smallest()[0] < goner_distance:
+ goner_distance, goner = armies.smallest()
+ if goner is None:
+ break
+ if goner[0] == 'F':
+ del fleets[goner]
+ else:
+ del armies[goner]
+ power.adjust += ['%s D' % goner]
+ self.result.setdefault(goner, [])
+
+ # Need to build units
+ else:
+ sites = self._build_sites(power)
+ need = min(self._build_limit(power, sites), -diff)
+ power.adjust += ['WAIVE'] * need
+
+ # Retreats phase
+ elif self.phase_type == 'R':
+ self.result = {}
+
+ # Emptying the results for the Retreats Phase
+ for power in self.powers.values():
+ self.ordered_units.setdefault(power.name, [])
+ for retreats in power.retreats:
+ self.result[retreats] = []
+
+ # Emptying void orders - And marking them as such
+ for power in self.powers.values():
+ for order in power.adjust[:]:
+ if order.split()[0] == 'VOID':
+ word = order.split()[1:]
+ unit = ' '.join(word[:2])
+ self.result[unit] = ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+ power.adjust.remove(order)
+
+ # Disband units with no retreats
+ for power in self.powers.values():
+ if power.retreats and not power.adjust:
+ power.civil_disorder = 1
+ power.adjust = ['%s D' % r_unit for r_unit in power.retreats]
+
+ # Determine multiple retreats to the same location.
+ for power in self.powers.values():
+ for order in power.adjust or []:
+ word = order.split()
+ if len(word) == 4:
+ conflicts.setdefault(word[3][:3], []).append(' '.join(word[:2]))
+
+ # Determine retreat conflict (*bounce, destroyed*)
+ # When finished, "self.popped" will be a list of all retreaters who didn't make it.
+ for retreaters in conflicts.values():
+ if len(retreaters) > 1:
+ for retreater in retreaters:
+ if 'void' in self.result[retreater]:
+ self.result[retreater].remove('void')
+ self.result[retreater] += ['bounce', 'disband']
+ self.popped += retreaters
+
+ # Processing Build and Disband
+ for power in self.powers.values():
+ diff = len(power.units) - len(power.centers)
+
+ # For each order
+ for order in power.adjust or []:
+ word = order.split()
+ unit = ' '.join(word[:2])
+
+ # Build
+ if word[-1] == 'B' and len(word) > 2:
+ if diff < 0:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units += [' '.join(word[:2])]
+ diff += 1
+ self.result[unit] += ['']
+ else:
+ self.result[unit] += ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Disband
+ elif word[-1] == 'D' and self.phase_type == 'A':
+ if diff > 0 and ' '.join(word[:2]) in power.units:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(' '.join(word[:2]))
+ diff -= 1
+ self.result[unit] += ['']
+ else:
+ self.result[unit] += ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Retreat
+ elif len(word) == 4:
+ if unit not in self.popped:
+ self.update_hash(power.name, unit_type=word[0], loc=word[-1])
+ power.units += [word[0] + ' ' + word[-1]]
+ if unit in self.dislodged:
+ del self.dislodged[unit]
+
+ # Update influence
+ for influence_power in self.powers.values():
+ if word[-1] in influence_power.influence:
+ influence_power.influence.remove(word[-1])
+ power.influence.append(word[-1])
+
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.adjust, power.retreats, power.civil_disorder = [], {}, 0
+
+ # Disbanding
+ for unit in [u for u in self.dislodged]:
+ self.result.setdefault(unit, [])
+ if 'disband' not in self.result[unit]:
+ self.result[unit] += ['disband']
+ del self.dislodged[unit]
+ if unit not in self.popped:
+ self.popped += [unit]
+
+ return []
+
+ def _resolve(self):
+ """ Resolve the current phase
+ :return: A list of strings for the results file explaining how the phase was resolved.
+ """
+ this_phase = self.phase_type
+
+ # This method knows how to process movement, retreat, and adjustment phases.
+ # For others, implement resolve_phase()
+ if this_phase == 'M':
+ self._move_results()
+ elif this_phase in 'RA':
+ self._other_results()
+ self._advance_phase()
+
+ def _clear_history(self):
+ """ Clear all game history fields. """
+ self.state_history.clear()
+ self.order_history.clear()
+ self.result_history.clear()
+ self.message_history.clear()
+ self.clear_orders()
+ self.clear_vote()
diff --git a/diplomacy/engine/map.py b/diplomacy/engine/map.py
new file mode 100644
index 0000000..677a4e7
--- /dev/null
+++ b/diplomacy/engine/map.py
@@ -0,0 +1,1361 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+# -*- coding: utf-8 -*-
+# pylint: disable=too-many-lines
+""" Map
+ - Contains the map object which represents a map where the game can be played
+"""
+from copy import deepcopy
+import os
+from diplomacy import settings
+from diplomacy.utils import KEYWORDS, ALIASES
+import diplomacy.utils.errors as err
+
+# Constants
+UNDETERMINED, POWER, UNIT, LOCATION, COAST, ORDER, MOVE_SEP, OTHER = 0, 1, 2, 3, 4, 5, 6, 7
+MAP_CACHE = {}
+
+
+class Map():
+ """ MAP Class
+
+ Properties:
+ - abbrev: Contains the power abbreviation, otherwise defaults to first letter of PowerName
+ e.g. {'ENGLISH': 'E'}
+ - abuts_cache: Contains a cache of abuts for ['A,'F'] between all locations for orders ['S', 'C', '-']
+ e.g. {(A, PAR, -, MAR): 1, ...}
+ - aliases: Contains a dict of all the aliases (e.g. full province name to 3 char)
+ e.g. {'EAST': 'EAS', 'STP ( /SC )': 'STP/SC', 'FRENCH': 'FRANCE', 'BUDAPEST': 'BUD', 'NOR': 'NWY', ... }
+ - centers: Contains a dict of currently owned supply centers for each player
+ e.g. {'RUSSIA': ['MOS', 'SEV', 'STP', 'WAR'], 'FRANCE': ['BRE', 'MAR', 'PAR'], ... }
+ - convoy_paths: Contains a list of all possible convoys paths bucketed by number of fleets
+ format: {nb of fleets: [(START_LOC, {FLEET LOC}, {DEST LOCS})]}
+ - dummies: Indicates the list of powers that are dummies
+ e.g. ['FRANCE', 'ITALY']
+ - error: Contains a list of errors that the map generated
+ e.g. [''DUPLICATE MAP ALIAS OR POWER: JAPAN']
+ - files: Contains a list of files that were loaded (e.g. USES keyword)
+ e.g. ['standard.map', 'standard.politics', 'standard.geography', 'standard.military']
+ - first_year: Indicates the year where the game is starting.
+ e.g. 1901
+ - flow: List that contains the seasons with the phases
+ e.g. ['SPRING:MOVEMENT,RETREATS', 'FALL:MOVEMENT,RETREATS', 'WINTER:ADJUSTMENTS']
+ - flow_sign: Indicate the direction of flow (1 is positive, -1 is negative)
+ e.g. 1
+ - homes: Contains the list of supply centers where units can be built (i.e. assigned at the beginning)
+ e.g. {'RUSSIA': ['MOS', 'SEV', 'STP', 'WAR'], 'FRANCE': ['BRE', 'MAR', 'PAR'], ... }
+ - inhabits: List that indicates which power have a INHABITS, HOME, or HOMES line
+ e.g. ['FRANCE']
+ - keywords: Contains a dict of keywords to parse status files and orders
+ e.g. {'BUILDS': 'B', '>': '', 'SC': '/SC', 'REMOVING': 'D', 'WAIVED': 'V', 'ATTACK': '', ... }
+ - loc_abut: Contains a adjacency list for each province
+ e.g. {'LVP': ['CLY', 'edi', 'IRI', 'NAO', 'WAL', 'yor'], ...}
+ - loc_coasts: Contains a mapping of all coasts for every location
+ e.g. {'PAR': ['PAR'], 'BUL': ['BUL', 'BUL/EC', 'BUL/SC'], ... }
+ - loc_name: Dict that indicates the 3 letter name of each location
+ e.g. {'GULF OF LYON': 'LYO', 'BREST': 'BRE', 'BUDAPEST': 'BUD', 'RUHR': 'RUH', ... }
+ - loc_type: Dict that indicates if each location is 'WATER', 'COAST', 'LAND', or 'PORT'
+ e.g. {'MAO': 'WATER', 'SER': 'LAND', 'SYR': 'COAST', 'MOS': 'LAND', 'VEN': 'COAST', ... }
+ - locs: List of 3 letter locations (With coasts)
+ e.g. ['ADR', 'AEG', 'ALB', 'ANK', 'APU', 'ARM', 'BAL', 'BAR', 'BEL', 'BER', ... ]
+ - name: Name of the map
+ e.g. 'standard'
+ - own_word: Dict to indicate the word used to refer to people living in each power's country
+ e.g. {'RUSSIA': 'RUSSIAN', 'FRANCE': 'FRENCH', 'UNOWNED': 'UNOWNED', 'TURKEY': 'TURKISH', ... }
+ - owns: List that indicates which power have a OWNS or CENTERS line
+ e.g. ['FRANCE']
+ - phase: String to indicate the beginning phase of the map
+ e.g. 'SPRING 1901 MOVEMENT'
+ - phase_abbrev: Dict to indicate the 1 letter abbreviation for each phase
+ e.g. {'A': 'ADJUSTMENTS', 'M': 'MOVEMENT', 'R': 'RETREATS'}
+ - pow_name: Dict to indicate the power's name
+ e.g. {'RUSSIA': 'RUSSIA', 'FRANCE': 'FRANCE', 'TURKEY': 'TURKEY', 'GERMANY': 'GERMANY', ... }
+ - powers: Contains the list of powers (players) in the game
+ e.g. ['AUSTRIA', 'ENGLAND', 'FRANCE', 'GERMANY', 'ITALY', 'RUSSIA', 'TURKEY']
+ - root_map: Contains the name of the original map file loaded (before the USES keyword are applied)
+ A map that is called with MAP is the root_map
+ e.g. 'standard'
+ - rules: Contains a list of rules used by all variants (for display only)
+ e.g. ['RULE_1']
+ - scs: Contains a list of all the supply centers in the game
+ e.g. ['MOS', 'SEV', 'STP', 'WAR', 'BRE', 'MAR', 'PAR', 'BEL', 'BUL', 'DEN', 'GRE', 'HOL', 'NWY', ... ]
+ - seq: [] Contains the sequence of seasons in format 'SEASON_NAME SEASON_TYPE'
+ e.g. ['NEWYEAR', 'SPRING MOVEMENT', 'SPRING RETREATS', 'FALL MOVEMENT', 'FALL RETREATS',
+ 'WINTER ADJUSTMENTS']
+ - unclear: Contains the alias for ambiguous places
+ e.g. {'EAST': 'EAS'}
+ - unit_names: {} Contains a dict of the unit names
+ e.g. {'F': 'FLEET', 'A': 'ARMY'}
+ - units: Dict that contains the current position of each unit by power
+ e.g. {'FRANCE': ['F BRE', 'A MAR', 'A PAR'], 'RUSSIA': ['A WAR', 'A MOS', 'F SEV', 'F STP/SC'], ... }
+ - validated: Boolean to indicate if the map file has been validated
+ e.g. 1
+ - victory: Indicates the number of supply centers to win the game (>50% required if None)
+ e.g. 18
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ __slots__ = ['name', 'first_year', 'victory', 'phase', 'validated', 'flow_sign', 'root_map', 'abuts_cache',
+ 'homes', 'loc_name', 'loc_type', 'loc_abut', 'loc_coasts', 'own_word', 'abbrev', 'centers', 'units',
+ 'pow_name', 'rules', 'files', 'powers', 'scs', 'owns', 'inhabits', 'flow', 'dummies', 'locs', 'error',
+ 'seq', 'phase_abbrev', 'unclear', 'unit_names', 'keywords', 'aliases', 'convoy_paths']
+
+ def __new__(cls, name='standard', use_cache=True):
+ """ New function - Retrieving object from cache if possible
+ :param name: Name of the map to load
+ :param use_cache: Boolean flag to indicate we want a blank object that doesn't use cache
+ """
+ if name in MAP_CACHE and use_cache:
+ return MAP_CACHE[name]
+ return object.__new__(cls)
+
+ def __init__(self, name='standard', use_cache=True):
+ """ Constructor function
+ :param name: Name of the map to load
+ :param use_cache: Boolean flag to indicate we want a blank object that doesn't use cache
+ """
+ if name in MAP_CACHE:
+ return
+ self.name = name
+ self.first_year = 1901
+ self.victory = self.phase = self.validated = self.flow_sign = None
+ self.root_map = None
+ self.abuts_cache = {}
+ self.homes, self.loc_name, self.loc_type, self.loc_abut, self.loc_coasts = {}, {}, {}, {}, {}
+ self.own_word, self.abbrev, self.centers, self.units, self.pow_name = {}, {}, {}, {}, {}
+ self.rules, self.files, self.powers, self.scs, self.owns, self.inhabits = [], [], [], [], [], []
+ self.flow, self.dummies, self.locs = [], [], []
+ self.error, self.seq = [], []
+ self.phase_abbrev, self.unclear = {}, {}
+ self.unit_names = {'A': 'ARMY', 'F': 'FLEET'}
+ self.keywords, self.aliases = KEYWORDS.copy(), ALIASES.copy()
+ self.load()
+ self.build_cache()
+ self.validate()
+ if name not in CONVOYS_PATH_CACHE and use_cache:
+ CONVOYS_PATH_CACHE[name] = add_to_cache(name)
+ self.convoy_paths = CONVOYS_PATH_CACHE.get(name, {})
+ if use_cache:
+ MAP_CACHE[name] = self
+
+ def __deepcopy__(self, memo):
+ """ Fast deep copy implementation """
+ actual_init = self.__class__.__init__
+ self.__class__.__init__ = lambda *args, **kwargs: None
+ instance = self.__class__(name=self.name, use_cache=False)
+ self.__class__.__init__ = actual_init
+ for key in self.__slots__:
+ setattr(instance, key, deepcopy(getattr(self, key)))
+ return instance
+
+ def __str__(self):
+ return self.name
+
+ def validate(self, force=0):
+ """ Validate that the configuration from a map file is correct
+ :param force: Indicate that we want to force a validation, even if the map is already validated
+ :return: Nothing
+ """
+ # pylint: disable=too-many-branches
+ # Already validated, returning (except if forced or if validating phases)
+ if not force and self.validated:
+ return
+ self.validated = 1
+
+ # Root map
+ self.root_map = self.root_map or self.name
+
+ # Validating powers
+ self.powers = [power_name for power_name in self.homes if power_name != 'UNOWNED']
+ self.powers.sort()
+ if len(self.powers) < 2:
+ self.error += [err.MAP_LEAST_TWO_POWERS]
+
+ # Validating area type
+ for place in self.loc_name.values():
+ if place.upper() not in self.powers and not self.area_type(place):
+ self.error += [err.MAP_LOC_NOT_FOUND % place]
+
+ # Validating adjacencies
+ for place, abuts in self.loc_abut.items():
+ up_abuts = [loc.upper() for loc in abuts]
+ for abut in abuts:
+ up_abut = abut.upper()
+ if up_abuts.count(up_abut) > 1:
+ self.error += [err.MAP_SITE_ABUTS_TWICE % (place.upper(), up_abut)]
+ while up_abut in up_abuts:
+ up_abuts.remove(up_abut)
+
+ # Checking full name
+ if place.upper() not in self.loc_name.values():
+ self.error += [err.MAP_NO_FULL_NAME % place]
+
+ # Checking one-way adjacency
+ for loc in abuts:
+ if self.area_type(place) != 'SHUT' \
+ and self.area_type(loc) != 'SHUT' \
+ and not self.abuts('A', loc, '-', place) \
+ and not self.abuts('F', loc, '-', place):
+ self.error.append(err.MAP_ONE_WAY_ADJ % (place, loc))
+
+ # Validating home centers
+ for power_name, places in self.homes.items():
+ for site in places:
+ # Adding home as supply center
+ if site not in self.scs:
+ self.scs += [site]
+ if not self.area_type(site):
+ self.error += [err.MAP_BAD_HOME % (power_name, site)]
+
+ # Remove home centers from unowned list.
+ # It's perfectly OK for 2 powers to share a home center, as long
+ # as no more than one owns it at the same time.
+ if power_name != 'UNOWNED':
+ if site in self.homes['UNOWNED']:
+ self.homes['UNOWNED'].remove(site)
+
+ # Valid supply centers
+ for scs in self.centers.values():
+ self.scs.extend([center for center in scs if center not in self.scs])
+
+ # Validating initial centers and units
+ for power_name, places in self.centers.items():
+ for loc in places:
+ if not self.area_type(loc):
+ self.error.append(err.MAP_BAD_INITIAL_OWN_CENTER % (power_name, loc))
+
+ # Checking if power has OWN line
+ for power_name in self.powers:
+ if power_name not in self.owns:
+ self.centers[power_name] = self.homes[power_name][:]
+ for unit in self.units.get(power_name, []):
+ if not self.is_valid_unit(unit):
+ self.error.append(err.MAP_BAD_INITIAL_UNITS % (power_name, unit))
+
+ # Checking for multiple owners
+ for power_name, centers in self.centers.items():
+ for site in centers:
+ for other, locs in self.centers.items():
+ if other == power_name and locs.count(site) != 1:
+ self.error += [err.MAP_CENTER_MULT_OWNED % site]
+ elif other != power_name and locs.count(site) != 0:
+ self.error += [err.MAP_CENTER_MULT_OWNED % site]
+ if 'UNOWNED' in self.homes:
+ del self.homes['UNOWNED']
+
+ # Ensure a default game-year FLOW
+ self.flow = ['SPRING:MOVEMENT,RETREATS', 'FALL:MOVEMENT,RETREATS', 'WINTER:ADJUSTMENTS']
+ self.flow_sign = 1
+ self.seq = ['NEWYEAR', 'SPRING MOVEMENT', 'SPRING RETREATS', 'FALL MOVEMENT', 'FALL RETREATS',
+ 'WINTER ADJUSTMENTS']
+ self.phase_abbrev = {'M': 'MOVEMENT', 'R': 'RETREATS', 'A': 'ADJUSTMENTS'}
+
+ # Validating initial game phase
+ self.phase = self.phase or 'SPRING 1901 MOVEMENT'
+ phase = self.phase.split()
+ if len(phase) != 3:
+ self.error += [err.MAP_BAD_PHASE % self.phase]
+ else:
+ self.first_year = int(phase[1])
+
+ def load(self, file_name=None):
+ """ Loads a map file from disk
+ :param file_name: Optional. A string representing the file to open. Otherwise, defaults to the map name
+ :return: Nothing
+ """
+ # pylint: disable=too-many-nested-blocks,too-many-statements,too-many-branches
+ # If file_name is string, opening file from disk
+ # Otherwise file_name is the file handler
+ power = 0
+ if file_name is None:
+ file_name = '{}.map'.format(self.name)
+ file_path = os.path.join(settings.PACKAGE_DIR, 'maps', file_name)
+
+ # Checking if file exists:
+ found_map = 1 if os.path.exists(file_path) else 0
+ if not found_map:
+ self.error.append(err.MAP_FILE_NOT_FOUND % file_name)
+ return
+
+ # Adding to parsed files
+ self.files += [file_name]
+
+ # Parsing file
+ with open(file_path, encoding='utf-8') as file:
+ variant = 0
+
+ for line in file:
+ word = line.split()
+
+ # -- # comment...
+ if not word or word[0][0] == '#':
+ continue
+ upword = word[0].upper()
+
+ # ----------------------------------
+ # Centers needed to obtain a VICTORY
+ # -- VICTORY centerCount...
+ if upword == 'VICTORY':
+ try:
+ self.victory = [int(word) for word in word[1:]]
+ except ValueError:
+ self.error += [err.MAP_BAD_VICTORY_LINE]
+
+ # ---------------------------------
+ # Inclusion of other base map files
+ # -- USE[S] fileName...
+ # -- MAP mapName
+ elif upword in ('USE', 'USES', 'MAP'):
+ if upword == 'MAP':
+ if len(word) != 2:
+ self.error += [err.MAP_BAD_ROOT_MAP_LINE]
+ elif self.root_map:
+ self.error += [err.MAP_TWO_ROOT_MAPS]
+ else:
+ self.root_map = word[1].split('.')[0]
+ for new_file in word[1:]:
+ if '.' not in new_file:
+ new_file = '{}.map'.format(new_file)
+ if new_file not in self.files:
+ self.load(new_file)
+ else:
+ self.error += [err.MAP_FILE_MULT_USED % new_file]
+
+ # ------------------------------------
+ # Set BEGIN phase
+ # -- BEGIN season year phaseType
+ elif upword == 'BEGIN':
+ self.phase = ' '.join(word[1:]).upper()
+
+ # ------------------------------------
+ # RULEs specific to this map
+ elif upword in ('RULE', 'RULES'):
+ if (variant or 'ALL') == 'ALL':
+ self.rules += line.upper().split()[1:]
+
+ # ------------------------------------
+ # -- [oldAbbrev ->] placeName = abbreviation alias...
+ elif '=' in line:
+ token = line.upper().split('=')
+ if len(token) == 1:
+ self.error += [err.MAP_BAD_ALIASES_IN_FILE % token[0]]
+ token += ['']
+ old_name, name, word = 0, token[0].strip(), token[1].split()
+ parts = [part.strip() for part in name.split('->')]
+ if len(parts) == 2:
+ old_name, name = parts
+ elif len(parts) > 2:
+ self.error += [err.MAP_BAD_RENAME_DIRECTIVE % name]
+ if not (word[0][0] + word[0][-1]).isalnum() or word[0] != self.norm(word[0]).replace(' ', ''):
+ self.error += [err.MAP_INVALID_LOC_ABBREV % word[0]]
+
+ # Rename no longer supported
+ # Making sure place not already there
+ if old_name:
+ self.error += [err.MAP_RENAME_NOT_SUPPORTED]
+ if name in self.keywords:
+ self.error += [err.MAP_LOC_RESERVED_KEYWORD % name]
+ normed = name
+ else:
+ normed = self.norm(name)
+ if name in self.loc_name or normed in self.aliases:
+ self.error += [err.MAP_DUP_LOC_OR_POWER % name]
+ self.loc_name[name] = self.aliases[normed] = word[0]
+
+ # Ambiguous place names end with a ?
+ for alias in word[1:]:
+ unclear = alias[-1] == '?'
+ # For ambiguous place names, let's do just a minimal normalization
+ # otherwise they might become unrecognizable (e.g. "THE")
+ normed = alias[:-1].replace('+', ' ').upper() if unclear else self.norm(alias)
+ if unclear:
+ self.unclear[normed] = word[0]
+ elif normed in self.aliases:
+ if self.aliases[normed] != word[0]:
+ self.error += [err.MAP_DUP_ALIAS_OR_POWER % alias]
+ else:
+ self.aliases[normed] = word[0]
+
+ # ------------------------------------
+ # Center ownership (!= Home Ownership)
+ # -- OWNS center...
+ # -- CENTERS [center...]
+ elif upword in ('OWNS', 'CENTERS'):
+ if not power:
+ self.error += [err.MAP_OWNS_BEFORE_POWER % (upword, ' '.join(word))]
+ else:
+ if power not in self.owns:
+ self.owns.append(power)
+ # CENTERS resets the list of centers, OWNS only appends
+ if upword[0] == 'C' or power not in self.centers:
+ self.centers[power] = line.upper().split()[1:]
+ else:
+ self.centers[power].extend(
+ [center for center in line.upper().split()[1:] if center not in self.centers[power]])
+
+ # ------------------------------------
+ # Home centers, overriding those from the power declaration line
+ # -- INHABITS center...
+ elif upword == 'INHABITS':
+ if not power:
+ self.error += [err.MAP_INHABITS_BEFORE_POWER % ' '.join(word)]
+ else:
+ reinit = power not in self.inhabits
+ if reinit:
+ self.inhabits.append(power)
+ self.add_homes(power, word[1:], reinit)
+
+ # -- HOME(S) [center...]
+ elif upword in ('HOME', 'HOMES'):
+ if not power:
+ self.error += [err.MAP_HOME_BEFORE_POWER % (upword, ' '.join(word))]
+ else:
+ if power not in self.inhabits:
+ self.inhabits.append(power)
+ self.add_homes(power, word[1:], 1)
+
+ # ------------------------------------
+ # Clear known units for a power
+ # -- UNITS
+ elif upword == 'UNITS':
+ if power:
+ self.units[power] = []
+ else:
+ self.error += [err.MAP_UNITS_BEFORE_POWER]
+
+ # ------------------------------------
+ # Unit Designation (A or F)
+ # -- unit location
+ elif upword in ('A', 'F'):
+ unit = ' '.join(word).upper()
+ if not power:
+ self.error += [err.MAP_UNIT_BEFORE_POWER]
+ elif len(word) == 2:
+ for units in self.units.values():
+ for current_unit in units:
+ if current_unit[2:5] == unit[2:5]:
+ units.remove(current_unit)
+ self.units.setdefault(power, []).append(unit)
+ else:
+ self.error += [err.MAP_INVALID_UNIT % unit]
+
+ # ------------------------------------
+ # Dummies
+ # -- DUMMY [ALL] -or-
+ # -- DUMMY [ALL EXCEPT] powerName... -or-
+ # -- DUMMIES ALL -or-
+ # -- DUMMIES [ALL EXCEPT] powerName...
+ elif upword in ('DUMMY', 'DUMMIES'):
+ if len(word) > 1:
+ power = None
+ # DUMMY
+ if len(word) == 1:
+ if upword == 'DUMMIES':
+ self.error += [err.MAP_DUMMY_REQ_LIST_POWERS]
+ elif not power:
+ self.error += [err.MAP_DUMMY_BEFORE_POWER]
+ elif power not in self.dummies:
+ self.dummies += [power]
+ # DUMMY powerName powerName
+ elif word[1].upper() != 'ALL':
+ self.dummies.extend(
+ [dummy for dummy in [self.norm_power(p_name) for p_name in word[1:]]
+ if dummy not in self.dummies])
+ # DUMMY ALL
+ elif len(word) == 2:
+ self.dummies = [power_name for power_name in self.homes if power_name != 'UNOWNED']
+ # DUMMY ALL powerName
+ elif word[2].upper() != 'EXCEPT':
+ self.error += [err.MAP_NO_EXCEPT_AFTER_DUMMY_ALL % upword]
+ # DUMMY ALL EXCEPT
+ elif len(word) == 3:
+ self.error += [err.MAP_NO_POWER_AFTER_DUMMY_ALL_EXCEPT % upword]
+ # DUMMY ALL EXCEPT powerName powerName
+ else:
+ self.dummies = [power_name for power_name in self.homes if power_name not in
+ (['UNOWNED'] + [self.norm_power(except_pow) for except_pow in word[3:]])]
+
+ # ------------------------------------
+ # -- DROP abbreviation...
+ elif upword == 'DROP':
+ for place in [loc.upper() for loc in word[1:]]:
+ self.drop(place)
+
+ # ------------------------------------
+ # Terrain type and adjacencies (with special adjacency rules)
+ # -- COAST abbreviation [ABUTS [abut...]] -or-
+ # -- LAND abbreviation [ABUTS [abut...]] -or-
+ # -- WATER abbreviation [ABUTS [abut...]] -or-
+ # -- PORT abbreviation [ABUTS [abut...]] -or-
+ # -- SHUT abbreviation [ABUTS [abut...]] -or-
+ # -- AMEND abbreviation [ABUTS [abut...]]
+ # -- - removes an abut
+ elif len(word) > 1 and upword in ('AMEND', 'WATER', 'LAND', 'COAST', 'PORT', 'SHUT'):
+ place, other = word[1], word[1].swapcase()
+
+ # Removing the place and all its coasts
+ if other in self.locs:
+ self.locs.remove(other)
+ if upword == 'AMEND':
+ self.loc_type[place] = self.loc_type[other]
+ self.loc_abut[place] = self.loc_abut[other]
+ del self.loc_type[other]
+ del self.loc_abut[other]
+ if place.isupper():
+ for loc in self.locs:
+ if loc.startswith(place):
+ self.drop(loc)
+ if place in self.locs:
+ self.locs.remove(place)
+
+ # Re-adding the place and its type
+ self.locs += [place]
+ if upword != 'AMEND':
+ self.loc_type[place] = word[0]
+ if len(word) > 2:
+ self.loc_abut[place] = []
+ elif place not in self.loc_type:
+ self.error += [err.MAP_NO_DATA_TO_AMEND_FOR % place]
+ if len(word) > 2 and word[2].upper() != 'ABUTS':
+ self.error += [err.MAP_NO_ABUTS_FOR % place]
+
+ # Processing ABUTS (adjacencies)
+ for dest in word[3:]:
+
+ # Removing abuts if they start with -
+ if dest[0] == '-':
+ for site in self.loc_abut[place][:]:
+ if site.upper().startswith(dest[1:].upper()):
+ self.loc_abut[place].remove(site)
+ continue
+
+ # Now add the adjacency
+ self.loc_abut[place] += [dest]
+
+ # ------------------------------------
+ # Removal of an existing power
+ # -- UNPLAYED [ALL] -or-
+ # -- UNPLAYED [ALL EXCEPT] powerName...
+ elif upword == 'UNPLAYED':
+ goners = []
+ # UNPLAYED powerName
+ if len(word) == 1:
+ if not power:
+ self.error += [err.MAP_UNPLAYED_BEFORE_POWER]
+ else:
+ goners = [power]
+ # UNPLAYED powerName powerName
+ elif word[1].upper() != 'ALL':
+ goners = [self.norm_power(power_name) for power_name in word[1:]]
+ # UNPLAYED ALL
+ elif len(word) == 2:
+ goners = [power_name for power_name in self.homes if power_name != 'UNOWNED']
+ # UNPLAYED ALL playerName
+ elif word[2].upper() != 'EXCEPT':
+ self.error += [err.MAP_NO_EXCEPT_AFTER_UNPLAYED_ALL]
+ # UNPLAYED ALL EXCEPT
+ elif len(word) == 3:
+ self.error += [err.MAP_NO_POWER_AFTER_UNPLAYED_ALL_EXCEPT]
+ # UNPLAYED ALL EXCEPT powerName
+ else:
+ goners = [power_name for power_name in self.homes if power_name not in
+ (['UNOWNED'] + [self.norm_power(pow_except) for pow_except in word[3:]])]
+
+ # Removing each power
+ for goner in goners:
+ try:
+ del self.pow_name[goner]
+ del self.own_word[goner]
+ del self.homes[goner]
+ self.dummies = [x for x in self.dummies if x != goner]
+ self.inhabits = [x for x in self.inhabits if x != goner]
+ if goner in self.centers:
+ del self.centers[goner]
+ self.owns = [x for x in self.owns if x != goner]
+ if goner in self.abbrev:
+ del self.abbrev[goner]
+ if goner in self.units:
+ del self.units[goner]
+ self.powers = [x for x in self.powers if x != goner]
+ except KeyError:
+ self.error += [err.MAP_NO_SUCH_POWER_TO_REMOVE % goner]
+ power = None
+
+ else:
+ # ------------------------------------
+ # Power name, ownership word, and home centers
+ # -- [oldName ->] powerName [([ownWord][:[abbrev]])] [center...]
+ # -- UNOWNED [center...] -or-
+ # -- NEUTRAL [center...] -or-
+ # -- CENTERS [center...]
+ if upword in ('NEUTRAL', 'CENTERS'):
+ upword = 'UNOWNED'
+ power = self.norm_power(upword) if upword != 'UNOWNED' else 0
+
+ # Renaming power (Not Supported)
+ if len(word) > 2 and word[1] == '->':
+ old_power = power
+ word = word[2:]
+ upword = word[0].upper()
+ if upword in ('NEUTRAL', 'CENTERS'):
+ upword = 'UNOWNED'
+ power = self.norm_power(upword) if upword != 'UNOWNED' else 0
+ if not old_power or not power:
+ self.error += [err.MAP_RENAMING_UNOWNED_DIR_NOT_ALLOWED]
+ elif not self.pow_name.get(old_power):
+ self.error += [err.MAP_RENAMING_UNDEF_POWER % old_power]
+ else:
+ self.error += [err.MAP_RENAMING_POWER_NOT_SUPPORTED]
+
+ # Adding power
+ if power and not self.pow_name.get(power):
+ self.pow_name[power] = upword
+ normed = self.norm(power)
+ # Add power to aliases even if the normed form is identical. That way
+ # it becomes part of the vocabulary.
+ if not normed:
+ self.error += [err.MAP_POWER_NAME_EMPTY_KEYWORD % power]
+ normed = power
+ if normed not in self.aliases:
+ if len(normed.split('/')[0]) in (1, 3):
+ self.error += [err.MAP_POWER_NAME_CAN_BE_CONFUSED % normed]
+ self.aliases[normed] = power
+ elif self.aliases[normed] != power:
+ self.error += [err.MAP_DUP_LOC_OR_POWER % normed]
+
+ # Processing own word and abbreviations
+ upword = power or upword
+ if power and len(word) > 1 and word[1][0] == '(':
+ self.own_word[upword] = word[1][1:-1] or power
+ normed = self.norm(self.own_word[upword])
+ if normed == power:
+ pass
+ elif normed not in self.aliases:
+ self.aliases[normed] = power
+ elif self.aliases[normed] != power:
+ self.error += [err.MAP_DUP_LOC_OR_POWER % normed]
+ if ':' in word[1]:
+ owner, abbrev = self.own_word[upword].split(':')
+ self.own_word[upword] = owner or power
+ self.abbrev[upword] = abbrev[:1].upper()
+ if not abbrev or self.abbrev[upword] in 'M?':
+ self.error += [err.MAP_ILLEGAL_POWER_ABBREV]
+ del word[1]
+ else:
+ self.own_word.setdefault(upword, upword)
+
+ # Adding homes
+ reinit = upword in self.inhabits
+ if reinit:
+ self.inhabits.remove(upword)
+ self.add_homes(upword, word[1:], reinit)
+
+ def build_cache(self):
+ """ Builds a cache to speed up abuts and coasts lookup """
+ # Adding all coasts to loc_coasts
+ for loc in self.locs:
+ self.loc_coasts[loc.upper()] = \
+ [map_loc.upper() for map_loc in self.locs if loc.upper()[:3] == map_loc.upper()[:3]]
+
+ # Building abuts cache
+ for unit_type in ['A', 'F']:
+ for unit_loc in self.locs:
+ for other_loc in self.locs:
+ for order_type in ['-', 'S', 'C']:
+
+ # Calculating and setting in cache
+ unit_loc, other_loc = unit_loc.upper(), other_loc.upper()
+ query_tuple = (unit_type, unit_loc, order_type, other_loc)
+ self.abuts_cache[query_tuple] = self._abuts(*query_tuple)
+
+ def add_homes(self, power, homes, reinit):
+ """ Add new homes (and deletes previous homes if reinit)
+ :param power: Name of power (e.g. ITALY)
+ :param homes: List of homes e.g. ['BUR', '-POR', '*ITA', ... ]
+ :param reinit: Indicates that we want to strip the list of homes before adding
+ :return: Nothing
+ """
+ # Reset homes
+ if reinit:
+ self.homes[power] = []
+ else:
+ self.homes.setdefault(power, [])
+ self.homes.setdefault('UNOWNED', [])
+
+ # For each home:
+ # '-' indicates we want to remove home
+ for home in ' '.join(homes).upper().split():
+ remove = 0
+ while home:
+ if home[0] == '-':
+ remove = 1
+ else:
+ break
+ home = home[1:]
+ if not home:
+ continue
+
+ # Removing the home if already there
+ if home in self.homes[power]:
+ self.homes[power].remove(home)
+ if power != 'UNOWNED':
+ self.homes['UNOWNED'].append(home)
+
+ # Re-adding it
+ if not remove:
+ self.homes[power].append(home)
+
+ def drop(self, place):
+ """ Drop a place
+ :param place: Name of place to remove
+ :return: Nothing
+ """
+ # Removing from locs
+ for loc in list(self.locs):
+ if loc.upper().startswith(place):
+ self.locs.remove(loc)
+
+ # Loc_name
+ for full_name, loc in list(self.loc_name.items()):
+ if loc.startswith(place):
+ self.loc_name.pop(full_name)
+
+ # Aliases
+ for alias, loc in list(self.aliases.items()):
+ if loc.startswith(place):
+ self.aliases.pop(alias)
+
+ # Homes
+ for power_name, power_homes in list(self.homes.items()):
+ if place in power_homes:
+ self.homes[power_name].remove(place)
+
+ # Units
+ for power_name, power_units in list(self.units.items()):
+ for unit in power_units:
+ if unit[2:5] == place[:3]:
+ self.units[power_name].remove(unit)
+
+ # Supply Centers
+ for center in list(self.scs):
+ if center.upper().startswith(place):
+ self.scs.remove(center)
+
+ # Centers ownerships
+ for power_name, power_centers in list(self.centers.items()):
+ for center in power_centers:
+ if center.startswith(place):
+ self.centers[power_name].remove(center)
+
+ # Removing from adjacencies list
+ for site_name, site_abuts in list(self.loc_abut.items()):
+ for site in [loc for loc in site_abuts if loc.upper().startswith(place)]:
+ self.loc_abut[site_name].remove(site)
+ if site_name.startswith(place):
+ self.loc_abut.pop(site_name)
+
+ # Removing loc_type
+ for loc in list(self.loc_type):
+ if loc.startswith(place):
+ self.loc_type.pop(loc)
+
+ def norm_power(self, power):
+ """ Normalise the name of a power (removes spaces)
+ :param power: Name of power to normalise
+ :return: Normalised power name
+ """
+ return self.norm(power).replace(' ', '')
+
+ def norm(self, phrase):
+ """ Normalise a sentence (add spaces before /, replace -+, with ' ', remove .:
+ :param phrase: Phrase to normalise
+ :return: Normalised sentences
+ """
+ phrase = phrase.upper().replace('/', ' /').replace(' / ', '')
+ for token in '.:':
+ phrase = phrase.replace(token, '')
+ for token in '-+,':
+ phrase = phrase.replace(token, ' ')
+ for token in '|*?!~()[]=_^':
+ phrase = phrase.replace(token, ' {} '.format(token))
+
+ # Replace keywords which, contrary to aliases, all consist of a single word
+ return ' '.join([self.keywords.get(keyword, keyword) for keyword in phrase.strip().split()])
+
+ def compact(self, phrase):
+ """ Compacts a full sentence into a list of short words
+ :param phrase: The full sentence to compact (e.g. 'England: Fleet Western Mediterranean -> Tyrrhenian
+ Sea. (*bounce*)')
+ :return: The compacted phrase in an array (e.g. ['ENGLAND', 'F', 'WES', 'TYS', '|'])
+ """
+ word, result = self.norm(phrase).split(), []
+ while word:
+ alias, i = self.alias(word)
+ if alias:
+ result += alias.split()
+ word = word[i:]
+ return result
+
+ def alias(self, word):
+ """ This function is used to replace multi-words with their acronyms
+ :param word: The current list of words to try to shorten
+ :return: alias, ix - alias is the shorten list of word, ix is the ix of the next non-processed word
+ """
+ # pylint: disable=too-many-return-statements
+ # Assume that word already was subject to norm()
+ # Process with content inside square or round brackets
+ j = -1
+ alias = word[0]
+ if alias in '([':
+ for j in range(1, len(word)):
+ if word[j] == '])'[alias == '(']:
+ break
+ else:
+ return alias, 1
+ if j == 1:
+ return '', 2
+ if word[1] + word[j - 1] == '**':
+ word2 = word[2:j - 1]
+ else:
+ word2 = word[1:j]
+ alias2 = self.aliases.get(' '.join(word2) + ' \\', '')
+ if alias2[-2:] == ' \\':
+ return alias2[:-2], j + 1
+ result = []
+ while word2:
+ alias2, i = self.alias(word2)
+ if alias2:
+ result += [alias2]
+ word2 = word2[i:]
+ return ' '.join(result), j + 1
+ for i in range(len(word), 0, -1):
+ key = ' '.join(word[:i])
+ if key in self.aliases:
+ alias = self.aliases[key]
+ break
+ else:
+ i = 1
+
+ # Concatenate coasts
+ if i == len(word):
+ return alias, i
+ if alias[:1] != '/' and ' ' not in alias:
+ alias2, j = self.alias(word[i:])
+ if alias2[:1] != '/' or ' ' in alias2:
+ return alias, i
+ elif alias[-2:] == ' \\':
+ alias2, j = self.alias(word[i:])
+ if alias2[:1] == '/' or ' ' in alias2:
+ return alias, i
+ alias, alias2 = alias2, alias[:-2]
+ else:
+ return alias, i
+
+ # Check if the location is also an ambiguous power name
+ # and replace with its other name if that's the case
+ if alias in self.powers and alias in self.unclear:
+ alias = self.unclear[alias]
+
+ # Check if the coast is mapped to another coast
+ if alias + ' ' + alias2 in self.aliases:
+ return self.aliases[alias + ' ' + alias2], i + j
+ return alias + alias2, i + j
+
+ def vet(self, word, strict=0):
+ """ Determines the type of every word in a compacted order phrase
+ 0 - Undetermined, 1 - Power, 2 - Unit, 3 - Location, 4 - Coastal location
+ 5 - Order, 6 - Move Operator (-=_^), 7 - Non-move separator (|?~) or result (*!?~+)
+ :param word: The list of words to vet (e.g. ['A', 'POR', 'S', 'SPA/NC'])
+ :param strict: Boolean to indicate that we want to verify that the words actually exist.
+ Numbers become negative if they don't exist
+ :return: A list of tuple (e.g. [('A', 2), ('POR', 3), ('S', 5), ('SPA/NC', 4)])
+ """
+ result = []
+ for thing in word:
+ if ' ' in thing:
+ data_type = UNDETERMINED
+ elif len(thing) == 1:
+ if thing in self.unit_names:
+ data_type = UNIT
+ elif thing.isalnum():
+ data_type = ORDER
+ elif thing in '-=_':
+ data_type = MOVE_SEP
+ else:
+ data_type = OTHER
+ elif '/' in thing:
+ if thing.find('/') == 3:
+ data_type = COAST
+ else:
+ data_type = POWER
+ elif thing == 'VIA':
+ data_type = ORDER
+ elif len(thing) == 3:
+ data_type = LOCATION
+ else:
+ data_type = POWER
+ if strict and thing not in list(self.aliases.values()) + list(self.keywords.values()):
+ data_type = -data_type
+ result += [(thing, data_type)]
+ return result
+
+ def rearrange(self, word):
+ """ This function is used to parse commands
+ :param word: The list of words to vet (e.g. ['ENGLAND', 'F', 'WES', 'TYS', '|'])
+ :return: The list of words in the correct order to be processed (e.g. ['ENGLAND', 'F', 'WES', '-', 'TYS'])
+ """
+ # pylint: disable=too-many-branches
+ # Add | to start and end of list (to simplify edge cases) (they will be returned as ('|', 7))
+ # e.g. [('|', 7), ('A', 2), ('POR', 3), ('S', 5), ('SPA/NC', 4), ('|', 7)]
+ result = self.vet(['|'] + word + ['|'])
+
+ # Remove result tokens (7) at start and end of string (but keep |)
+ result[0] = ('|', UNDETERMINED)
+ while result[-2][1] == OTHER:
+ del result[-2]
+ if len(result) == 2:
+ return []
+ result[0] = ('|', OTHER)
+ while result[1][1] == OTHER:
+ del result[1]
+
+ # Move "with" unit and location to the start. There should be only one
+ # Ignore the rest
+ found = 0
+ while ('?', OTHER) in result:
+ i = result.index(('?', OTHER))
+ del result[i]
+ if found:
+ continue
+ j = -1
+ for j in range(i, len(result)):
+ if result[j][1] in (POWER, UNIT):
+ continue
+ if result[j][1] in (LOCATION, COAST):
+ j += 1
+ break
+ if j != i:
+ found = 1
+ k = 0
+ for k in range(1, i):
+ if result[k][1] not in (POWER, UNIT):
+ break
+ if k < i:
+ result[k:k] = result[i:j]
+ result[j:2 * j - i] = []
+
+ # Move "from" location before any preceding locations
+ while ('\\', OTHER) in result:
+ i = result.index(('\\', OTHER))
+ del result[i]
+ if result[i][1] not in (LOCATION, COAST):
+ continue
+ for j in range(i - 1, -1, -1):
+ if result[j][1] not in (LOCATION, COAST) and result[j][0] != '~':
+ break
+ if j + 1 != i:
+ result[j + 1:j + 1] = result[i:i + 1]
+ del result[i + 1]
+
+ # Move "via" locations between the two preceding locations.
+ while ('~', OTHER) in result:
+ i = result.index(('~', OTHER))
+ del result[i]
+ if (result[i][1] not in (LOCATION, COAST)
+ or result[i - 1][1] not in (LOCATION, COAST)
+ or result[i - 2][1] not in (LOCATION, COAST)):
+ continue
+ for j in range(i + 1, len(result)):
+ if result[j][1] not in (LOCATION, COAST):
+ break
+ result[j:j] = result[i - 1:i]
+ del result[i - 1]
+
+ # Move order beyond first location
+ i = 0
+ for j in range(1, len(result)):
+ if result[j][1] in (LOCATION, COAST):
+ if i:
+ result[j + 1:j + 1] = result[i:i + 1]
+ del result[i]
+ break
+ elif result[j][1] == ORDER:
+ i = j
+ elif result[j][0] == '|':
+ break
+
+ # Put the power before the unit, or replace it with a location if there's ambiguity
+ vet = 0
+ for i, result_i in enumerate(result):
+ if result_i[1] == POWER:
+ if vet > 0 and result_i[0] in self.unclear:
+ result[i] = (self.unclear[result_i[0]], LOCATION)
+ elif vet == 1:
+ result[i + 1:i + 1] = result[i - 1:i]
+ del result[i - 1]
+ vet = 2
+ elif not vet and result_i[1] == UNIT:
+ vet = 1
+ elif result_i[1] == ORDER:
+ vet = 0
+ else:
+ vet = 2
+
+ # Insert hyphens between subsequent locations
+ for i in range(len(result) - 1, 1, -1):
+ if result[i][1] in (LOCATION, COAST) and result[i - 1][1] in (LOCATION, COAST):
+ result[i:i] = [('-', MOVE_SEP)]
+
+ # Remove vertical bars at start and end
+ return [x for x, y in result[1:-1]]
+
+ def area_type(self, loc):
+ """ Returns 'WATER', 'COAST', 'PORT', 'LAND', 'SHUT'
+ :param loc: The name of the location to query
+ :return: Type of the location ('WATER', 'COAST', 'PORT', 'LAND', 'SHUT')
+ """
+ return self.loc_type.get(loc.upper()) or self.loc_type.get(loc.lower())
+
+ def default_coast(self, word):
+ """ Returns the coast for a fleet move order that can only be to a single coast
+ (e.g. F GRE-BUL returns F GRE-BUL/SC)
+ :param word: A list of tokens (e.g. ['F', 'GRE', '-', 'BUL'])
+ :return: The updated list of tokens (e.g. ['F', 'GRE', '-', 'BUL/SC'])
+ """
+ if len(word) == 4 and word[0] == 'F' and word[2] == '-' and '/' not in word[3]:
+ unit_loc, new_loc, single_coast = word[1], word[3], None
+ for place in self.abut_list(unit_loc):
+ up_place = place.upper()
+ if new_loc == up_place: # Target location found with no coast, original query is correct
+ return word
+ if new_loc == up_place[:3]:
+ if single_coast: # Target location has multiple coasts, unable to decide
+ return word
+ single_coast = up_place # Found a potential candidate, storing it
+ word[3] = single_coast or new_loc # Only one candidate found, modifying the order to include it
+ return word
+
+ def find_coasts(self, loc):
+ """ Finds all coasts for a given location
+ :param loc: The name of a location (e.g. 'BUL')
+ :return: Returns the list of all coasts, including the location (e.g. ['BUL', 'BUL/EC', 'BUL/SC']
+ """
+ return self.loc_coasts.get(loc.upper(), [])
+
+ def abuts(self, unit_type, unit_loc, order_type, other_loc):
+ """ Determines if a order for unit_type from unit_loc to other_loc is adjacent
+ Note: This method uses the precomputed cache
+
+ :param unit_type: The type of unit ('A' or 'F')
+ :param unit_loc: The location of the unit ('BUR', 'BUL/EC')
+ :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move)
+ :param other_loc: The location of the other unit
+ :return: 1 if the locations are adjacent for the move, 0 otherwise
+ """
+ if unit_type == '?':
+ return (self.abuts_cache.get(('A', unit_loc.upper(), order_type, other_loc.upper()), 0) or
+ self.abuts_cache.get(('F', unit_loc.upper(), order_type, other_loc.upper()), 0))
+
+ query_tuple = (unit_type, unit_loc.upper(), order_type, other_loc.upper())
+ return self.abuts_cache.get(query_tuple, 0)
+
+ def _abuts(self, unit_type, unit_loc, order_type, other_loc):
+ """ Determines if a order for unit_type from unit_loc to other_loc is adjacent
+ Note: This method is used to generate the abuts_cache
+
+ :param unit_type: The type of unit ('A' or 'F')
+ :param unit_loc: The location of the unit ('BUR', 'BUL/EC')
+ :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move)
+ :param other_loc: The location of the other unit
+ :return: 1 if the locations are adjacent for the move, 0 otherwise
+ """
+ # pylint: disable=too-many-return-statements
+ unit_loc, other_loc = unit_loc.upper(), other_loc.upper()
+
+ # Removing coasts for support
+ # Otherwise, if army, not adjacent since army can't move, hold, or convoy on coasts
+ if '/' in other_loc:
+ if order_type == 'S':
+ other_loc = other_loc[:3]
+ elif unit_type == 'A':
+ return 0
+
+ # Looking for adjacency between unit_loc and other_loc
+ # If the break line is not executed, not adjacency were found
+ place = ''
+ for place in self.abut_list(unit_loc):
+ up_place = place.upper()
+ up_loc = up_place[:3]
+ if other_loc in (up_place, up_loc):
+ break
+ else:
+ return 0
+
+ # If the target location is impassible, returning 0
+ other_loc_type = self.area_type(other_loc)
+ if other_loc_type == 'SHUT':
+ return 0
+
+ # If the unit type is unknown, then assume the adjacency is okay
+ if unit_type == '?':
+ return 1
+
+ # Fleets cannot affect LAND and fleets are not adjacent to any location listed in lowercase
+ # (except when offering support into such an area, as in F BOT S A MOS-STP), or listed in
+ # the adjacency list in lower-case (F VEN-TUS)
+
+ # Fleet should be supporting a adjacent 'COAST', 'WATER' or 'PORT', with a name starting with a capital letter
+ if unit_type == 'F':
+ if (other_loc_type == 'LAND'
+ or place[0] != up_loc[0]
+ or order_type != 'S'
+ and other_loc not in self.loc_type):
+ return 0
+
+ # Armies cannot move to water (unless this is a convoy). Note that the caller
+ # is responsible for determining if a fleet exists at the adjacent spot to convoy
+ # the army. Also, armies can't move to spaces listed in Mixed case.
+ elif order_type != 'C' and (other_loc_type == 'WATER' or place == place.title()):
+ return 0
+
+ # It's adjacent.
+ return 1
+
+ def is_valid_unit(self, unit, no_coast_ok=0, shut_ok=0):
+ """ Determines if a unit and location combination is valid (e.g. 'A BUR') is valid
+ :param unit: The name of the unit with its location (e.g. F SPA/SC)
+ :param no_coast_ok: Indicates if a coastal location with no coast (e.g. SPA vs SPA/SC) is acceptable
+ :param shut_ok: Indicates if a impassable country (e.g. Switzerland) is OK
+ :return: A boolean to indicate if the unit/location combination is valid
+ """
+ unit_type, loc = unit.upper().split()
+ area_type = self.area_type(loc)
+ if area_type == 'SHUT':
+ return 1 if shut_ok else 0
+ if unit_type == '?':
+ return 1 if area_type is not None else 0
+ # Army can be anywhere, except in 'WATER'
+ if unit_type == 'A':
+ return '/' not in loc and area_type in ('LAND', 'COAST', 'PORT')
+ # Fleet must be in WATER, COAST, or PORT
+ # Coastal locations are stored in CAPS with coasts and non-caps with non-coasts
+ # e.g. SPA/NC, SPA/SC, spa
+ return (unit_type == 'F'
+ and area_type in ('WATER', 'COAST', 'PORT')
+ and (no_coast_ok or loc.lower() not in self.loc_abut))
+
+ def abut_list(self, site, incl_no_coast=False):
+ """ Returns the adjacency list for the site
+ :param site: The province we want the adjacency list for
+ :param incl_no_coast: Boolean flag that indicates to also include province without coast if it has coasts
+ e.g. will return ['BUL/SC', 'BUL/EC'] if False, and ['bul', 'BUL/SC', 'BUL/EC'] if True
+ :return: A list of adjacent provinces
+
+ Note: abuts are returned in mixed cases (lowercase for A only, First capital letter for F only)
+ """
+ if site in self.loc_abut:
+ abut_list = self.loc_abut.get(site, [])
+ else:
+ abut_list = self.loc_abut.get(site.lower(), [])
+ if incl_no_coast:
+ abut_list = abut_list[:]
+ for loc in list(abut_list):
+ if '/' in loc and loc[:3] not in abut_list:
+ abut_list += [loc[:3]]
+ return abut_list
+
+ def find_next_phase(self, phase, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately after the phase
+ :param phase: The long name of the current phase (e.g. SPRING 1905 RETREATS)
+ :param phase_type: The type of phase we are looking for (e.g. 'M' for Movement, 'R' for Retreats,
+ 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the next phase (e.g. FALL 1905 MOVEMENT)
+ """
+ # If len < 3, Phase is FORMING or COMPLETED, unable to find previous phase
+ now = phase.split()
+ if len(now) < 3:
+ return phase
+
+ # Parsing year and season index
+ year = int(now[1])
+ season_ix = (self.seq.index('%s %s' % (now[0], now[2])) + 1) % len(self.seq)
+ seq_len = len(self.seq)
+
+ # Parsing the sequence of seasons
+ while seq_len:
+ seq_len -= 1
+ new = self.seq[season_ix].split()
+
+ # Looking for IFYEARDIV DIV or IFYEARDIV DIV=MOD
+ if new[0] == 'IFYEARDIV':
+ if '=' in new[1]:
+ div, mod = map(int, new[1].split('='))
+ else:
+ div, mod = int(new[1]), 0
+ if year % div != mod:
+ season_ix = -1
+
+ # NEWYEAR [X] indicates to increase years by [X] (or 1 by default)
+ elif new[0] == 'NEWYEAR':
+ year += len(new) == 1 or int(new[1])
+
+ # Found phase
+ elif phase_type in (None, new[1][0]):
+ if skip == 0:
+ return '%s %s %s' % (new[0], year, new[1])
+ skip -= 1
+ seq_len = len(self.seq)
+ season_ix += 1
+ season_ix %= len(self.seq)
+
+ # Could not find next phase
+ return ''
+
+ def find_previous_phase(self, phase, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately prior the phase
+ :param phase: The long name of the current phase (e.g. SPRING 1905 RETREATS)
+ :param phase_type: The type of phase we are looking for (e.g. 'M' for Movement, 'R' for Retreats,
+ 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the previous phase (e.g. SPRING 1905 MOVEMENT)
+ """
+ # If len < 3, Phase is FORMING or COMPLETED, unable to find previous phase
+ now = phase.split()
+ if len(now) < 3:
+ return phase
+
+ # Parsing year and season index
+ year = int(now[1])
+ season_ix = self.seq.index('%s %s' % (now[0], now[2]))
+ seq_len = len(self.seq)
+
+ # Parsing the sequence of seasons
+ while seq_len:
+ seq_len -= 1
+ season_ix -= 1
+
+ # No more seasons in seq
+ if season_ix == -1:
+ for new in [x.split() for x in self.seq]:
+ # Looking for IFYEARDIV DIV or IFYEARDIV DIV=MOD
+ if new[0] == 'IFYEARDIV':
+ if '=' in new[1]:
+ div, mod = map(int, new[1].split('='))
+ else:
+ div, mod = int(new[1]), 0
+ if year % div != mod:
+ break
+ season_ix += 1
+
+ # Parsing next seq
+ new = self.seq[season_ix].split()
+ if new[0] == 'IFYEARDIV':
+ pass
+
+ # NEWYEAR [X] indicates to increase years by [X] (or 1 by default)
+ elif new[0] == 'NEWYEAR':
+ year -= len(new) == 1 or int(new[1])
+
+ # Found phase
+ elif phase_type in (None, new[1][0]):
+ if skip == 0:
+ return '%s %s %s' % (new[0], year, new[1])
+ skip -= 1
+ seq_len = len(self.seq)
+
+ # Could not find prev phase
+ return ''
+
+ def compare_phases(self, phase1, phase2):
+ """ Compare 2 phases (Strings) and return 1, -1, or 0 to indicate which phase is larger
+ :param phase1: The first phase (e.g. S1901M, FORMING, COMPLETED)
+ :param phase2: The second phase (e.g. S1901M, FORMING, COMPLETED)
+ :return: 1 if phase1 > phase2, -1 if phase2 > phase1 otherwise 0 if they are equal
+ """
+ # If the phase ends with '?', we assume it's the last phase type of that season
+ # e.g. S1901? -> S1901R W1901? -> W1901A
+ if phase1[-1] == '?':
+ phase1 = phase1[:-1] + [season.split()[1][0] for season in self.seq if season[0] == phase1[0]][-1]
+ if phase2[-1] == '?':
+ phase2 = phase2[:-1] + [season.split()[1][0] for season in self.seq if season[0] == phase2[0]][-1]
+
+ # Converting S1901M (abbrv) to long phase (SPRING 1901 MOVEMENT)
+ if len(phase1.split()) == 1:
+ phase1 = self.phase_long(phase1, phase1.upper())
+ if len(phase2.split()) == 1:
+ phase2 = self.phase_long(phase2, phase2.upper())
+ if phase1 == phase2:
+ return 0
+ now1, now2 = phase1.split(), phase2.split()
+
+ # One of the phase is either FORMING, or COMPLETED
+ # Syntax is (bool1 and int1 or bool2 and int2) will return int1 if bool1, else int2 if bool2
+ # 1 = FORMING, 2 = Normal Phase, 3 = COMPLETED, 0 = UNKNOWN
+ if len(now1) < 3 or len(now2) < 3:
+ order1 = (len(now1) > 2 and 2 or phase1 == 'FORMING' and 1 or phase1 == 'COMPLETED' and 3 or 0)
+ order2 = (len(now2) > 2 and 2 or phase2 == 'FORMING' and 1 or phase2 == 'COMPLETED' and 3 or 0)
+ return order1 > order2 and 1 or order1 < order2 and -1 or 0
+
+ # Comparing years
+ year1, year2 = int(now1[1]), int(now2[1])
+ if year1 != year2:
+ return (year1 > year2 and 1 or -1) * (self.flow_sign or 1)
+
+ # Comparing seasons
+ # Returning the inverse if NEW_YEAR is between the 2 seasons
+ season_ix1 = self.seq.index('%s %s' % (now1[0], now1[2]))
+ season_ix2 = self.seq.index('%s %s' % (now2[0], now2[2]))
+ if season_ix1 > season_ix2:
+ return -1 if 'NEWYEAR' in [x.split()[0] for x in self.seq[(season_ix2) + (1):season_ix1]] else 1
+ if season_ix1 < season_ix2:
+ return 1 if 'NEWYEAR' in [x.split()[0] for x in self.seq[(season_ix1) + (1):season_ix2]] else -1
+ return 0
+
+ @staticmethod
+ def phase_abbr(phase, default='?????'):
+ """ Constructs a 5 character representation (S1901M) from a phase (SPRING 1901 MOVEMENT)
+ :param phase: The full phase (e.g. SPRING 1901 MOVEMENT)
+ :param default: The default value to return in case conversion fails
+ :return: A 5 character representation of the phase
+ """
+ if phase in ('FORMING', 'COMPLETED'):
+ return phase
+ parts = tuple(phase.split())
+ return ('%.1s%04d%.1s' % (parts[0], int(parts[1]), parts[2])).upper() if len(parts) == 3 else default
+
+ def phase_long(self, phase_abbr, default='?????'):
+ """ Constructs a full sentence of a phase from a 5 character abbreviation
+ :param phase_abbr: 5 character abbrev. (e.g. S1901M)
+ :param default: The default value to return in case conversion fails
+ :return: A full phase description (e.g. SPRING 1901 MOVEMENT)
+ """
+ try:
+ year = int(phase_abbr[1:-1])
+ for season in self.seq:
+ parts = season.split()
+ if parts[0] not in ('NEWYEAR', 'IFYEARDIV') \
+ and parts[0][0].upper() == phase_abbr[0].upper() \
+ and parts[1][0].upper() == phase_abbr[-1].upper():
+ return '{} {} {}'.format(parts[0], year, parts[1]).upper()
+ except ValueError:
+ pass
+ return default
+
+# Loading at the bottom, to avoid load recursion
+from diplomacy.utils.convoy_paths import add_to_cache, get_convoy_paths_cache # pylint: disable=wrong-import-position
+CONVOYS_PATH_CACHE = get_convoy_paths_cache()
diff --git a/diplomacy/engine/message.py b/diplomacy/engine/message.py
new file mode 100644
index 0000000..2d6d644
--- /dev/null
+++ b/diplomacy/engine/message.py
@@ -0,0 +1,115 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Game message. Represent a message exchanged inside a game.
+
+ Possible messages exchanges:
+ - power 1 -> power 2
+ - power -> all game
+ - system -> power
+ - system -> all game
+ - system -> observers
+ - system -> omniscient observers
+
+ Sender `system` is identified with constant SYSTEM defined below.
+ Recipients `all game`, `observers` and `omniscient observers` are identified respectively with constants
+ GLOBAL, OBSERVER and OMNISCIENT defined below.
+
+ Consider using Game methods to generate appropriate messages instead of this class directly:
+ - Game.new_power_message() to send a message from a power to another.
+ - Game.new_global_message() to send a message from a power to all game.
+ - ServerGame.new_system_message() to send a server system message.
+ Use constant names defined below to specify recipient for system message when it's not a power name
+ (GLOBAL, OBSERVER or OMNISCIENT).
+"""
+from diplomacy.utils import parsing, strings
+from diplomacy.utils.jsonable import Jsonable
+
+SYSTEM = 'SYSTEM' # sender
+GLOBAL = 'GLOBAL' # recipient (all powers)
+OBSERVER = 'OBSERVER' # recipient (all observer tokens)
+OMNISCIENT = 'OMNISCIENT' # recipient (all omniscient tokens)
+
+class Message(Jsonable):
+ """ GameMessage class. Properties:
+ - sender: message sender name: either SYSTEM or a power name.
+ - recipient: message recipient name: either GLOBAL, OBSERVER, OMNISCIENT or a power name.
+ - time_sent: message timestamp in microseconds.
+ - phase: short name of game phase when message is sent.
+ - message: message body.
+
+ Note about timestamp management:
+ We assume a message has an unique timestamp inside one game. To respect this rule, the server is the only one
+ responsible for generating message timestamps. This allow to generate timestamp or only 1 same machine (server)
+ instead of managing timestamps from many user machines, to prevent timestamp inconsistency when messages
+ are stored on server. Therefore, message timestamp is the time when server stores the message, not the time
+ when message was sent by any client.
+ """
+ __slots__ = ['sender', 'recipient', 'time_sent', 'phase', 'message']
+ model = {
+ strings.SENDER: str, # either SYSTEM or a power name.
+ strings.RECIPIENT: str, # either GLOBAL, OBSERVER, OMNISCIENT or a power name.
+ strings.TIME_SENT: parsing.OptionalValueType(int), # given by server.
+ strings.PHASE: str, # phase short name.
+ strings.MESSAGE: str,
+ }
+
+ def __init__(self, **kwargs):
+ self.sender = None # type: str
+ self.recipient = None # type: str
+ self.time_sent = None # type: int
+ self.phase = None # type: str
+ self.message = None # type: str
+ super(Message, self).__init__(**kwargs)
+
+ def __str__(self):
+ return '[%d/%s/%s->%s](%s)' % (
+ self.time_sent, self.phase, self.sender, self.recipient, self.message)
+
+ def __hash__(self):
+ return hash(self.time_sent)
+
+ def __eq__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent == other.time_sent
+
+ def __ne__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent != other.time_sent
+
+ def __lt__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent < other.time_sent
+
+ def __gt__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent > other.time_sent
+
+ def __le__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent <= other.time_sent
+
+ def __ge__(self, other):
+ assert isinstance(other, Message)
+ return self.time_sent >= other.time_sent
+
+ def is_global(self):
+ """ Return True if this message is global. """
+ return self.recipient == GLOBAL
+
+ def for_observer(self):
+ """ Return True if this message is sent to observers. """
+ return self.recipient == OBSERVER
diff --git a/diplomacy/engine/power.py b/diplomacy/engine/power.py
new file mode 100644
index 0000000..570359b
--- /dev/null
+++ b/diplomacy/engine/power.py
@@ -0,0 +1,392 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Power
+ - Contains the power object representing a power in the game
+"""
+from copy import deepcopy
+from diplomacy.utils import parsing, strings
+from diplomacy.utils.exceptions import DiplomacyException
+from diplomacy.utils.jsonable import Jsonable
+from diplomacy.utils.sorted_dict import SortedDict
+from diplomacy.utils import common, constants
+from diplomacy.utils.constants import OrderSettings
+
+class Power(Jsonable):
+ """ Power Class
+
+ Properties:
+ - abbrev - Contains the abbrev of the power (usually the first letter of the power name) (e.g. 'F' for FRANCE)
+ - adjust - List of pending adjustment orders
+ (e.g. ['A PAR B', 'A PAR R MAR', 'A MAR D', 'WAIVE'])
+ - centers - Contains the list of supply centers currently controlled by the power ['MOS', 'SEV', 'STP', 'WAR']
+ - civil_disorder - Boolean flag to indicate that the power has been put in CIVIL_DISORDER (e.g. True or False)
+ - controller - Sorted dictionary mapping timestamp to controller (either dummy or a user ID) who takes
+ control of power at this timestamp.
+ - game - Contains a reference to the game object
+ - goner - Boolean to indicate that this power doesn't control any SCs any more (e.g. True or False)
+ - homes - Contains a list of homes supply centers (where you can build)
+ e.g. ['PAR', 'MAR', ... ] or None if empty
+ - influence - Contains a list of locations influenced by this power
+ Note: To influence a location, the power must have visited it last.
+ e.g ['PAR', 'MAR', ... ]
+ - name - Contains the name of the power
+ - orders - Contains a dictionary of units and their orders.
+ For NO_CHECK games, unit is 'ORDER 1', 'ORDER 2', ...
+ - e.g. {'A PAR': '- MAR' } or {'ORDER 1': 'A PAR - MAR', 'ORDER 2': '...', ... }
+ - Can also be {'REORDER 1': 'A PAR - MAR', 'INVALID 1': 'A PAR - MAR', ... } after validation
+ - retreats - Contains the list of units that need to retreat with their possible retreat locations
+ (e.g. {'A PAR': ['MAR', 'BER']})
+ - role - Power type (observer, omniscient, player or server power).
+ Either the power name (for a player power) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES
+ - tokens - Only for server power: set of tokens of current power controlled (if not None).
+ - units - Contains the list of units (e.g. ['A PAR', 'A MAR', ...]
+ - vote - Only for omniscient, player and server power: power vote ('yes', 'no' or 'neutral').
+ """
+ __slots__ = ['game', 'name', 'abbrev', 'adjust', 'centers', 'units', 'influence', 'homes',
+ 'retreats', 'goner', 'civil_disorder', 'orders', 'role', 'controller', 'vote',
+ 'order_is_set', 'wait', 'tokens']
+ model = {
+ strings.ABBREV: parsing.OptionalValueType(str),
+ strings.ADJUST: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.CENTERS: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.CIVIL_DISORDER: parsing.DefaultValueType(int, 0),
+ strings.CONTROLLER: parsing.DefaultValueType(parsing.DictType(int, str, SortedDict.builder(int, str)), {}),
+ strings.HOMES: parsing.OptionalValueType(parsing.SequenceType(str)),
+ strings.INFLUENCE: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.NAME: parsing.PrimitiveType(str),
+ strings.ORDER_IS_SET: parsing.DefaultValueType(OrderSettings.ALL_SETTINGS, OrderSettings.ORDER_NOT_SET),
+ strings.ORDERS: parsing.DefaultValueType(parsing.DictType(str, str), {}),
+ strings.RETREATS: parsing.DefaultValueType(parsing.DictType(str, parsing.SequenceType(str)), {}),
+ strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE),
+ strings.TOKENS: parsing.DefaultValueType(parsing.SequenceType(str, set), ()),
+ strings.UNITS: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.VOTE: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_VOTE_DECISIONS), strings.NEUTRAL),
+ strings.WAIT: parsing.DefaultValueType(bool, True),
+ }
+
+ def __init__(self, game=None, name=None, **kwargs):
+ """ Constructor """
+ self.game = game
+ self.abbrev = None
+ self.adjust, self.centers, self.units, self.influence = [], [], [], []
+ self.homes = None
+ self.retreats = {}
+ self.goner = self.civil_disorder = 0
+ self.orders = {}
+
+ self.name = ''
+ self.role = ''
+ self.controller = SortedDict(int, str)
+ self.vote = ''
+ self.order_is_set = 0
+ self.wait = False
+ self.tokens = set()
+ super(Power, self).__init__(name=name, **kwargs)
+ assert self.role in strings.ALL_ROLE_TYPES or self.role == self.name
+ if not self.controller:
+ self.controller.put(common.timestamp_microseconds(), strings.DUMMY)
+
+ def __str__(self):
+ """ Returns a representation of the power instance """
+ show_cd = self.civil_disorder
+ show_inhabits = self.homes is not None
+ show_owns = self.centers
+ show_retreats = len(self.retreats) > 0
+
+ text = ''
+ text += '\n%s (%s)' % (self.name, self.role)
+ text += '\nPLAYER %s' % self.controller.last_value()
+ text += '\nCD' if show_cd else ''
+ text += '\nINHABITS %s' % ' '.join(self.homes) if show_inhabits else ''
+ text += '\nOWNS %s' % ' '.join(self.centers) if show_owns else ''
+ if show_retreats:
+ text += '\n'.join([''] + [' '.join([unit, '-->'] + places) for unit, places in self.retreats.items()])
+ text = '\n'.join([text] + self.units + self.adjust)
+
+ # Orders - RIO is for REORDER, INVALID, ORDER (in NO_CHECK games)
+ text_order = '\nORDERS\n'
+ for unit, order in self.orders.items():
+ if unit[0] not in 'RIO':
+ text_order += '%s ' % unit
+ text_order += order + '\n'
+
+ text += text_order if self.orders else ''
+ return text
+
+ def __deepcopy__(self, memo):
+ """ Fast deep copy implementation
+ - (Not setting the game object)
+ """
+ cls = self.__class__
+ result = cls.__new__(cls)
+
+ # Deep copying
+ for key in self.__slots__:
+ if key not in ['game']:
+ setattr(result, key, deepcopy(getattr(self, key)))
+
+ # Game
+ setattr(result, 'game', None)
+ return result
+
+ def reinit(self, include_flags=6):
+ """ Performs a reinitialization of some of the parameters
+ :param include_flags: Bit mask to indicate which params to reset
+ (bit 1 = orders, 2 = persistent, 4 = transient)
+ :return: None
+ """
+ reinit_persistent = include_flags & 2
+ reinit_transient = include_flags & 4
+ reinit_orders = include_flags & 1
+
+ # Initialize the persistent parameters
+ if reinit_persistent:
+ self.abbrev = None
+
+ # Initialize the transient parameters
+ if reinit_transient:
+ for home in self.homes:
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ for center in self.centers:
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ for unit in self.units:
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ for dis_unit in self.retreats:
+ self.game.update_hash(self.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ self.homes = None
+ self.centers, self.units, self.influence = [], [], []
+ self.retreats = {}
+
+ # Initialize the order-related parameters
+ if reinit_orders:
+ self.civil_disorder = 0
+ self.adjust = []
+ self.orders = {}
+ if self.is_eliminated():
+ self.order_is_set = OrderSettings.ORDER_SET_EMPTY
+ self.wait = False
+ else:
+ self.order_is_set = OrderSettings.ORDER_NOT_SET
+ self.wait = True if self.is_dummy() else (not self.game.real_time)
+ self.goner = 0
+
+ @staticmethod
+ def compare(power_1, power_2):
+ """ Comparator object - Compares two Power objects
+ :param power_1: The first Power object to compare
+ :param power_2: The second Power object to compare
+ :return: 1 if self is greater, -1 if other is greater, 0 if they are equal
+ """
+ cmp = lambda power_1, power_2: ((power_1 > power_2) - (power_1 < power_2))
+ xstr = lambda string: string or '' # To avoid comparing with None
+ cmp_type = cmp(xstr(power_1.role), xstr(power_2.role))
+ cmp_name = cmp(xstr(power_1.name), xstr(power_2.name))
+ return cmp_type or cmp_name
+
+ def initialize(self, game):
+ """ Initializes a game and resets home, centers and units
+ :param game: The game to use for initialization
+ :type game: diplomacy.Game
+ """
+
+ # Not initializing observers and monitors
+ assert self.is_server_power()
+
+ self.game = game
+ self.order_is_set = OrderSettings.ORDER_NOT_SET
+ self.wait = True if self.is_dummy() else (not self.game.real_time)
+
+ # Get power abbreviation.
+ self.abbrev = self.game.map.abbrev.get(self.name, self.name[0])
+
+ # Resets homes
+ if self.homes is None:
+ self.homes = []
+ for home in game.map.homes.get(self.name, []):
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ self.homes.append(home)
+
+ # Resets the centers and units
+ if not self.centers:
+ for center in game.map.centers.get(self.name, []):
+ game.update_hash(self.name, loc=center, is_center=True)
+ self.centers.append(center)
+ if not self.units:
+ for unit in game.map.units.get(self.name, []):
+ game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.units.append(unit)
+ self.influence.append(unit[2:5])
+
+ def merge(self, other_power):
+ """ Transfer all units, centers, and homes of the other_power to this power
+ :param other_power: The other power (will be empty after the merge)
+ """
+ # Regular units
+ for unit in list(other_power.units):
+ self.units.append(unit)
+ other_power.units.remove(unit)
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:])
+
+ # Dislodged units
+ for unit in list(other_power.retreats.keys()):
+ self.retreats[unit] = other_power.retreats[unit]
+ del other_power.retreats[unit]
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+
+ # Influence
+ for loc in list(other_power.influence):
+ self.influence.append(loc)
+ other_power.influence.remove(loc)
+
+ # Supply centers
+ for center in list(other_power.centers):
+ self.centers.append(center)
+ other_power.centers.remove(center)
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ self.game.update_hash(other_power.name, loc=center, is_center=True)
+
+ # Homes
+ for home in list(other_power.homes):
+ self.homes.append(home)
+ other_power.homes.remove(home)
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ self.game.update_hash(other_power.name, loc=home, is_home=True)
+
+ # Clearing state cache
+ self.game.clear_cache()
+
+ def clear_units(self):
+ """ Removes all units from the map """
+ for unit in self.units:
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.units = []
+ self.influence = []
+ self.game.clear_cache()
+
+ def clear_centers(self):
+ """ Removes ownership of all supply centers """
+ for center in self.centers:
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ self.centers = []
+ self.game.clear_cache()
+
+ def is_dummy(self):
+ """ Indicates if the power is a dummy
+ :return: Boolean flag to indicate if the power is a dummy
+ """
+ return self.controller.last_value() == strings.DUMMY
+
+ def is_eliminated(self):
+ """ Returns a flag to show if player is eliminated
+ :return: If the current power is eliminated
+ """
+ # Not eliminated if has units left
+ if self.units or self.centers or self.retreats:
+ return False
+ return True
+
+ def clear_orders(self):
+ """ Clears the power's orders """
+ self.reinit(include_flags=1)
+
+ def moves_submitted(self):
+ """ Returns a boolean to indicate if moves has been submitted
+ :return: 1 if not in Movement phase, or orders submitted, or no more units lefts
+ """
+ if self.game.phase_type != 'M':
+ return 1
+ return self.orders or not self.units
+
+ # ==============================================================
+ # Application/network methods (mainly used for connected games).
+ # ==============================================================
+
+ def is_observer_power(self):
+ """ Return True if this power is an observer power. """
+ return self.role == strings.OBSERVER_TYPE
+
+ def is_omniscient_power(self):
+ """ Return True if this power is an omniscient power. """
+ return self.role == strings.OMNISCIENT_TYPE
+
+ def is_player_power(self):
+ """ Return True if this power is a player power. """
+ return self.role == self.name
+
+ def is_server_power(self):
+ """ Return True if this power is a server power. """
+ return self.role == strings.SERVER_TYPE
+
+ def is_controlled(self):
+ """ Return True if this power is controlled. """
+ return self.controller.last_value() != strings.DUMMY
+
+ def does_not_wait(self):
+ """ Return True if this power does not wait (ie. if we could already process orders of this power). """
+ return self.order_is_set and not self.wait
+
+ def update_controller(self, username, timestamp):
+ """ Update controller with given username and timestamp. """
+ self.controller.put(timestamp, username)
+
+ def set_controlled(self, username):
+ """ Control power with given username. Username may be None (meaning no controller). """
+ if username is None or username == strings.DUMMY:
+ if self.controller.last_value() != strings.DUMMY:
+ self.controller.put(common.timestamp_microseconds(), strings.DUMMY)
+ self.tokens.clear()
+ self.wait = True
+ self.vote = strings.NEUTRAL
+ elif self.controller.last_value() == strings.DUMMY:
+ self.controller.put(common.timestamp_microseconds(), username)
+ self.wait = not self.game.real_time
+ elif self.controller.last_value() != username:
+ raise DiplomacyException('Power already controlled by someone else. Kick previous controller before.')
+
+ def get_controller(self):
+ """ Return current power controller name ('dummy' if power is not controlled). """
+ return self.controller.last_value()
+
+ def get_controller_timestamp(self):
+ """ Return timestamp when current controller took control of this power. """
+ return self.controller.last_key()
+
+ def is_controlled_by(self, username):
+ """ Return True if this power is controlled by given username. """
+ if username == constants.PRIVATE_BOT_USERNAME:
+ # Bot is connected if power is dummy and has some associated tokens.
+ return self.is_dummy() and bool(self.tokens)
+ return self.controller.last_value() == username
+
+ # Server-only methods.
+
+ def has_token(self, token):
+ """ Return True if this power has given token. """
+ assert self.is_server_power()
+ return token in self.tokens
+
+ def add_token(self, token):
+ """ Add given token to this power. """
+ assert self.is_server_power()
+ self.tokens.add(token)
+
+ def remove_tokens(self, tokens):
+ """ Remove sequence of tokens from this power. """
+ assert self.is_server_power()
+ self.tokens.difference_update(tokens)
diff --git a/diplomacy/engine/renderer.py b/diplomacy/engine/renderer.py
new file mode 100644
index 0000000..8b5e10c
--- /dev/null
+++ b/diplomacy/engine/renderer.py
@@ -0,0 +1,789 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+# -*- coding: utf-8 -*-
+""" Renderer
+ - Contains the renderer object which is responsible for rendering a game state to svg
+"""
+import os
+from xml.dom import minidom
+from diplomacy import settings
+
+# Constants
+LAYER_SC = 'SupplyCenterLayer'
+LAYER_ORDER = 'OrderLayer'
+LAYER_UNIT = 'UnitLayer'
+LAYER_DISL = 'DislodgedUnitLayer'
+ARMY = 'Army'
+FLEET = 'Fleet'
+
+def _attr(node_element, attr_name):
+ """ Shorthand method to retrieve an XML attribute """
+ return node_element.attributes[attr_name].value
+
+def _offset(str_float, offset):
+ """ Shorthand to add a offset to an attribute """
+ return str(float(str_float) + offset)
+
+
+class Renderer():
+ """ Renderer object responsible for rendering a game state to svg """
+
+ def __init__(self, game):
+ """ Constructor
+ :param game: The instantiated game object to render
+ :type game: diplomacy.Game
+ """
+ self.game = game
+ self.metadata = {}
+ self.xml_map = None
+ self.xml_map_path = os.path.join(settings.PACKAGE_DIR, 'maps', 'svg', self.game.map.name + '.svg')
+
+ # Loading XML
+ if os.path.exists(self.xml_map_path):
+ self.xml_map = minidom.parse(self.xml_map_path).toxml()
+ self._load_metadata()
+
+ def norm_order(self, order):
+ """ Normalizes the order format and split it into tokens
+ This is only used for **movement** orders (to make sure NO_CHECK games used the correct format)
+ Formats:
+ A PAR H
+ A PAR - BUR [VIA]
+ A PAR S BUR
+ A PAR S F BRE - PIC
+ F BRE C A PAR - LON
+
+ :param order: The unformatted order (e.g. 'Paris - Burgundy')
+ :return: The tokens of the formatted order (e.g. ['A', 'PAR', '-', 'BUR'])
+ """
+ return self.game._add_unit_types(self.game._expand_order(order.split())) # pylint: disable=protected-access
+
+ def render(self, incl_orders=True, incl_abbrev=False, output_format='svg'):
+ """ Renders the current game and returns the XML representation
+ :param incl_orders: Optional. Flag to indicate we also want to render orders.
+ :param incl_abbrev: Optional. Flag to indicate we also want to display the provinces abbreviations.
+ :param output_format: The desired output format.
+ :return: The rendered image in the specified format.
+ """
+ # pylint: disable=too-many-branches
+ if output_format not in ['svg']:
+ raise ValueError('Only "svg" format is current supported.')
+ if not self.game or not self.game.map or not self.xml_map:
+ return None
+
+ # Parsing XML
+ xml_map = minidom.parseString(self.xml_map)
+ scs = self.game.map.scs[:]
+
+ # Setting phase and note
+ nb_centers = [(power.name[:3], len(power.centers))
+ for power in self.game.powers.values()
+ if not power.is_eliminated()]
+ nb_centers = sorted(nb_centers, key=lambda key: key[1], reverse=True)
+ nb_centers_per_power = ' '.join(['{}: {}'.format(name, centers) for name, centers in nb_centers])
+ xml_map = self._set_current_phase(xml_map, self.game.get_current_phase())
+ xml_map = self._set_note(xml_map, nb_centers_per_power, self.game.note)
+
+ # Adding units, supply centers, and influence
+ for power in self.game.powers.values():
+ for unit in power.units:
+ xml_map = self._add_unit(xml_map, unit, power.name, is_dislodged=False)
+ for unit in power.retreats:
+ xml_map = self._add_unit(xml_map, unit, power.name, is_dislodged=True)
+ for center in power.centers:
+ xml_map = self._add_supply_center(xml_map, center, power.name)
+ xml_map = self._set_influence(xml_map, center, power.name, has_supply_center=True)
+ scs.remove(center)
+ for loc in power.influence:
+ xml_map = self._set_influence(xml_map, loc, power.name, has_supply_center=False)
+
+ # Orders
+ if incl_orders:
+
+ # Regular orders (Normalized)
+ # A PAR H
+ # A PAR - BUR [VIA]
+ # A PAR S BUR
+ # A PAR S F BRE - PIC
+ # F BRE C A PAR - LON
+ for order_key in power.orders:
+
+ # No_check order (Order, Invalid, Reorder)
+ # Otherwise regular order (unit is key, order without unit is value)
+ if order_key[0] in 'RIO':
+ order = power.orders[order_key]
+ else:
+ order = '{} {}'.format(order_key, power.orders[order_key])
+
+ # Normalizing and splitting in tokens
+ tokens = self.norm_order(order)
+ unit_loc = tokens[1]
+
+ # Parsing based on order type
+ if not tokens or len(tokens) < 3:
+ continue
+ elif tokens[2] == 'H':
+ xml_map = self._issue_hold_order(xml_map, unit_loc, power.name)
+ elif tokens[2] == '-':
+ dest_loc = tokens[-1] if tokens[-1] != 'VIA' else tokens[-2]
+ xml_map = self._issue_move_order(xml_map, unit_loc, dest_loc, power.name)
+ elif tokens[2] == 'S':
+ dest_loc = tokens[-1]
+ if '-' in tokens:
+ src_loc = tokens[4] if tokens[3] == 'A' or tokens[3] == 'F' else tokens[3]
+ xml_map = self._issue_support_move_order(xml_map, unit_loc, src_loc, dest_loc, power.name)
+ else:
+ xml_map = self._issue_support_hold_order(xml_map, unit_loc, dest_loc, power.name)
+ elif tokens[2] == 'C':
+ src_loc = tokens[4] if tokens[3] == 'A' or tokens[3] == 'F' else tokens[3]
+ dest_loc = tokens[-1]
+ if src_loc != dest_loc and '-' in tokens:
+ xml_map = self._issue_convoy_order(xml_map, unit_loc, src_loc, dest_loc, power.name)
+ else:
+ raise RuntimeError('Unknown order: {}'.format(' '.join(tokens)))
+
+ # Adjustment orders
+ # VOID xxx
+ # A PAR B
+ # A PAR D
+ # A PAR R BUR
+ # WAIVE
+ for order in power.adjust:
+ tokens = order.split()
+ if not tokens or tokens[0] == 'VOID' or tokens[-1] == 'WAIVE':
+ continue
+ elif tokens[-1] == 'B':
+ if len(tokens) < 3:
+ continue
+ xml_map = self._issue_build_order(xml_map, tokens[0], tokens[1], power.name)
+ elif tokens[-1] == 'D':
+ xml_map = self._issue_disband_order(xml_map, tokens[1])
+ elif tokens[-2] == 'R':
+ src_loc = tokens[1] if tokens[0] == 'A' or tokens[0] == 'F' else tokens[0]
+ dest_loc = tokens[-1]
+ xml_map = self._issue_move_order(xml_map, src_loc, dest_loc, power.name)
+ else:
+ raise RuntimeError('Unknown order: {}'.format(order))
+
+ # Adding remaining supply centers
+ for center in scs:
+ xml_map = self._add_supply_center(xml_map, center, None)
+
+ # Removing abbrev and mouse layer
+ svg_node = xml_map.getElementsByTagName('svg')[0]
+ for child_node in svg_node.childNodes:
+ if child_node.nodeName != 'g':
+ continue
+ if _attr(child_node, 'id') == 'BriefLabelLayer' and not incl_abbrev:
+ svg_node.removeChild(child_node)
+ elif _attr(child_node, 'id') == 'MouseLayer':
+ svg_node.removeChild(child_node)
+
+ # Returning
+ return xml_map.toxml()
+
+ def _load_metadata(self):
+ """ Loads meta-data embedded in the XML map and clears unused nodes """
+ if not self.xml_map:
+ return
+ xml_map = minidom.parseString(self.xml_map)
+
+ # Data
+ self.metadata = {
+ 'color': {},
+ 'symbol_size': {},
+ 'orders': {},
+ 'coord': {}
+ }
+
+ # Order drawings
+ for order_drawing in xml_map.getElementsByTagName('jdipNS:ORDERDRAWING'):
+ for child_node in order_drawing.childNodes:
+
+ # Power Colors
+ if child_node.nodeName == 'jdipNS:POWERCOLORS':
+ for power_color in child_node.childNodes:
+ if power_color.nodeName == 'jdipNS:POWERCOLOR':
+ self.metadata['color'][_attr(power_color, 'power').upper()] = _attr(power_color, 'color')
+
+ # Symbol size
+ elif child_node.nodeName == 'jdipNS:SYMBOLSIZE':
+ self.metadata['symbol_size'][_attr(child_node, 'name')] = (_attr(child_node, 'height'),
+ _attr(child_node, 'width'))
+
+ # Order type
+ elif child_node.nodeName.startswith('jdipNS'):
+ order_type = child_node.nodeName.replace('jdipNS:', '')
+ self.metadata['orders'][order_type] = {}
+ for attr_name, attr_value in child_node.attributes.items():
+ if ':' in attr_name:
+ continue
+ self.metadata['orders'][order_type][attr_name] = attr_value
+
+ # Object coordinates
+ for province_data in xml_map.getElementsByTagName('jdipNS:PROVINCE_DATA'):
+ for child_node in province_data.childNodes:
+
+ # Province
+ if child_node.nodeName == 'jdipNS:PROVINCE':
+ province = _attr(child_node, 'name').upper().replace('-', '/')
+ self.metadata['coord'][province] = {}
+
+ for coord_node in child_node.childNodes:
+ if coord_node.nodeName == 'jdipNS:UNIT':
+ self.metadata['coord'][province]['unit'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y'))
+ elif coord_node.nodeName == 'jdipNS:DISLODGED_UNIT':
+ self.metadata['coord'][province]['disl'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y'))
+ elif coord_node.nodeName == 'jdipNS:SUPPLY_CENTER':
+ self.metadata['coord'][province]['sc'] = (_attr(coord_node, 'x'), _attr(coord_node, 'y'))
+
+ # Deleting
+ svg_node = xml_map.getElementsByTagName('svg')[0]
+ svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:DISPLAY')[0])
+ svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:ORDERDRAWING')[0])
+ svg_node.removeChild(xml_map.getElementsByTagName('jdipNS:PROVINCE_DATA')[0])
+ self.xml_map = xml_map.toxml()
+
+ def _add_unit(self, xml_map, unit, power_name, is_dislodged):
+ """ Adds a unit to the map
+ :param xml_map: The xml map being generated
+ :param unit: The unit to add (e.g. 'A PAR')
+ :param power_name: The name of the power owning the unit (e.g. 'FRANCE')
+ :param is_dislodged: Boolean. Indicates if the unit is dislodged
+ :return: Nothing
+ """
+ unit_type, loc = unit.split()
+ symbol = FLEET if unit_type == 'F' else ARMY
+ loc_x = _offset(self.metadata['coord'][loc][('unit', 'disl')[is_dislodged]][0], -11.5)
+ loc_y = _offset(self.metadata['coord'][loc][('unit', 'disl')[is_dislodged]][1], - 10.)
+ node = xml_map.createElement('use')
+ node.setAttribute('x', loc_x)
+ node.setAttribute('y', loc_y)
+ node.setAttribute('height', self.metadata['symbol_size'][symbol][0])
+ node.setAttribute('width', self.metadata['symbol_size'][symbol][1])
+ node.setAttribute('xlink:href', '#{}{}'.format(('', 'Dislodged')[is_dislodged], symbol))
+ node.setAttribute('class', 'unit{}'.format(power_name.lower()))
+
+ # Inserting
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' \
+ and _attr(child_node, 'id') == ['UnitLayer', 'DislodgedUnitLayer'][is_dislodged]:
+ child_node.appendChild(node)
+ break
+ return xml_map
+
+ def _add_supply_center(self, xml_map, loc, power_name):
+ """ Adds a supply center to the map
+ :param xml_map: The xml map being generated
+ :param loc: The province where to add the SC (e.g. 'PAR')
+ :param power_name: The name of the power owning the SC or None
+ :return: Nothing
+ """
+ symbol = 'SupplyCenter'
+ loc_x = _offset(self.metadata['coord'][loc]['sc'][0], -8.5)
+ loc_y = _offset(self.metadata['coord'][loc]['sc'][1], -11.)
+ node = xml_map.createElement('use')
+ node.setAttribute('x', loc_x)
+ node.setAttribute('y', loc_y)
+ node.setAttribute('height', self.metadata['symbol_size'][symbol][0])
+ node.setAttribute('width', self.metadata['symbol_size'][symbol][1])
+ node.setAttribute('xlink:href', '#{}'.format(symbol))
+ if power_name:
+ node.setAttribute('class', 'sc{}'.format(power_name.lower()))
+ else:
+ node.setAttribute('class', 'scnopower')
+
+ # Inserting
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'SupplyCenterLayer':
+ child_node.appendChild(node)
+ break
+ return xml_map
+
+ def _set_influence(self, xml_map, loc, power_name, has_supply_center=False):
+ """ Sets the influence on the map
+ :param xml_map: The xml map being generated
+ :param loc: The province being influenced (e.g. 'PAR')
+ :param power_name: The name of the power influencing the province
+ :param has_supply_center: Boolean flag to acknowledge we are modifying a loc with a SC
+ :return: Nothing
+ """
+ loc = loc.upper()[:3]
+ if loc in self.game.map.scs and not has_supply_center:
+ return xml_map
+ if self.game.map.area_type(loc) not in ['LAND', 'COAST']:
+ return xml_map
+
+ # Inserting
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'MapLayer':
+ for map_node in child_node.childNodes:
+ if map_node.nodeName == 'path' and _attr(map_node, 'id') == '_{}'.format(loc.lower()):
+ if power_name:
+ map_node.setAttribute('class', power_name.lower())
+ else:
+ map_node.setAttribute('class', 'nopower')
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ @staticmethod
+ def _set_current_phase(xml_map, current_phase):
+ """ Sets the phase text at the bottom right of the the map
+ :param xml_map: The xml map being generated
+ :param current_phase: The current phase (e.g. 'S1901M)
+ :return: Nothing
+ """
+ current_phase = 'FINAL' if current_phase[0] == '?' else current_phase
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentPhase':
+ child_node.childNodes[0].nodeValue = current_phase
+ return xml_map
+ return xml_map
+
+ @staticmethod
+ def _set_note(xml_map, note_1, note_2):
+ """ Sets a note at the top left of the map
+ :param xml_map: The xml map being generated
+ :param note_1: The text to display on the first line
+ :param note_2: The text to display on the second line
+ :return: Nothing
+ """
+ note_1 = note_1 or ' '
+ note_2 = note_2 or ' '
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentNote':
+ child_node.childNodes[0].nodeValue = note_1
+ if child_node.nodeName == 'text' and _attr(child_node, 'id') == 'CurrentNote2':
+ child_node.childNodes[0].nodeValue = note_2
+ return xml_map
+
+ def _issue_hold_order(self, xml_map, loc, power_name):
+ """ Adds a hold order to the map
+ :param xml_map: The xml map being generated
+ :param loc: The province where the unit is holding (e.g. 'PAR')
+ :param power_name: The name of the power owning the unit
+ :return: Nothing
+ """
+ # Calculating polygon coord
+ polygon_coord = []
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 8.5)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 9.5)
+ for offset in [(13.8, -33.3), (33.3, -13.8), (33.3, 13.8), (13.8, 33.3), (-13.8, 33.3), (-33.3, 13.8),
+ (-33.3, -13.8), (-13.8, -33.3)]:
+ polygon_coord += [_offset(loc_x, offset[0]) + ',' + _offset(loc_y, offset[1])]
+
+ # Building polygon
+ g_node = xml_map.createElement('g')
+
+ poly_1 = xml_map.createElement('polygon')
+ poly_1.setAttribute('stroke-width', '10')
+ poly_1.setAttribute('class', 'varwidthshadow')
+ poly_1.setAttribute('points', ' '.join(polygon_coord))
+
+ poly_2 = xml_map.createElement('polygon')
+ poly_2.setAttribute('stroke-width', '6')
+ poly_2.setAttribute('class', 'varwidthorder')
+ poly_2.setAttribute('points', ' '.join(polygon_coord))
+ poly_2.setAttribute('stroke', self.metadata['color'][power_name])
+
+ g_node.appendChild(poly_1)
+ g_node.appendChild(poly_2)
+
+ # Inserting
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer':
+ for layer_node in child_node.childNodes:
+ if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer1':
+ layer_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_support_hold_order(self, xml_map, loc, dest_loc, power_name):
+ """ Issues a support hold order
+ :param xml_map: The xml map being generated
+ :param loc: The location of the unit sending support (e.g. 'BER')
+ :param dest_loc: The location where the unit is holding from (e.g. 'PAR')
+ :param power_name: The power name issuing the move order
+ :return: Nothing
+ """
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10)
+ dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10)
+ dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10)
+
+ # Adjusting destination
+ delta_x = float(dest_loc_x) - float(loc_x)
+ delta_y = float(dest_loc_y) - float(loc_y)
+ vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5
+ dest_loc_x = str(round(float(loc_x) + (vector_length - 35.) / vector_length * delta_x, 2))
+ dest_loc_y = str(round(float(loc_y) + (vector_length - 35.) / vector_length * delta_y, 2))
+
+ # Getting polygon coordinates
+ polygon_coord = []
+ poly_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 8.5)
+ poly_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 9.5)
+ for offset in [(15.9, -38.3), (38.3, -15.9), (38.3, 15.9), (15.9, 38.3), (-15.9, 38.3), (-38.3, 15.9),
+ (-38.3, -15.9), (-15.9, -38.3)]:
+ polygon_coord += [_offset(poly_loc_x, offset[0]) + ',' + _offset(poly_loc_y, offset[1])]
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+
+ shadow_line = xml_map.createElement('line')
+ shadow_line.setAttribute('x1', loc_x)
+ shadow_line.setAttribute('y1', loc_y)
+ shadow_line.setAttribute('x2', dest_loc_x)
+ shadow_line.setAttribute('y2', dest_loc_y)
+ shadow_line.setAttribute('class', 'shadowdash')
+
+ support_line = xml_map.createElement('line')
+ support_line.setAttribute('x1', loc_x)
+ support_line.setAttribute('y1', loc_y)
+ support_line.setAttribute('x2', dest_loc_x)
+ support_line.setAttribute('y2', dest_loc_y)
+ support_line.setAttribute('class', 'supportorder')
+ support_line.setAttribute('stroke', self.metadata['color'][power_name])
+
+ shadow_poly = xml_map.createElement('polygon')
+ shadow_poly.setAttribute('class', 'shadowdash')
+ shadow_poly.setAttribute('points', ' '.join(polygon_coord))
+
+ support_poly = xml_map.createElement('polygon')
+ support_poly.setAttribute('class', 'supportorder')
+ support_poly.setAttribute('points', ' '.join(polygon_coord))
+ support_poly.setAttribute('stroke', self.metadata['color'][power_name])
+
+ # Inserting
+ g_node.appendChild(shadow_line)
+ g_node.appendChild(support_line)
+ g_node.appendChild(shadow_poly)
+ g_node.appendChild(support_poly)
+
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer':
+ for layer_node in child_node.childNodes:
+ if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2':
+ layer_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_move_order(self, xml_map, src_loc, dest_loc, power_name):
+ """ Issues a move order
+ :param xml_map: The xml map being generated
+ :param src_loc: The location where the unit is moving from (e.g. 'PAR')
+ :param dest_loc: The location where the unit is moving to (e.g. 'MAR')
+ :param power_name: The power name issuing the move order
+ :return: Nothing
+ """
+ if self.game.get_current_phase()[-1] == 'R':
+ src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], -2.5)
+ src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], -2.5)
+ else:
+ src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10)
+ src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10)
+ dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10)
+ dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10)
+
+ # Adjusting destination
+ delta_x = float(dest_loc_x) - float(src_loc_x)
+ delta_y = float(dest_loc_y) - float(src_loc_y)
+ vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5
+ dest_loc_x = str(round(float(src_loc_x) + (vector_length - 30.) / vector_length * delta_x, 2))
+ dest_loc_y = str(round(float(src_loc_y) + (vector_length - 30.) / vector_length * delta_y, 2))
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+
+ line_with_shadow = xml_map.createElement('line')
+ line_with_shadow.setAttribute('x1', src_loc_x)
+ line_with_shadow.setAttribute('y1', src_loc_y)
+ line_with_shadow.setAttribute('x2', dest_loc_x)
+ line_with_shadow.setAttribute('y2', dest_loc_y)
+ line_with_shadow.setAttribute('class', 'varwidthshadow')
+ line_with_shadow.setAttribute('stroke-width', '10')
+
+ line_with_arrow = xml_map.createElement('line')
+ line_with_arrow.setAttribute('x1', src_loc_x)
+ line_with_arrow.setAttribute('y1', src_loc_y)
+ line_with_arrow.setAttribute('x2', dest_loc_x)
+ line_with_arrow.setAttribute('y2', dest_loc_y)
+ line_with_arrow.setAttribute('class', 'varwidthorder')
+ line_with_arrow.setAttribute('stroke', self.metadata['color'][power_name])
+ line_with_arrow.setAttribute('stroke-width', '6')
+ line_with_arrow.setAttribute('marker-end', 'url(#arrow)')
+
+ # Inserting
+ g_node.appendChild(line_with_shadow)
+ g_node.appendChild(line_with_arrow)
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer':
+ for layer_node in child_node.childNodes:
+ if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer1':
+ layer_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_support_move_order(self, xml_map, loc, src_loc, dest_loc, power_name):
+ """ Issues a support move order
+ :param xml_map: The xml map being generated
+ :param loc: The location of the unit sending support (e.g. 'BER')
+ :param src_loc: The location where the unit is moving from (e.g. 'PAR')
+ :param dest_loc: The location where the unit is moving to (e.g. 'MAR')
+ :param power_name: The power name issuing the move order
+ :return: Nothing
+ """
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10)
+ src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10)
+ src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10)
+ dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10)
+ dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10)
+
+ # Adjusting destination
+ delta_x = float(dest_loc_x) - float(src_loc_x)
+ delta_y = float(dest_loc_y) - float(src_loc_y)
+ vector_length = (delta_x ** 2. + delta_y ** 2.) ** 0.5
+ dest_loc_x = str(round(float(src_loc_x) + (vector_length - 30.) / vector_length * delta_x, 2))
+ dest_loc_y = str(round(float(src_loc_y) + (vector_length - 30.) / vector_length * delta_y, 2))
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+
+ path_with_shadow = xml_map.createElement('path')
+ path_with_shadow.setAttribute('class', 'shadowdash')
+ path_with_shadow.setAttribute('d', 'M {},{} C {},{} {},{} {},{}'.format(loc_x, loc_y,
+ src_loc_x, src_loc_y,
+ src_loc_x, src_loc_y,
+ dest_loc_x, dest_loc_y))
+
+ path_with_arrow = xml_map.createElement('path')
+ path_with_arrow.setAttribute('class', 'supportorder')
+ path_with_arrow.setAttribute('stroke', self.metadata['color'][power_name])
+ path_with_arrow.setAttribute('marker-end', 'url(#arrow)')
+ path_with_arrow.setAttribute('d', 'M {},{} C {},{} {},{} {},{}'.format(loc_x, loc_y,
+ src_loc_x, src_loc_y,
+ src_loc_x, src_loc_y,
+ dest_loc_x, dest_loc_y))
+
+ # Inserting
+ g_node.appendChild(path_with_shadow)
+ g_node.appendChild(path_with_arrow)
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer':
+ for layer_node in child_node.childNodes:
+ if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2':
+ layer_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_convoy_order(self, xml_map, loc, src_loc, dest_loc, power_name):
+ """ Issues a convoy order
+ :param xml_map: The xml map being generated
+ :param loc: The location of the unit convoying (e.g. 'BER')
+ :param src_loc: The location where the unit being convoyed is moving from (e.g. 'PAR')
+ :param dest_loc: The location where the unit being convoyed is moving to (e.g. 'MAR')
+ :param power_name: The power name issuing the convoy order
+ :return: Nothing
+ """
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], 10)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], 10)
+ src_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10)
+ src_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10)
+ dest_loc_x = _offset(self.metadata['coord'][dest_loc]['unit'][0], 10)
+ dest_loc_y = _offset(self.metadata['coord'][dest_loc]['unit'][1], 10)
+
+ # Adjusting starting arrow (from convoy to start location)
+ # This is to avoid the end of the arrow conflicting with the convoy triangle
+ src_delta_x = float(src_loc_x) - float(loc_x)
+ src_delta_y = float(src_loc_y) - float(loc_y)
+ src_vector_length = (src_delta_x ** 2. + src_delta_y ** 2.) ** 0.5
+ src_loc_x_1 = str(round(float(loc_x) + (src_vector_length - 30.) / src_vector_length * src_delta_x, 2))
+ src_loc_y_1 = str(round(float(loc_y) + (src_vector_length - 30.) / src_vector_length * src_delta_y, 2))
+
+ # Adjusting destination arrow (from start location to destination location)
+ # This is to avoid the start of the arrow conflicting with the convoy triangle
+ dest_delta_x = float(src_loc_x) - float(dest_loc_x)
+ dest_delta_y = float(src_loc_y) - float(dest_loc_y)
+ dest_vector_length = (dest_delta_x ** 2. + dest_delta_y ** 2.) ** 0.5
+ src_loc_x_2 = str(round(float(dest_loc_x) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_x, 2))
+ src_loc_y_2 = str(round(float(dest_loc_y) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_y, 2))
+
+ # Adjusting destination arrow (from start location to destination location)
+ # This is to avoid the start of the arrow conflicting with the convoy triangle
+ dest_delta_x = float(dest_loc_x) - float(src_loc_x)
+ dest_delta_y = float(dest_loc_y) - float(src_loc_y)
+ dest_vector_length = (dest_delta_x ** 2. + dest_delta_y ** 2.) ** 0.5
+ dest_loc_x = str(round(float(src_loc_x) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_x, 2))
+ dest_loc_y = str(round(float(src_loc_y) + (dest_vector_length - 30.) / dest_vector_length * dest_delta_y, 2))
+
+ # Getting convoy triangle coordinates
+ triangle_coord = []
+ triangle_loc_x = _offset(self.metadata['coord'][src_loc]['unit'][0], 10)
+ triangle_loc_y = _offset(self.metadata['coord'][src_loc]['unit'][1], 10)
+ for offset in [(0, -38.3), (33.2, 19.1), (-33.2, 19.1)]:
+ triangle_coord += [_offset(triangle_loc_x, offset[0]) + ',' + _offset(triangle_loc_y, offset[1])]
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+
+ src_shadow_line = xml_map.createElement('line')
+ src_shadow_line.setAttribute('x1', loc_x)
+ src_shadow_line.setAttribute('y1', loc_y)
+ src_shadow_line.setAttribute('x2', src_loc_x_1)
+ src_shadow_line.setAttribute('y2', src_loc_y_1)
+ src_shadow_line.setAttribute('class', 'shadowdash')
+
+ dest_shadow_line = xml_map.createElement('line')
+ dest_shadow_line.setAttribute('x1', src_loc_x_2)
+ dest_shadow_line.setAttribute('y1', src_loc_y_2)
+ dest_shadow_line.setAttribute('x2', dest_loc_x)
+ dest_shadow_line.setAttribute('y2', dest_loc_y)
+ dest_shadow_line.setAttribute('class', 'shadowdash')
+
+ src_convoy_line = xml_map.createElement('line')
+ src_convoy_line.setAttribute('x1', loc_x)
+ src_convoy_line.setAttribute('y1', loc_y)
+ src_convoy_line.setAttribute('x2', src_loc_x_1)
+ src_convoy_line.setAttribute('y2', src_loc_y_1)
+ src_convoy_line.setAttribute('class', 'convoyorder')
+ src_convoy_line.setAttribute('stroke', self.metadata['color'][power_name])
+
+ dest_convoy_line = xml_map.createElement('line')
+ dest_convoy_line.setAttribute('x1', src_loc_x_2)
+ dest_convoy_line.setAttribute('y1', src_loc_y_2)
+ dest_convoy_line.setAttribute('x2', dest_loc_x)
+ dest_convoy_line.setAttribute('y2', dest_loc_y)
+ dest_convoy_line.setAttribute('class', 'convoyorder')
+ dest_convoy_line.setAttribute('stroke', self.metadata['color'][power_name])
+ dest_convoy_line.setAttribute('marker-end', 'url(#arrow)')
+
+ shadow_poly = xml_map.createElement('polygon')
+ shadow_poly.setAttribute('class', 'shadowdash')
+ shadow_poly.setAttribute('points', ' '.join(triangle_coord))
+
+ convoy_poly = xml_map.createElement('polygon')
+ convoy_poly.setAttribute('class', 'convoyorder')
+ convoy_poly.setAttribute('points', ' '.join(triangle_coord))
+ convoy_poly.setAttribute('stroke', self.metadata['color'][power_name])
+
+ # Inserting
+ g_node.appendChild(src_shadow_line)
+ g_node.appendChild(dest_shadow_line)
+ g_node.appendChild(src_convoy_line)
+ g_node.appendChild(dest_convoy_line)
+ g_node.appendChild(shadow_poly)
+ g_node.appendChild(convoy_poly)
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'OrderLayer':
+ for layer_node in child_node.childNodes:
+ if layer_node.nodeName == 'g' and _attr(layer_node, 'id') == 'Layer2':
+ layer_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_build_order(self, xml_map, unit_type, loc, power_name):
+ """ Adds a build army/fleet order to the map
+ :param xml_map: The xml map being generated
+ :param unit_type: The unit type to build ('A' or 'F')
+ :param loc: The province where the army is to be built (e.g. 'PAR')
+ :param power_name: The name of the power building the unit
+ :return: Nothing
+ """
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -11.5)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], - 10.)
+ build_loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -20.5)
+ build_loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -20.5)
+
+ # Symbols
+ symbol = ARMY if unit_type == 'A' else FLEET
+ build_symbol = 'BuildUnit'
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+
+ symbol_node = xml_map.createElement('use')
+ symbol_node.setAttribute('x', loc_x)
+ symbol_node.setAttribute('y', loc_y)
+ symbol_node.setAttribute('height', self.metadata['symbol_size'][symbol][0])
+ symbol_node.setAttribute('width', self.metadata['symbol_size'][symbol][1])
+ symbol_node.setAttribute('xlink:href', '#{}'.format(symbol))
+ symbol_node.setAttribute('class', 'unit{}'.format(power_name.lower()))
+
+ build_node = xml_map.createElement('use')
+ build_node.setAttribute('x', build_loc_x)
+ build_node.setAttribute('y', build_loc_y)
+ build_node.setAttribute('height', self.metadata['symbol_size'][build_symbol][0])
+ build_node.setAttribute('width', self.metadata['symbol_size'][build_symbol][1])
+ build_node.setAttribute('xlink:href', '#{}'.format(build_symbol))
+
+ # Inserting
+ g_node.appendChild(build_node)
+ g_node.appendChild(symbol_node)
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'HighestOrderLayer':
+ child_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map
+
+ def _issue_disband_order(self, xml_map, loc):
+ """ Adds a disband order to the map
+ :param xml_map: The xml map being generated
+ :param loc: The province where the unit is disbanded (e.g. 'PAR')
+ :return: Nothing
+ """
+ if self.game.get_current_phase()[-1] == 'R':
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -29.)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -27.5)
+ else:
+ loc_x = _offset(self.metadata['coord'][loc]['unit'][0], -16.5)
+ loc_y = _offset(self.metadata['coord'][loc]['unit'][1], -15.)
+
+ # Symbols
+ symbol = 'RemoveUnit'
+
+ # Creating nodes
+ g_node = xml_map.createElement('g')
+ symbol_node = xml_map.createElement('use')
+ symbol_node.setAttribute('x', loc_x)
+ symbol_node.setAttribute('y', loc_y)
+ symbol_node.setAttribute('height', self.metadata['symbol_size'][symbol][0])
+ symbol_node.setAttribute('width', self.metadata['symbol_size'][symbol][1])
+ symbol_node.setAttribute('xlink:href', '#{}'.format(symbol))
+
+ # Inserting
+ g_node.appendChild(symbol_node)
+ for child_node in xml_map.getElementsByTagName('svg')[0].childNodes:
+ if child_node.nodeName == 'g' and _attr(child_node, 'id') == 'HighestOrderLayer':
+ child_node.appendChild(g_node)
+ return xml_map
+
+ # Returning
+ return xml_map