aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/engine/game.py
diff options
context:
space:
mode:
Diffstat (limited to 'diplomacy/engine/game.py')
-rw-r--r--diplomacy/engine/game.py4289
1 files changed, 4289 insertions, 0 deletions
diff --git a/diplomacy/engine/game.py b/diplomacy/engine/game.py
new file mode 100644
index 0000000..73b2ff9
--- /dev/null
+++ b/diplomacy/engine/game.py
@@ -0,0 +1,4289 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+# -*- coding: utf-8 -*-
+""" Game
+ - Contains the game engine
+"""
+# pylint: disable=too-many-lines
+import os
+import sys
+import time
+import uuid
+import random
+from copy import deepcopy
+
+import numpy as np
+
+from diplomacy import settings
+import diplomacy.utils.errors as err
+from diplomacy.engine.map import Map
+from diplomacy.engine.message import Message, GLOBAL
+from diplomacy.engine.power import Power
+from diplomacy.engine.renderer import Renderer
+from diplomacy.utils import PriorityDict, common, exceptions, parsing, strings
+from diplomacy.utils.jsonable import Jsonable
+from diplomacy.utils.sorted_dict import SortedDict
+from diplomacy.utils.constants import OrderSettings
+from diplomacy.utils.game_phase_data import GamePhaseData, MESSAGES_TYPE
+
+# Constants
+UNDETERMINED, POWER, UNIT, LOCATION, COAST, ORDER, MOVE_SEP, OTHER = 0, 1, 2, 3, 4, 5, 6, 7
+
+class Game(Jsonable):
+ """
+ - combat - Dictionary of dictionaries containing the strength of every attack on a location (including units
+ who don't count toward dislodgment)
+ - Format: {loc: attack_strength: [ ['src loc', [support loc] ]}
+ e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } }
+ MUN is holding, being attack without support from RUH and being attacked with support from SIL
+ (S from BOH)
+ - command - Contains the list of finalized orders to be processed (same format as orders, but without .order)
+ e.g. {'A PAR': '- A MAR'}
+ - controlled_powers: for client games only. List of powers currently controlled by associated client user.
+ - convoy_paths - Contains the list of remaining convoys path for each convoyed unit to reach their destination
+ Note: This is used to see if there are still active convoy paths remaining.
+ Note: This also include the start and ending location
+ e.g. {'A PAR': [ ['PAR', 'ION','NAO', 'MAR], ['PAR', 'ION', 'MAR'] ], ... }
+ - convoy_paths_possible - Contains the list of possible convoy paths given the current fleet locations or None
+ e.g. [(START_LOC, {Fleets Req}, {possible dest}), ...]
+ - convoy_paths_dest - Contains a dictionary of possible paths to reach destination from start or None
+ e.g. {start_loc: {dest_loc_1: [{fleets}, {fleets}, {fleets}], dest_loc_2: [{fleets, fleets}]}
+ - deadline: integer: game deadline in seconds.
+ - dislodged - Contains a dictionary of dislodged units (and the site that dislodged them')
+ e.g. { 'A PAR': 'MAR' }
+ - error - Contains a list of errors that the game generated
+ e.g. ['NO MASTER SPECIFIED']
+ - game_id: String that contains the current game's ID
+ e.g. '123456'
+ - lost - Contains a dictionary of centers that have been lost during the term
+ e.g. {'PAR': 'FRANCE'} to indicate that PAR was lost by France (previous owner)
+ - map: Contains a reference to the current map (Map instance)
+ e.g. map = Map('standard')
+ - map_name: Contains a reference to the name of the map that was loaded
+ e.g. map_name = 'standard'
+ - messages (only for non-observer games): history of messages exchanged inside this game.
+ Sorted dict mapping message timestamps to message objects (instances of diplomacy.Message).
+ Format: {message.time_sent => message}
+ - message_history: history of messages through all played phases.
+ Sorted dict mapping a short phase name to a message dict
+ (with same format as field `message` describe above).
+ Format: {short phase name => {message.time_sent => message}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - meta_rules - Contains the rules that have been processed as directives
+ e.g. ['NO_PRESS']
+ - n_controls: integer: exact number of controlled powers allowed for this game.
+ If game start mode is not START_MASTER, then game starts as soon as this number of powers
+ are controlled.
+ - no_rules - Contains the list of rules that have been disabled (prefixed with '!')
+ e.g ['NO_PRESS']
+ - note - A note to display on the rendering
+ e.g. 'Winner: FRANCE'
+ - observer_level: for client games only. Highest observation level allowed for associated client user.
+ Either "master_type", "omniscient_type" or "observer_type".
+ - orders - Contains the list of current orders (not yet processed)
+ e.g. {'A PAR': '- A MAR'}
+ - ordered_units - Contains a dictionary of the units ordered by each power in the last phase
+ e.g. {'FRANCE': ['A PAR', 'A MAR'], 'ENGLAND': ... }
+ - order_history - Contains the history of orders from each player from the beginning of the game.
+ Sorted dict mapping a short phase name to a dictionary of orders
+ (powers names as keys, powers orders as values).
+ Format: {short phase name => {power name => [orders]}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - outcome - Contains the game outcome
+ e.g. [lastPhase, victor1, victor2, victor3]
+ - phase: String that contains a long representation of the current phase
+ e.g. 'SPRING 1901 MOVEMENT'
+ - phase_type: Indicates the current phase type
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjustment, '-' for non-playing phase)
+ e.g. 'M'
+ - popped - Contains a list of all retreaters who didn't make it
+ e.g. ['A PAR', 'A MAR']
+ - powers - Contains a dictionary mapping power names to power instances in the game
+ e.g. {'FRANCE': FrancePower, 'ENGLAND': EnglishPower, ...}
+ - registration_password: ** hashed ** version of password to be sent by a player to join this game.
+ - renderer - Contains the object in charge of rendering the map
+ e.g. Renderer()
+ - result - Contains the result of the action for each unit.
+ In Movement Phase, result can be 'no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted'
+ e.g. { 'A PAR': ['cut', 'void'] }
+ In Retreats phase, result can be 'bounce', 'disband', 'void'
+ e.g. { 'A PAR': ['cut', 'void'] }
+ In Adjustments phase, result can be 'void' or ''
+ e.g. { 'A PAR': ['', 'void'] } # e.g. to indicate a successful build, and a void build.
+ - result_history - Contains the history of orders results for all played phases.
+ Sorted dict mapping a short phase name to a dictionary of order results for this phase.
+ Dictionary of order results maps a unit to a list of results. See field result for more details.
+ Format: {short phase name => {unit => [results]}}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - role: game type (observer, omniscient, player or server game).
+ Either a power name (for player game) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES.
+ - rules: Contains a list of active rules
+ e.g. ['NO_PRESS', ...]
+ - state_history: history of previous game states (returned by method get_state()) for this game.
+ Sorted dict mapping a short phase name to a game state.
+ Each game state is associated to a timestamp generated when state is created by method get_state().
+ State timestamp then represents the "end" time of the state, ie. time when this state was saved and
+ archived in state history.
+ Format: {short phase name => state}
+ Wrapped in a sorted dict at runtime, see method __init__().
+ - status: game status (forming, active, paused, completed or canceled).
+ Possible values in diplomacy.utils.strings.ALL_GAME_STATUSES.
+ - supports - Contains a dictionary of support for each unit
+ - Format: { 'unit': [nb_of_support, [list of supporting units]] }
+ e.g. { 'A PAR': [2, ['A MAR']] }
+ 2 support, but the Marseille support does NOT count toward dislodgment
+ - timestamp_created: timestamp in microseconds when game object was created on server side.
+ - victory - Indicates the number of SUPPLY [default] centers one power must control to win the game
+ - Format: [reqFirstYear, reqSecondYear, ..., reqAllFurtherYears]
+ e.g. [10,10,18] for 10 the 1st year, 10 the 2nd year, 18 year 3+
+ - win - Indicates the minimum number of centers required to win
+ e.g. 3
+ - zobrist_hash - Contains the zobrist hash representing the current state of this game
+ e.g. 12545212418541325
+ """
+ # pylint: disable=too-many-instance-attributes
+ __slots__ = ['victory', 'no_rules', 'meta_rules', 'phase', 'note', 'map', 'powers', 'outcome', 'error', 'popped',
+ 'messages', 'order_history', 'orders', 'ordered_units', 'phase_type', 'win', 'combat', 'command',
+ 'result', 'supports', 'dislodged', 'lost', 'convoy_paths', 'convoy_paths_possible',
+ 'convoy_paths_dest', 'zobrist_hash', 'renderer', 'game_id', 'map_name', 'role', 'rules',
+ 'message_history', 'state_history', 'result_history', 'status', 'timestamp_created', 'n_controls',
+ 'deadline', 'registration_password', 'observer_level', 'controlled_powers', '_phase_wrapper_type',
+ 'phase_abbr']
+ zobrist_tables = {}
+ rule_cache = ()
+ model = {
+ strings.CONTROLLED_POWERS: parsing.OptionalValueType(parsing.SequenceType(str)),
+ strings.DEADLINE: parsing.DefaultValueType(int, 300),
+ strings.ERROR: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.GAME_ID: parsing.OptionalValueType(str),
+ strings.MAP_NAME: parsing.DefaultValueType(str, 'standard'),
+ strings.MESSAGE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, MESSAGES_TYPE), {}),
+ strings.MESSAGES: parsing.DefaultValueType(MESSAGES_TYPE, []),
+ strings.META_RULES: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.N_CONTROLS: parsing.OptionalValueType(int),
+ strings.NO_RULES: parsing.DefaultValueType(parsing.SequenceType(str, set), []),
+ strings.NOTE: parsing.DefaultValueType(str, ''),
+ strings.OBSERVER_LEVEL: parsing.OptionalValueType(
+ parsing.EnumerationType((strings.MASTER_TYPE, strings.OMNISCIENT_TYPE, strings.OBSERVER_TYPE))),
+ strings.ORDER_HISTORY: parsing.DefaultValueType(
+ parsing.DictType(str, parsing.DictType(str, parsing.SequenceType(str))), {}),
+ strings.OUTCOME: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.PHASE: parsing.DefaultValueType(str, ''),
+ strings.PHASE_ABBR: parsing.DefaultValueType(str, ''),
+ strings.POWERS: parsing.DefaultValueType(parsing.DictType(str, parsing.JsonableClassType(Power)), {}),
+ strings.REGISTRATION_PASSWORD: parsing.OptionalValueType(str),
+ strings.RESULT_HISTORY: parsing.DefaultValueType(parsing.DictType(str, parsing.DictType(
+ str, parsing.SequenceType(parsing.EnumerationType(
+ ['no convoy', 'bounce', 'void', 'cut', 'dislodged', 'disrupted', 'disband', ''])))), {}),
+ strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE),
+ strings.RULES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=list), ()),
+ strings.STATE_HISTORY: parsing.DefaultValueType(parsing.DictType(str, dict), {}),
+ strings.STATUS: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_GAME_STATUSES), strings.FORMING),
+ strings.TIMESTAMP_CREATED: parsing.OptionalValueType(int),
+ strings.VICTORY: parsing.DefaultValueType(parsing.SequenceType(int), []),
+ strings.WIN: parsing.DefaultValueType(int, 0),
+ strings.ZOBRIST_HASH: parsing.DefaultValueType(parsing.StringableType(np.int64), '0'),
+ }
+
+ def __init__(self, game_id=None, **kwargs):
+ """ Constructor """
+ self.victory = None
+ self.no_rules = set()
+ self.meta_rules = []
+ self.phase, self.note = '', ''
+ self.map = None # type: Map
+ self.powers = {}
+ self.outcome, self.error, self.popped = [], [], []
+ self.orders, self.ordered_units = {}, {}
+ self.phase_type = None
+ self.win = None
+ self.combat, self.command, self.result = {}, {}, {}
+ self.supports, self.dislodged, self.lost = {}, {}, {}
+ self.convoy_paths, self.convoy_paths_possible, self.convoy_paths_dest = {}, None, None
+ self.zobrist_hash = 0
+ self.renderer = None
+ self.game_id = None # type: str
+ self.map_name = None # type: str
+ self.messages = None # type: SortedDict
+ self.role = None # type: str
+ self.rules = []
+ self.state_history, self.order_history, self.result_history, self.message_history = {}, {}, {}, {}
+ self.status = None # type: str
+ self.timestamp_created = None # type: int
+ self.n_controls = None
+ self.deadline = 0
+ self.registration_password = None
+ self.observer_level = None
+ self.controlled_powers = None
+
+ # Remove rules from kwargs (if present), as we want to add them manually using self.add_rule().
+ rules = kwargs.pop(strings.RULES, None)
+
+ # Update rules with game ID.
+ kwargs[strings.GAME_ID] = game_id
+
+ # Initialize game with kwargs.
+ super(Game, self).__init__(**kwargs)
+
+ # Check settings.
+ if self.registration_password is not None and self.registration_password == '':
+ raise exceptions.DiplomacyException('Registration password must be None or non-empty string.')
+ if self.n_controls is not None and self.n_controls < 0:
+ raise exceptions.NaturalIntegerException('n_controls must be a natural integer.')
+ if self.deadline < 0:
+ raise exceptions.NaturalIntegerException('Deadline must be a natural integer.')
+ # Check rules.
+ if rules is None:
+ rules = ['SOLITAIRE', 'NO_PRESS', 'IGNORE_ERRORS', 'POWER_CHOICE']
+ # Set game rules.
+ for rule in rules:
+ self.add_rule(rule)
+ # Check settings about rule NO_DEADLINE.
+ if 'NO_DEADLINE' in self.rules:
+ self.deadline = 0
+ # Check settings about rule SOLITAIRE.
+ if 'SOLITAIRE' in self.rules:
+ self.n_controls = 0
+ elif self.n_controls == 0:
+ # If number of allowed players is 0, the game can only be solitaire.
+ self.add_rule('SOLITAIRE')
+
+ # Check timestamp_created.
+ if self.timestamp_created is None:
+ self.timestamp_created = common.timestamp_microseconds()
+
+ # Check game ID.
+ if self.game_id is None:
+ self.game_id = '%s/%s' % (self.timestamp_created, uuid.uuid4())
+
+ # Validating status
+ self._validate_status(reinit_powers=(self.timestamp_created is None))
+
+ if self.powers:
+ # Game loaded with powers.
+ # Associate loaded powers with this game.
+ for power in self.powers.values():
+ power.game = self
+ else:
+ # Begin game.
+ self._begin()
+
+ # Game loaded.
+
+ # Check map powers.
+ assert all(self.has_power(power_name) for power_name in self.map.powers)
+
+ # Check role and consistency between all power roles and game role.
+ if self.has_power(self.role):
+ # It's a power game. Each power must be a player power.
+ assert all(power.role == power.name for power in self.powers.values())
+ else:
+ # We should have a non-power game and each power must have same role as game role.
+ assert self.role in strings.ALL_ROLE_TYPES
+ assert all(power.role == self.role for power in self.powers.values())
+
+ # Wrap history fields into runtime sorted dictionaries.
+ # This is necessary to sort history fields by phase name.
+
+ self._phase_wrapper_type = common.str_cmp_class(self.map.compare_phases)
+
+ self.order_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.order_history.items()})
+ self.message_history = SortedDict(self._phase_wrapper_type, SortedDict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.message_history.items()})
+ self.state_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.state_history.items()})
+ self.result_history = SortedDict(self._phase_wrapper_type, dict,
+ {self._phase_wrapper_type(key): value
+ for key, value in self.result_history.items()})
+
+ def __str__(self):
+ """ Returns a string representation of the game instance """
+ show_map = self.map
+ show_result = self.outcome
+
+ text = ''
+ text += 'GAME %s%s%s' % (self.game_id, '\nPHASE ', self.phase)
+ text += '\nMAP %s' % self.map_name if show_map else ''
+ text += '\nRESULT %s' % ' '.join(self.outcome) if show_result else ''
+ text += '\nRULE '.join([''] + [rule for rule in self.rules if rule not in self.meta_rules])
+ text += '\nRULE !'.join([''] + [no_rule for no_rule in self.no_rules])
+ return text
+
+ def __deepcopy__(self, memo):
+ """ Fast deep copy implementation """
+ cls = self.__class__
+ result = cls.__new__(cls)
+
+ # Deep copying
+ for key in self._slots:
+ if key in ['map', 'renderer', 'powers']:
+ continue
+ setattr(result, key, deepcopy(getattr(self, key)))
+ setattr(result, 'map', self.map)
+ setattr(result, 'powers', {})
+ for power in self.powers.values():
+ result.powers[power.name] = deepcopy(power)
+ setattr(result.powers[power.name], 'game', result)
+ return result
+
+ # ====================================================================
+ # Public Interface
+ # ====================================================================
+
+ @property
+ def _slots(self):
+ """ Return an iterable of all attributes of this object.
+ Should be used in place of "self.__slots__" to be sure to retrieve all
+ attribute names from a derived class (including parent slots).
+ """
+ return (name for cls in type(self).__mro__ for name in getattr(cls, '__slots__', ()))
+
+ @property
+ def power(self):
+ """ (only for player games) Return client power associated to this game.
+ :return: a Power object.
+ :rtype: Power
+ """
+ return self.powers[self.role] if self.is_player_game() else None
+
+ @property
+ def is_game_done(self):
+ """ Returns a boolean flag that indicates if the game is done """
+ return self.phase == 'COMPLETED'
+
+ is_game_forming = property(lambda self: self.status == strings.FORMING)
+ is_game_active = property(lambda self: self.status == strings.ACTIVE)
+ is_game_paused = property(lambda self: self.status == strings.PAUSED)
+ is_game_canceled = property(lambda self: self.status == strings.CANCELED)
+ is_game_completed = property(lambda self: self.status == strings.COMPLETED)
+ current_short_phase = property(lambda self: self.map.phase_abbr(self.phase, self.phase))
+
+ civil_disorder = property(lambda self: 'CIVIL_DISORDER' in self.rules)
+ multiple_powers_per_player = property(lambda self: 'MULTIPLE_POWERS_PER_PLAYER' in self.rules)
+ no_observations = property(lambda self: 'NO_OBSERVATIONS' in self.rules)
+ no_press = property(lambda self: 'NO_PRESS' in self.rules)
+ power_choice = property(lambda self: 'POWER_CHOICE' in self.rules)
+ public_press = property(lambda self: 'PUBLIC_PRESS' in self.rules)
+ real_time = property(lambda self: 'REAL_TIME' in self.rules)
+ start_master = property(lambda self: 'START_MASTER' in self.rules)
+ solitaire = property(lambda self: 'SOLITAIRE' in self.rules)
+
+ # ==============================================================
+ # Application/network methods (mainly used for connected games).
+ # ==============================================================
+
+ def is_player_game(self):
+ """ Return True if this game is a player game. """
+ return self.has_power(self.role)
+
+ def is_observer_game(self):
+ """ Return True if this game is an observer game. """
+ return self.role == strings.OBSERVER_TYPE
+
+ def is_omniscient_game(self):
+ """ Return True if this game is an omniscient game. """
+ return self.role == strings.OMNISCIENT_TYPE
+
+ def is_server_game(self):
+ """ Return True if this game is a server game. """
+ return self.role == strings.SERVER_TYPE
+
+ def is_valid_password(self, registration_password):
+ """ Return True if given plain password matches registration password. """
+ if self.registration_password is None:
+ return registration_password is None
+ if registration_password is None:
+ return False
+ return common.is_valid_password(registration_password, self.registration_password)
+
+ def is_controlled(self, power_name):
+ """ Return True if given power name is currently controlled. """
+ return self.get_power(power_name).is_controlled()
+
+ def is_dummy(self, power_name):
+ """ Return True if given power name is not currently controlled. """
+ return not self.is_controlled(power_name)
+
+ def does_not_wait(self):
+ """ Return True if the game does not wait anything to process its current phase.
+ The game is not waiting is all **controlled** powers have defined orders and wait flag set to False.
+ If it's a solitaire game (with no controlled powers), all (dummy, not eliminated) powers must have defined
+ orders and wait flag set to False. By default, wait flag for a dummy power is True.
+ Note that an empty orders set is considered as a defined order as long as it was
+ explicitly set by the power controller.
+ """
+ if any(power.is_controlled() for power in self.powers.values()):
+ return all(power.does_not_wait() for power in self.powers.values() if power.is_controlled())
+ return all(power.is_eliminated() or power.does_not_wait() for power in self.powers.values())
+
+ def has_power(self, power_name):
+ """ Return True if this game has given power name. """
+ return power_name in self.map.powers
+
+ def has_expected_controls_count(self):
+ """ Return True if game has expected number of map powers to be controlled.
+ If True, the game can start (if not yet started).
+ """
+ return self.count_controlled_powers() == self.get_expected_controls_count()
+
+ def count_controlled_powers(self):
+ """ Return the number of controlled map powers. """
+ return sum(1 for power_name in self.get_map_power_names() if self.is_controlled(power_name))
+
+ def get_controlled_power_names(self, username):
+ """ Return the list of power names currently controlled by given user name. """
+ return [power.name for power in self.powers.values() if power.is_controlled_by(username)]
+
+ def get_expected_controls_count(self):
+ """ Return the number of map powers expected to be controlled in this game.
+ This number is either specified in settings or the number of map powers.
+ """
+ expected_count = self.n_controls
+ if expected_count is None:
+ expected_count = len(self.powers)
+ return expected_count
+
+ def get_map_power_names(self):
+ """ Return sequence of map power names. """
+ return self.powers.keys()
+
+ def get_dummy_power_names(self):
+ """ Return sequence of dummy power objects. """
+ return set(power_name for power_name in self.get_map_power_names() if self.is_dummy(power_name))
+
+ def get_controllers(self):
+ """ Return a dictionary mapping each power name to its current controller name."""
+ return {power.name: power.get_controller() for power in self.powers.values()}
+
+ def get_controllers_timestamps(self):
+ """ Return a dictionary mapping each power name to its controller timestamp. """
+ return {power.name: power.get_controller_timestamp() for power in self.powers.values()}
+
+ def get_random_power_name(self):
+ """ Return a random power name from remaining dummy power names.
+ Raise an exception if there are no dummy power names.
+ """
+ playable_power_names = list(self.get_dummy_power_names())
+ if not playable_power_names:
+ raise exceptions.RandomPowerException(1, len(playable_power_names))
+ playable_power_names.sort()
+ return playable_power_names[random.randint(0, len(playable_power_names) - 1)]
+
+ def get_latest_timestamp(self):
+ """ Return timestamp of latest data saved into this game (either current state, archived state or message).
+ :return: a timestamp
+ :rtype: int
+ """
+ timestamp = self.timestamp_created
+ if self.state_history:
+ timestamp = max(self.state_history.last_value()['timestamp'], timestamp)
+ if self.messages:
+ timestamp = max(self.messages.last_key(), timestamp)
+ return timestamp
+
+ @classmethod
+ def filter_messages(cls, messages, game_role, timestamp_from=None, timestamp_to=None):
+ """ Filter given messages based on given game role between given timestamps (bounds included).
+ See method diplomacy.utils.SortedDict.sub() about bound rules.
+ :param messages: a sorted dictionary of messages to filter.
+ :param game_role: game role requiring messages. Either a special power name
+ (PowerName.OBSERVER or PowerName.OMNISCIENT), a power name, or a list of power names.
+ :param timestamp_from: lower timestamp (included) for required messages.
+ :param timestamp_to: upper timestamp (included) for required messages.
+ :return: a dict of corresponding messages (empty if no corresponding messages found),
+ mapping messages timestamps to messages.
+ :type messages: diplomacy.utils.sorted_dict.SortedDict
+ """
+
+ # Observer can see global messages and system messages sent to observers.
+ if isinstance(game_role, str) and game_role == strings.OBSERVER_TYPE:
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)
+ if message.is_global() or message.for_observer()}
+
+ # Omniscient observer can see all messages.
+ if isinstance(game_role, str) and game_role == strings.OMNISCIENT_TYPE:
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)}
+
+ # Power can see global messages and all messages she sent or received.
+ if isinstance(game_role, str):
+ game_role = [game_role]
+ elif not isinstance(game_role, list):
+ game_role = list(game_role)
+ return {message.time_sent: message
+ for message in messages.sub(timestamp_from, timestamp_to)
+ if message.is_global() or message.recipient in game_role or message.sender in game_role}
+
+ def get_phase_history(self, from_phase=None, to_phase=None, game_role=None):
+ """ Return a list of game phase data from game history between given phases (bounds included).
+ Each GamePhaseData object contains game state, messages, orders and order results for a phase.
+ :param from_phase: either:
+ - a string: phase name
+ - an integer: index of phase in game history
+ - None (default): lowest phase stored in game history
+ :param to_phase: either:
+ - a string: phase name
+ - an integer: index of phase in game history
+ - None (default): latest phase stored in game history
+ :param game_role (optional): role of game for which phase history is retrieved.
+ If none, messages in game history will not be filtered.
+ """
+ if isinstance(from_phase, int):
+ from_phase = self.state_history.key_from_index(from_phase)
+ elif isinstance(from_phase, str):
+ from_phase = self._phase_wrapper_type(from_phase)
+ if isinstance(to_phase, int):
+ to_phase = self.state_history.key_from_index(to_phase)
+ elif isinstance(to_phase, str):
+ to_phase = self._phase_wrapper_type(to_phase)
+ phases = self.state_history.sub_keys(from_phase, to_phase)
+ states = self.state_history.sub(from_phase, to_phase)
+ orders = self.order_history.sub(from_phase, to_phase)
+ messages = self.message_history.sub(from_phase, to_phase)
+ results = self.result_history.sub(from_phase, to_phase)
+ if game_role:
+ messages = [self.filter_messages(msg_dict, game_role) for msg_dict in messages]
+ assert len(phases) == len(states) == len(orders) == len(messages) == len(results), (
+ len(phases), len(states), len(orders), len(messages), len(results))
+ return [GamePhaseData(name=str(phases[i]),
+ state=states[i],
+ orders=orders[i],
+ messages=messages[i],
+ results=results[i])
+ for i in range(len(phases))]
+
+ def get_phase_from_history(self, short_phase_name, game_role=None):
+ """ Return a game phase data corresponding to given phase from phase history. """
+ return self.get_phase_history(short_phase_name, short_phase_name, game_role)[0]
+
+ def phase_history_from_timestamp(self, timestamp):
+ """ Return list of game phase data from game history for which state timestamp >= given timestamp. """
+ earliest_phase = ''
+ for state in self.state_history.reversed_values():
+ if state['timestamp'] < timestamp:
+ break
+ earliest_phase = state['name']
+ return self.get_phase_history(from_phase=earliest_phase) if earliest_phase else []
+
+ def extend_phase_history(self, game_phase_data):
+ """ Add data from a game phase to game history.
+ :param game_phase_data: a GamePhaseData object.
+ :type game_phase_data: GamePhaseData
+ """
+ phase = self._phase_wrapper_type(game_phase_data.name)
+ assert phase not in self.state_history
+ assert phase not in self.message_history
+ assert phase not in self.order_history
+ assert phase not in self.result_history
+ self.state_history.put(phase, game_phase_data.state)
+ self.message_history.put(phase, game_phase_data.messages)
+ self.order_history.put(phase, game_phase_data.orders)
+ self.result_history.put(phase, game_phase_data.results)
+
+ def set_status(self, status):
+ """ Set game status with given status (should be in diplomacy.utils.strings.ALL_GAME_STATUSES). """
+ assert status in strings.ALL_GAME_STATUSES
+ self.status = status
+
+ def draw(self, winners=None):
+ """ Force a draw for this game, set status as COMPLETED and finish the game.
+ :param winners: (optional) either None (all powers remaining to map are considered winners) or a sequence
+ of required power names to be considered as winners.
+ :return: a couple (previous state, current state) with game state before the draw and game state after
+ the draw.
+ """
+ if winners is None:
+ # Draw with all powers which still have units in map.
+ winners = [power.name for power in self.powers.values() if power.units]
+
+ # No orders will be processed when drawing, so clear current orders.
+ self.clear_orders()
+
+ # Collect data about current phase before drawing.
+ previous_phase = self._phase_wrapper_type(self.current_short_phase)
+ previous_orders = self.get_orders()
+ previous_messages = self.messages.copy()
+ previous_state = self.get_state()
+
+ # Finish the game.
+ self._finish(winners)
+
+ # Then clear game and save previous phase.
+ self.clear_vote()
+ self.clear_orders()
+ self.messages.clear()
+ self.order_history.put(previous_phase, previous_orders)
+ self.message_history.put(previous_phase, previous_messages)
+ self.state_history.put(previous_phase, previous_state)
+ self.result_history.put(previous_phase, {})
+
+ # There are no expected results for orders, as there are no orders processed.
+
+ previous_phase_data = GamePhaseData(name=str(previous_phase),
+ state=previous_state,
+ orders=previous_orders,
+ messages=previous_messages,
+ results={})
+ current_phase_data = GamePhaseData(name=self.current_short_phase,
+ state=self.get_state(),
+ orders={},
+ messages={},
+ results={})
+
+ return previous_phase_data, current_phase_data
+
+ def set_controlled(self, power_name, username):
+ """ Control power with given username (may be None to set dummy power).
+ See method diplomacy.Power#set_controlled.
+ """
+ self.get_power(power_name).set_controlled(username)
+
+ def update_dummy_powers(self, dummy_power_names):
+ """ Force all power associated to given dummy power names to be uncontrolled.
+ :param dummy_power_names: Sequence of required dummy power names.
+ """
+ for dummy_power_name in dummy_power_names:
+ if self.has_power(dummy_power_name):
+ self.set_controlled(dummy_power_name, None)
+
+ def update_powers_controllers(self, powers_controllers, timestamps):
+ """ Update powers controllers.
+ :param powers_controllers: a dictionary mapping a power name to a controller name.
+ :param timestamps: a dictionary mapping a power name to timestamp when related controller
+ (in powers_controllers) was associated to power.
+ :type powers_controllers: dict
+ """
+ for power_name, controller in powers_controllers.items():
+ self.get_power(power_name).update_controller(controller, timestamps[power_name])
+
+ def new_power_message(self, recipient, body):
+ """ Create a undated (without timestamp) power message to be sent from a power to another via server.
+ Server will answer with timestamp, and message will be updated
+ and added to local game messages.
+ :param recipient: recipient power name (string).
+ :param body: message body (string).
+ :return: a new GameMessage object.
+ :rtype: GameMessage
+ """
+ assert self.is_player_game()
+ if not self.has_power(recipient):
+ raise exceptions.MapPowerException(recipient)
+ return Message(phase=self.current_short_phase, sender=self.role, recipient=recipient, message=body)
+
+ def new_global_message(self, body):
+ """ Create an undated (without timestamp) global message to be sent from a power via server.
+ Server will answer with timestamp, and message will be updated and added to local game messages.
+ :param body: message body (string).
+ :return: a new GameMessage object.
+ :rtype: Message
+ """
+ assert self.is_player_game()
+ return Message(phase=self.current_short_phase, sender=self.role, recipient=GLOBAL, message=body)
+
+ def add_message(self, message):
+ """ Add message to current game data.
+ Only a server game can add a message with no timestamp:
+ game will auto-generate a timestamp for the message.
+ :param message: a GameMessage object to add.
+ :return: message timestamp.
+ :rtype: int
+ """
+ assert isinstance(message, Message)
+ if self.is_player_game():
+ assert message.is_global() or self.power.name in (message.sender, message.recipient)
+
+ if message.time_sent is None:
+ # This instance must be a server game.
+ # Message should be a new message matching current game phase.
+ # There should not be any more recent message in message history (as we are adding a new message).
+ # We must generate a timestamp for this message.
+ assert self.is_server_game()
+ if message.phase != self.current_short_phase:
+ raise exceptions.GamePhaseException(self.current_short_phase, message.phase)
+ assert not self.messages or common.timestamp_microseconds() >= self.messages.last_key()
+ time.sleep(1e-6)
+ message.time_sent = common.timestamp_microseconds()
+
+ self.messages.put(message.time_sent, message)
+ return message.time_sent
+
+ # Vote methods. For server and omniscient games only.
+ # Observer game should not see votes.
+ # Power game should know only vote of related power (votes for all other power should be 'neutral' in a power game).
+
+ def has_draw_vote(self):
+ """ Return True if all controlled non-eliminated powers have voted YES to draw game at current phase. """
+ assert self.is_server_game() or self.is_omniscient_game()
+ return all(
+ power.vote == strings.YES
+ for power in self.powers.values()
+ if not power.is_eliminated()
+ )
+
+ def count_voted(self):
+ """ Return the count of controlled powers who already voted for a draw for current phase. """
+ assert self.is_server_game() or self.is_omniscient_game()
+ return sum(1 for power in self.powers.values()
+ if not power.is_eliminated() and power.vote != strings.NEUTRAL)
+
+ def clear_vote(self):
+ """ Clear current vote. """
+ for power in self.powers.values():
+ power.vote = strings.NEUTRAL
+
+ # ==============
+ # Basic methods.
+ # ==============
+ def get_units(self, power_name=None):
+ """ Retrieves the list of units for a power or for all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of units (e.g. ['A PAR', 'A MAR']) if a power name is provided
+ or a dictionary of powers with their units if None is provided (e.g. {'FRANCE': [...], ...}
+
+ Note: Dislodged units will appear with a leading asterisk (e.g. '*A PAR')
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+ if power_name is not None:
+ return power.units[:] + ['*{}'.format(unit) for unit in power.retreats]
+ if power_name is None:
+ units = {}
+ for power in self.powers.values():
+ units[power.name] = self.get_units(power.name)
+ return units
+ return []
+
+ def get_centers(self, power_name=None):
+ """ Retrieves the list of owned supply centers for a power or for all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of supply centers (e.g. ['PAR', 'MAR']) if a power name is provided
+ or a dictionary of powers with their supply centers if None is provided
+ (e.g. {'FRANCE': [...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+ if power_name is not None:
+ return power.centers[:]
+ if power_name is None:
+ centers = {}
+ for power in self.powers.values():
+ centers[power.name] = self.get_centers(power.name)
+ return centers
+ return []
+
+ def get_orders(self, power_name=None):
+ """ Retrieves the orders submitted by a specific power, or by all powers
+ :param power_name: Optional. The name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of orders (e.g. ['A PAR H', 'A MAR - BUR']) if a power name is provided
+ or a dictionary of powers with their orders if None is provided
+ (e.g. {'FRANCE': ['A PAR H', 'A MAR - BUR', ...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+
+ # Getting orders for a particular power
+ # Skipping VOID and WAIVE orders in Adjustment/Retreats phase
+ if power_name is not None:
+ if self.get_current_phase()[-1] == 'M':
+ if 'NO_CHECK' in self.rules:
+ power_orders = [power.orders[order] for order in power.orders if power.orders[order]]
+ else:
+ power_orders = ['{} {}'.format(unit, unit_order) for unit, unit_order in power.orders.items()]
+ else:
+ power_orders = [order for order in power.adjust
+ if order and order != 'WAIVE' and order.split()[0] != 'VOID']
+ return power_orders
+
+ # Recursively calling itself to get all powers
+ elif power_name is None:
+ orders = {}
+ for power in self.powers.values():
+ orders[power.name] = self.get_orders(power.name)
+ return orders
+ return []
+
+ def get_orderable_locations(self, power_name=None):
+ """ Find the location requiring an order for a power (or for all powers)
+ :param power_name: Optionally, the name of the power (e.g. 'FRANCE') or None for all powers
+ :return: A list of orderable locations (e.g. ['PAR', 'MAR']) if a power name is provided
+ or a dictionary of powers with their orderable locations if None is not provided
+ (e.g. {'FRANCE': [...], ...}
+ """
+ if power_name is not None:
+ power_name = power_name.upper()
+ power = self.get_power(power_name)
+
+ # Single power
+ if power_name is not None:
+ current_phase_type = self.get_current_phase()[-1]
+
+ # Adjustment
+ if current_phase_type == 'A':
+ build_count = len(power.centers) - len(power.units)
+
+ # Building - All unoccupied homes
+ if build_count > 0:
+ orderable_locs = self._build_sites(power)
+
+ # Nothing can be built.
+ elif build_count == 0:
+ orderable_locs = []
+
+ # Disbanding - All units location
+ else:
+ orderable_locs = [unit[2:5] for unit in power.units]
+
+ # Retreating
+ elif current_phase_type == 'R':
+ orderable_locs = [unit[2:5] for unit in power.retreats]
+
+ # Movement
+ else:
+ orderable_locs = [unit[2:5] for unit in power.units]
+
+ # Returning and sorting for deterministic output
+ return sorted(orderable_locs)
+
+ # All powers
+ else:
+ return {power.name: self.get_orderable_locations(power.name) for power in self.powers.values()}
+
+ def get_order_status(self, power_name=None, unit=None):
+ """ Returns a list or a dict representing the order status ('', 'no convoy', 'bounce', 'void', 'cut',
+ 'dislodged', 'disrupted') for orders submitted in the last phase
+ :param power_name: Optional. If provided (e.g. 'FRANCE') will only return the order status of that
+ power's orders
+ :param unit: Optional. If provided (e.g. 'A PAR') will only return that specific unit order status.
+ :param phase_type: Optional. Returns the results of a specific phase type (e.g. 'M', 'R', or 'A')
+ :return: If unit is provided a list (e.g. [] or ['void', 'dislodged'])
+ If power is provided a dict (e.g. {'A PAR': ['void'], 'A MAR': []})
+ Otherwise a 2-level dict (e.g. {'FRANCE: {'A PAR': ['void'], 'A MAR': []}, 'ENGLAND': {}, ... }
+ """
+ # Specific location, returning string
+ if unit is not None:
+ result_dict = self.result_history.last_value() if self.result_history else {}
+ return result_dict[unit][:] if unit in result_dict else []
+
+ # Specific power, returning dictionary
+ if power_name is not None:
+ power_name = power_name.upper()
+ if power_name is not None:
+ order_status = {}
+ if self.state_history:
+ state_history = self.state_history.last_value()
+ for ordered_unit in state_history['units'][power_name]:
+ ordered_unit = ordered_unit.replace('*', '')
+ order_status[ordered_unit] = self.get_order_status(power_name, ordered_unit)
+ return order_status
+
+ # All powers
+ if power_name is None:
+ order_status = {}
+ for power in self.powers.values():
+ order_status[power.name] = self.get_order_status(power.name)
+ return order_status
+ return {}
+
+ def get_power(self, power_name):
+ """ Retrieves a power instance from given power name.
+ :param power_name: name of power instance to retrieve. Power name must be as given
+ in map file.
+ :return: the power instance, or None if power name is not found.
+ :rtype: Power
+ """
+ return self.powers.get(power_name, None)
+
+ def set_units(self, power_name, units, reset=False):
+ """ Sets units directly on the map
+ :param power_name: The name of the power who will own the units (e.g. 'FRANCE')
+ :param units: An unit (e.g. 'A PAR') or a list of units (e.g. ['A PAR', 'A MAR']) to set
+ Note units starting with a '*' will be set as dislodged
+ :param reset: Boolean. If, clear all units of the power before setting them
+ :return: Nothing
+ """
+ power_name = power_name.upper()
+ if not isinstance(units, list):
+ units = [units]
+ if power_name not in self.powers:
+ return
+
+ # Clearing old units if reset is true
+ if reset and power_name in self.powers:
+ self.powers[power_name].clear_units()
+
+ regular_units = [unit for unit in units if unit[0] != '*']
+ dislodged_units = [unit[1:] for unit in units if unit[0] == '*']
+ influence = [unit[2:5] for unit in regular_units + dislodged_units]
+
+ # Removing units that are already there
+ for power in self.powers.values():
+ for unit in regular_units:
+ unit_loc = unit[2:5]
+ for unit_to_remove in [p_unit for p_unit in power.units if p_unit[2:5] == unit_loc]:
+ self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:])
+ power.units.remove(unit_to_remove)
+ for unit in dislodged_units:
+ unit_loc = unit[2:5]
+ for unit_to_remove in [p_unit for p_unit in power.retreats if p_unit[2:5] == unit_loc]:
+ self.update_hash(power.name, unit_type=unit_to_remove[0], loc=unit_to_remove[2:], is_dislodged=True)
+ del power.retreats[unit_to_remove]
+ for loc in influence:
+ if loc in power.influence:
+ power.influence.remove(loc)
+
+ # Retrieving the target power
+ power = self.get_power(power_name)
+
+ # Re-adding normal units to the new power
+ for unit in regular_units:
+ word = unit.upper().split()
+ if len(word) != 2:
+ continue
+ unit_type, unit_loc = word
+ if unit_type in ('A', 'F') \
+ and unit_loc in [loc.upper() for loc in self.map.locs] \
+ and self.map.is_valid_unit(unit):
+ if power:
+ self.update_hash(power_name, unit_type=unit_type, loc=unit_loc)
+ power.units.append(unit)
+ power.influence.append(unit[2:5])
+ else:
+ self.error += [err.MAP_INVALID_UNIT % unit]
+
+ # Re-adding dislodged units to the new power
+ for unit in dislodged_units:
+ word = unit.upper().split()
+ if len(word) != 2:
+ continue
+ unit_type, unit_loc = word
+ if unit_type in ('A', 'F') and unit_loc in [loc.upper() for loc in self.map.locs]:
+ abuts = [abut.upper() for abut in self.map.abut_list(unit_loc, incl_no_coast=True)
+ if self._abuts(unit_type, unit_loc, '-', abut.upper())]
+ if power:
+ self.update_hash(power_name, unit_type=unit_type, loc=unit_loc, is_dislodged=True)
+ power.retreats[unit] = abuts
+
+ # Clearing cache
+ self.clear_cache()
+
+ def set_centers(self, power_name, centers, reset=False):
+ """ Transfers supply centers ownership
+ :param power_name: The name of the power who will control the supply centers (e.g. 'FRANCE')
+ :param centers: A loc (e.g. 'PAR') or a list of locations (e.g. ['PAR', 'MAR']) to transfer
+ :param reset: Boolean. If, removes ownership of all power's SC before transferring ownership of the new SC
+ :return: Nothing
+ """
+ power_name = power_name.upper()
+ if not isinstance(centers, list):
+ centers = [centers]
+ if power_name not in self.powers:
+ return
+
+ # Clearing old centers if reset is true
+ if reset and power_name in self.powers:
+ self.powers[power_name].clear_centers()
+
+ # Removing centers that are already controlled by another power
+ for power in self.powers.values():
+ for center in centers:
+ if center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers.remove(center)
+
+ # Transferring center to power_name
+ power = self.get_power(power_name)
+ if power:
+ for center in centers:
+ if center in self.map.scs:
+ self.update_hash(power_name, loc=center, is_center=True)
+ power.centers += [center]
+
+ # Clearing cache
+ self.clear_cache()
+
+ def set_orders(self, power_name, orders, expand=True, replace=True):
+ """ Sets the current orders for a power
+ :param power_name: The name of the power (e.g. 'FRANCE')
+ :param orders: The list of orders (e.g. ['A MAR - PAR', 'A PAR - BER', ...])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ power_name = power_name.upper()
+
+ if not self.has_power(power_name):
+ return
+
+ if self.is_player_game() and self.role != power_name:
+ return
+
+ power = self.get_power(power_name)
+ if power:
+ if not isinstance(orders, list):
+ orders = [orders]
+
+ # Remove any empty string from orders.
+ orders = [order for order in orders if order]
+
+ # Setting orders depending on phase type
+ if self.phase_type == 'R':
+ self._update_retreat_orders(power, orders, expand=expand, replace=replace)
+ elif self.phase_type == 'A':
+ self._update_adjust_orders(power, orders, expand=expand, replace=replace)
+ else:
+ self._update_orders(power, orders, expand=expand, replace=replace)
+ power.order_is_set = (OrderSettings.ORDER_SET
+ if self.get_orders(power.name)
+ else OrderSettings.ORDER_SET_EMPTY)
+
+ def set_wait(self, power_name, wait):
+ """ Set wait flag for a power.
+ :param power_name: name of power to set wait flag.
+ :param wait: wait flag (boolean).
+ """
+ power_name = power_name.upper()
+
+ if not self.has_power(power_name):
+ return
+
+ power = self.get_power(power_name.upper()) # type: Power
+ power.wait = wait
+
+ def clear_units(self, power_name=None):
+ """ Clear the power's units
+ :param power_name: Optional. The name of the power whose units will be cleared (e.g. 'FRANCE'),
+ otherwise all units on the map will be cleared
+ :return: Nothing
+ """
+ for power in self.powers.values():
+ if power_name is None or power.name == power_name:
+ power.clear_units()
+ self.clear_cache()
+
+ def clear_centers(self, power_name=None):
+ """ Removes ownership of supply centers
+ :param power_name: Optional. The name of the power whose centers will be cleared (e.g. 'FRANCE'),
+ otherwise all centers on the map will lose ownership.
+ :return: Nothing
+ """
+ for power in self.powers.values():
+ if power_name is None or power.name == power_name:
+ power.clear_centers()
+ self.clear_cache()
+
+ def clear_orders(self, power_name=None):
+ """ Clears the power's orders
+ :param power_name: Optional. The name of the power to clear (e.g. 'FRANCE') or will clear orders for
+ all powers if None.
+ :return: Nothing
+ """
+ if power_name is not None:
+ power = self.get_power(power_name.upper())
+ power.clear_orders()
+ else:
+ for power in self.powers.values():
+ power.clear_orders()
+
+ def clear_cache(self):
+ """ Clears all caches """
+ self.convoy_paths_possible, self.convoy_paths_dest = None, None
+
+ def get_current_phase(self):
+ """ Returns the current phase (format 'S1901M' or 'FORMING' or 'COMPLETED' """
+ return self._phase_abbr()
+
+ def set_current_phase(self, new_phase):
+ """ Changes the phase to the specified new phase (e.g. 'S1901M') """
+ if new_phase in ('FORMING', 'COMPLETED'):
+ self.phase = new_phase
+ self.phase_type = None
+ else:
+ self.phase = self.map.phase_long(new_phase)
+ self.phase_type = self.phase.split()[-1][0]
+
+ def render(self, incl_orders=True, incl_abbrev=False, output_format='svg'):
+ """ Renders the current game and returns its image representation
+ :param incl_orders: Optional. Flag to indicate we also want to render orders.
+ :param incl_abbrev: Optional. Flag to indicate we also want to display the provinces abbreviations.
+ :param output_format: The desired output format.
+ :return: The rendered image in the specified format.
+ """
+ if not self.renderer:
+ self.renderer = Renderer(self)
+ return self.renderer.render(incl_orders=incl_orders, incl_abbrev=incl_abbrev, output_format=output_format)
+
+ def add_rule(self, rule):
+ """ Adds a rule to the current rule list
+ :param rule: Name of rule to add (e.g. 'NO_PRESS')
+ :return: Nothing
+ """
+ if not self.__class__.rule_cache:
+ self._load_rules()
+ valid_rules = {valid_rule for valid_rule in self.__class__.rule_cache[0]}
+
+ if rule not in valid_rules or rule in self.no_rules:
+ return
+
+ forbidden_rules = self.__class__.rule_cache[0].get(rule, {}).get('!', [])
+ rules_to_add = self.__class__.rule_cache[0].get(rule, {}).get('+', [])
+ rules_to_remove = self.__class__.rule_cache[0].get(rule, {}).get('-', [])
+
+ # Making sure we don't already have a forbidden rule
+ for forbidden in forbidden_rules:
+ if forbidden in self.rules:
+ self.error += [err.GAME_FORBIDDEN_RULE % (forbidden, rule)]
+ return
+ if forbidden not in self.no_rules:
+ self.no_rules.add(forbidden)
+
+ # Adding rules
+ for rule_to_add in rules_to_add:
+ if rule_to_add not in self.rules:
+ self.rules.append(rule_to_add)
+
+ # Removing rules
+ for rule_to_remove in rules_to_remove:
+ if rule_to_remove in self.rules:
+ self.rules.remove(rule_to_remove)
+
+ # Adding main rule
+ if rule not in self.rules:
+ self.rules.append(rule)
+
+ def remove_rule(self, rule):
+ """ Removes a rule from the current rule list
+ :param rule: Name of rule to remove (e.g. 'NO_PRESS')
+ :return: Nothing
+ """
+ if rule in self.rules:
+ self.rules.remove(rule)
+
+ def load_map(self, reinit_powers=True):
+ """ Load a map and process directives
+ :param reinit_powers: Boolean. If true, empty powers dict.
+ :return: Nothing, but stores the map in self.map
+ """
+ # Create a map, and check for errors
+ self.map = Map(self.map_name)
+ if self.map_name != self.map.name:
+ raise RuntimeError('Invalid Map loaded. Expected %s - Got %s' % (self.map_name, self.map.name))
+
+ # Adding map rules
+ for rule in self.map.rules:
+ self.add_rule(rule)
+
+ # Build Zobrist tables
+ self._build_hash_table()
+
+ self.error += self.map.error
+
+ # Sets the current phase to the long version
+ if self.phase and ' ' not in self.phase and self.phase not in ('FORMING', 'COMPLETED'):
+ self.phase = self.map.phase_long(self.phase)
+
+ # Have the Game process all lines in the map file that were in DIRECTIVES clauses (this includes any RULE lines)
+ # Do this for all directives given without a variant and for those specific for this Game's variant.
+ if self.phase == 'FORMING':
+ return
+
+ # Resetting powers
+ if reinit_powers:
+ self.powers = {}
+
+ def process(self):
+ """ Processes the current phase of the game.
+ :return: game phase data with data before processing.
+ """
+ previous_phase = self._phase_wrapper_type(self.current_short_phase)
+ previous_orders = self.get_orders()
+ previous_messages = self.messages.copy()
+ previous_state = self.get_state()
+
+ if self.error:
+ if 'IGNORE_ERRORS' not in self.rules:
+ print('The following errors were encountered and were cleared before processing.')
+ for error in self.error:
+ print('-- %s' % error)
+ print('-' * 32)
+ self.error = []
+ self._process()
+
+ # result_history should have been updated with orders results for processed (previous) phase.
+
+ self.clear_vote()
+ self.clear_orders()
+ self.messages.clear()
+ self.order_history.put(previous_phase, previous_orders)
+ self.message_history.put(previous_phase, previous_messages)
+ self.state_history.put(previous_phase, previous_state)
+ return GamePhaseData(name=str(previous_phase),
+ state=previous_state,
+ orders=previous_orders,
+ messages=previous_messages,
+ results=self.result_history[previous_phase])
+
+ def rebuild_hash(self):
+ """ Completely recalculate the Zobrist hash
+ :return: The updated hash value
+ """
+ self.zobrist_hash = 0
+ if self.map is None:
+ return 0
+
+ # Recalculating for each power
+ for power in self.powers.values():
+ for unit in power.units:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ for center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ for home in power.homes:
+ self.update_hash(power.name, loc=home, is_home=True)
+
+ # Clearing cache
+ self.clear_cache()
+
+ # Returning the new hash
+ return self.get_hash()
+
+ def get_hash(self):
+ """ Returns the zobrist hash for the current game """
+ # Needs to be a string, otherwise json.dumps overflows
+ return str(self.zobrist_hash)
+
+ def update_hash(self, power, unit_type='', loc='', is_dislodged=False, is_center=False, is_home=False):
+ """ Updates the zobrist hash for the current game
+ :param power: The name of the power owning the unit, supply center or home
+ :param unit_type: Contains the unit type of the unit being added or remove from the board ('A' or 'F')
+ :param loc: Contains the location of the unit, supply center, of home being added or remove
+ :param is_dislodged: Indicates that the unit being added/removed is dislodged
+ :param is_center: Indicates that the location being added/removed is a supply center
+ :param is_home: Indicates that the location being added/removed is a home
+ :return: Nothing
+ """
+ if self.map is None:
+ return
+ zobrist = self.__class__.zobrist_tables[self.map_name]
+ loc = loc[:3].upper() if is_center or is_home else loc.upper()
+ power = power.upper()
+
+ power_ix = zobrist['map_powers'].index(power)
+ loc_ix = zobrist['map_locs'].index(loc)
+ unit_type_ix = ['A', 'F'].index(unit_type) if unit_type in ['A', 'F'] else -1
+
+ # Dislodged
+ if is_dislodged:
+ self.zobrist_hash ^= zobrist['dis_unit_type'][unit_type_ix, loc_ix]
+ self.zobrist_hash ^= zobrist['dis_units'][power_ix, loc_ix]
+
+ # Supply Center
+ elif is_center:
+ self.zobrist_hash ^= zobrist['centers'][power_ix, loc_ix]
+
+ # Home
+ elif is_home:
+ self.zobrist_hash ^= zobrist['homes'][power_ix, loc_ix]
+
+ # Regular unit
+ else:
+ self.zobrist_hash ^= zobrist['unit_type'][unit_type_ix, loc_ix]
+ self.zobrist_hash ^= zobrist['units'][power_ix, loc_ix]
+
+ def get_phase_data(self):
+ """ Return a GamePhaseData object representing current game. """
+ # Associate each power name to power orders, or None if order ist not set for the power.
+ # This is done to make distinction between voluntary empty orders ([]) and unset orders (None).
+ current_orders = {power.name: (self.get_orders(power.name) if power.order_is_set else None)
+ for power in self.powers.values()}
+ # Game does not have results for current orders (until orders are processed and game phase is updated).
+ return GamePhaseData(name=self.current_short_phase,
+ state=self.get_state(),
+ orders=current_orders,
+ messages=self.messages.copy(),
+ results={})
+
+ def set_phase_data(self, phase_data, clear_history=True):
+ """ Set game from phase data.
+ :param phase_data: either a GamePhaseData or a list of GamePhaseData.
+ If phase_data is a GamePhaseData, it will be treated as a list of GamePhaseData with 1 element.
+ Last phase data in given list will be used to set current game internal state.
+ Previous phase data in given list will replace current game history.
+ :param clear_history: Indicate if we must clear game history fields before update.
+ """
+ if not phase_data:
+ return
+ if isinstance(phase_data, GamePhaseData):
+ phase_data = [phase_data]
+ elif not isinstance(phase_data, list):
+ phase_data = list(phase_data)
+
+ if clear_history:
+ self._clear_history()
+
+ for game_phase_data in phase_data[:-1]: # type: GamePhaseData
+ self.extend_phase_history(game_phase_data)
+
+ current_phase_data = phase_data[-1] # type: GamePhaseData
+ self.set_state(current_phase_data.state, clear_history=False)
+ for power_name, power_orders in current_phase_data.orders.items():
+ if power_orders is not None:
+ self.set_orders(power_name, power_orders)
+ self.messages = current_phase_data.messages.copy()
+ # We ignore 'results' for current phase data.
+
+ def get_state(self):
+ """ Gets the internal saved state of the game.
+ This state is intended to represent current game view
+ (powers states, orders results for previous phase, and few more info).
+ See field message_history to get messages from previous phases.
+ See field order_history to get orders from previous phases.
+ To get a complete state of all data in this game object, consider using method Game.to_dict().
+
+ :param make_copy: Boolean. If true, a deep copy of the game state is returned, otherwise the attributes are
+ returned directly.
+ :return: The internal saved state (dict) of the game
+ """
+ state = {}
+ state['timestamp'] = common.timestamp_microseconds()
+ state['zobrist_hash'] = self.get_hash()
+ state['note'] = self.note
+ state['name'] = self._phase_abbr()
+ state['units'] = {}
+ state['centers'] = {}
+ state['homes'] = {}
+ state['influence'] = {}
+ state['civil_disorder'] = {}
+ state['builds'] = {}
+
+ # Setting powers data: units, centers, homes, influence and civil disorder.
+ for power in self.powers.values():
+ state['units'][power.name] = list(power.units) + ['*{}'.format(d) for d in power.retreats]
+ state['centers'][power.name] = list(power.centers)
+ state['homes'][power.name] = list(power.homes)
+ state['influence'][power.name] = list(power.influence)
+ state['civil_disorder'][power.name] = power.civil_disorder
+ # Setting build
+ state['builds'][power.name] = {}
+ if self.phase_type != 'A':
+ state['builds'][power.name]['count'] = 0
+ else:
+ state['builds'][power.name]['count'] = len(power.centers) - len(power.units)
+ state['builds'][power.name]['homes'] = []
+ if state['builds'][power.name].get('count', 0) > 0:
+ build_sites = self._build_sites(power)
+ state['builds'][power.name]['count'] = min(len(build_sites), state['builds'][power.name]['count'])
+ state['builds'][power.name]['homes'] = build_sites
+
+ # Returning state
+ return state
+
+ def set_state(self, state, clear_history=True):
+ """ Sets the game from a saved internal state
+ :param state: The saved state (dict)
+ :param clear_history: Boolean. If true, all game histories are cleared.
+ :return: Nothing
+ """
+ if clear_history:
+ self._clear_history()
+
+ if 'map' in state and self.map.name != state['map']:
+ raise RuntimeError('Inconsistent state map (state: %s, game: %s)' % (state['map'], self.map.name))
+ if 'rules' in state:
+ self.rules = []
+ for rule in state['rules']:
+ self.add_rule(rule)
+
+ if 'note' in state:
+ self.note = state['note']
+ if 'name' in state and state['name']:
+ self.set_current_phase(state['name'])
+ if 'units' in state:
+ for power_name, units in state['units'].items():
+ self.set_units(power_name, units, reset=True)
+ if 'centers' in state:
+ for power_name, centers in state['centers'].items():
+ self.set_centers(power_name, centers, reset=True)
+ for power in self.powers.values():
+ if 'homes' in state and power.name in state['homes']:
+ power.homes = list(state['homes'][power.name])
+ else:
+ power.homes = list(self.map.homes[power.name])
+ if 'influence' in state:
+ for power_name, influence in state['influence'].items():
+ power = self.get_power(power_name)
+ power.influence = deepcopy(influence)
+ if 'civil_disorder' in state:
+ for power_name, civil_disorder in state['civil_disorder'].items():
+ power = self.get_power(power_name)
+ power.civil_disorder = civil_disorder
+
+ # Rebuilding hash and returning
+ self.rebuild_hash()
+ self._build_list_possible_convoys()
+
+ def get_all_possible_orders(self, loc=None):
+ """ Computes a list of all possible orders for a unit in a given location
+ :param loc: Optional. The location where to get a list of orders (must include coasts)
+ If not provided, returns a list of all possible orders for all locations
+ :return: A list of orders for the unit, if there is a unit at location, or a list of possible
+ orders for all locations if no locations are provided.
+ """
+ # pylint: disable=too-many-branches
+ # No locations, building a dict with all locations
+ if not loc:
+ all_possible_orders = {}
+ for map_loc in self.map.locs:
+ map_loc = map_loc.upper()
+ all_possible_orders[map_loc] = self.get_all_possible_orders(map_loc)
+ return all_possible_orders
+
+ def remove_duplicates(list_with_dup):
+ """ Shorthand functions to remove duplicates """
+ seen = set()
+ return [item for item in list_with_dup if not (item in seen or seen.add(item))]
+
+ # Otherwise finding the possible orders at that specific location
+ possible_orders = []
+ is_dislodged = False
+ unit = None
+ unit_power = None
+
+ # If there are coasts possible, recursively adding the coasts first, then adding the loc orders
+ if '/' not in loc:
+ for loc_with_coast in [coast for coast in self.map.find_coasts(loc) if '/' in coast]:
+ possible_orders += self.get_all_possible_orders(loc_with_coast)
+
+ # Determining if there is a unit at loc
+ # Dislodged unit have precedence over regular unit in Retreat phase
+ for power in self.powers.values():
+ dislodged = [u for u in power.retreats if u[2:] == loc.upper()]
+ regular = [u for u in power.units if u[2:] == loc.upper()]
+ if dislodged:
+ is_dislodged = True
+ unit = dislodged[0]
+ unit_power = power
+ break
+ elif regular and not is_dislodged:
+ unit = regular[0]
+ unit_power = power
+ if self.phase_type != 'R':
+ break
+
+ # No unit found, checking if location is a home
+ if unit is None:
+ if self.phase_type != 'A':
+ return remove_duplicates(possible_orders)
+ for power in self.powers.values():
+ if loc[:3] in power.homes and 'BUILD_ANY' not in self.rules:
+ unit_power = power
+ break
+ if loc[:3] in power.centers and 'BUILD_ANY' in self.rules:
+ unit_power = power
+ break
+
+ # Not a home, and no units
+ if not unit_power:
+ return remove_duplicates(possible_orders)
+
+ # Determining if we can build or need to remove units
+ build_count = 0 if self.phase_type != 'A' else len(unit_power.centers) - len(unit_power.units)
+
+ # Determining unit type and unit location
+ unit_type = unit[0] if unit else ''
+ unit_loc = unit[2:] if unit else ''
+
+ # Movement phase
+ if self.phase_type == 'M':
+ # Computing coasts for dest
+ dest_1_hops = [l.upper() for l in self.map.abut_list(unit_loc, incl_no_coast=True)]
+ dest_with_coasts = [self.map.find_coasts(dest) for dest in dest_1_hops]
+ dest_with_coasts = {val for sublist in dest_with_coasts for val in sublist}
+
+ # Hold
+ possible_orders += ['{} H'.format(unit)]
+
+ # Move (Regular) and Support (Hold)
+ for dest in dest_with_coasts:
+ if self._abuts(unit_type, unit_loc, '-', dest):
+ possible_orders += ['{} - {}'.format(unit, dest)]
+ if self._abuts(unit_type, unit_loc, 'S', dest):
+ if self._unit_owner('A {}'.format(dest)):
+ possible_orders += ['{} S A {}'.format(unit, dest)]
+ elif self._unit_owner('F {}'.format(dest)):
+ possible_orders += ['{} S F {}'.format(unit, dest)]
+
+ # Move Via Convoy
+ for dest in self._get_convoy_destinations(unit_type, unit_loc):
+ possible_orders += ['{} - {} VIA'.format(unit, dest)]
+
+ # Support (Move)
+ for dest in dest_with_coasts:
+
+ # Computing src of move (both from adjacent provinces and possible convoys)
+ # We can't support a unit that needs us to convoy it to its destination
+ abut_srcs = self.map.abut_list(dest, incl_no_coast=True)
+ convoy_srcs = self._get_convoy_destinations('A', dest, exclude_convoy_locs=[unit_loc])
+
+ # Computing coasts for source
+ src_with_coasts = [self.map.find_coasts(src) for src in abut_srcs + convoy_srcs]
+ src_with_coasts = {val for sublist in src_with_coasts for val in sublist}
+
+ for src in src_with_coasts:
+
+ # Checking if there is a unit on the src location
+ if self._unit_owner('A {}'.format(src)):
+ src_unit_type = 'A'
+ elif self._unit_owner('F {}'.format(src)):
+ src_unit_type = 'F'
+ else:
+ continue
+
+ # Checking if src unit can move to dest (through adj or convoy), and that we can support it
+ # Only armies can move through convoy
+ if src[:3] != unit_loc[:3] \
+ and self._abuts(unit_type, unit_loc, 'S', dest) \
+ and ((src in convoy_srcs and src_unit_type == 'A')
+ or self._abuts(src_unit_type, src, '-', dest)):
+
+ # Adding with coast
+ possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest)]
+
+ # Adding without coasts
+ if '/' in dest:
+ possible_orders += ['{} S {} {} - {}'.format(unit, src_unit_type, src, dest[:3])]
+
+ # Convoy
+ if unit_type == 'F':
+ convoy_srcs = self._get_convoy_destinations(unit_type, unit_loc, unit_is_convoyer=True)
+ for src in convoy_srcs:
+
+ # Checking if there is a unit on the src location
+ if unit_type == 'F' and self._unit_owner('A {}'.format(src)):
+ src_unit_type = 'A'
+ else:
+ continue
+
+ # Checking where the src unit can actually go
+ convoy_dests = self._get_convoy_destinations(src_unit_type, src, unit_is_convoyer=False)
+
+ # Adding them as possible moves
+ for dest in convoy_dests:
+ if self._has_convoy_path(src_unit_type, src, dest, convoying_loc=unit_loc):
+ possible_orders += ['{} C {} {} - {}'.format(unit, src_unit_type, src, dest)]
+
+ # Retreat phase
+ if self.phase_type == 'R':
+
+ # Disband
+ if is_dislodged:
+ possible_orders += ['{} D'.format(unit)]
+
+ # Retreat
+ if is_dislodged:
+ retreat_locs = unit_power.retreats[unit]
+ for dest in retreat_locs:
+ dest = dest.upper()
+ if not self._unit_owner('A {}'.format(dest[:3]), coast_required=0) \
+ and not self._unit_owner('F {}'.format(dest[:3]), coast_required=0):
+ possible_orders += ['{} R {}'.format(unit, dest)]
+
+ # Adjustment Phase
+ if self.phase_type == 'A':
+ build_sites = self._build_sites(unit_power)
+
+ # Disband
+ if build_count < 0 and unit:
+ possible_orders += ['{} D'.format(unit)]
+
+ # Build Army / Fleet
+ if build_count > 0 \
+ and loc[:3] in build_sites \
+ and not self._unit_owner('A ' + loc[:3], coast_required=0) \
+ and not self._unit_owner('F ' + loc[:3], coast_required=0):
+ if self.map.is_valid_unit('A {}'.format(loc)):
+ possible_orders += ['A {} B'.format(loc)]
+ if self.map.is_valid_unit('F {}'.format(loc)):
+ possible_orders += ['F {} B'.format(loc)]
+
+ # Waive
+ if build_count > 0:
+ possible_orders += ['WAIVE']
+
+ # Removing duplicate
+ return remove_duplicates(possible_orders)
+
+ # ====================================================================
+ # Private Interface - CONVOYS Methods
+ # ====================================================================
+ def _build_list_possible_convoys(self):
+ """ Regenerates the list of possible convoy paths given the current fleet locations """
+ # Already generated
+ if self.convoy_paths_possible is not None:
+ return
+ self.convoy_paths_possible = []
+ self.convoy_paths_dest = {}
+
+ # Finding fleets on water
+ convoying_locs = []
+ for power in self.powers.values():
+ for unit in power.units:
+ if unit[0] == 'F' and self.map.area_type(unit[2:]) in ['WATER', 'PORT']:
+ convoying_locs += [unit[2:]]
+ convoying_locs = set(convoying_locs)
+
+ # Finding all possible convoy paths
+ for nb_fleets in range(1, len(convoying_locs) + 1):
+ for start, fleets, dests in self.map.convoy_paths[nb_fleets]:
+ if fleets.issubset(convoying_locs):
+ self.convoy_paths_possible += [(start, fleets, dests)]
+
+ # Marking path to dest
+ self.convoy_paths_dest.setdefault(start, {})
+ for dest in dests:
+ self.convoy_paths_dest[start].setdefault(dest, [])
+ self.convoy_paths_dest[start][dest] += [fleets]
+
+ def _is_convoyer(self, army, loc):
+ """ Detects if there is a convoyer at thru location for army/fleet (e.g. can an army be convoyed through PAR)
+ :param army: Boolean to indicate if unit being convoyed is army (1) or fleet (0)
+ :param loc: Location we are checking (e.g. 'STP/SC')
+ :return: Boolean to indicate if unit can be convoyed through location
+ """
+ # Armies can't convoy fleet, so if unit being convoyed is not an army, convoy not possible
+ if not army:
+ return False
+
+ # Army can convoy through water, all units can convoy through port
+ area_type = self.map.area_type(loc)
+ area_type_cond = ((area_type == 'WATER') == army or area_type == 'PORT')
+
+ # Making sure there is a valid unit on thru location to perform convoy
+ unit_type_cond = self._unit_owner('F %s' % loc, coast_required=0)
+ return area_type_cond and unit_type_cond
+
+ def _is_moving_via_convoy(self, unit):
+ """ Determines if a unit is moving via a convoy or through land
+ :param unit: The name of the unit (e.g. 'A PAR')
+ :return: A boolean (True, False) to indicate if the unit is moving via convoy
+ """
+ # Not moving or no paths
+ if unit not in self.command or self.command[unit][0] != '-':
+ return False
+ if unit not in self.convoy_paths or not self.convoy_paths[unit]:
+ return False
+
+ # Otherwise, convoying since there is still an active valid path
+ return True
+
+ def _has_convoy_path(self, unit, start, end, convoying_loc=None):
+ """ Determines if there is a convoy path for unit
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :param convoying_loc: Optional. If set, the convoying location must be in one of the paths
+ :return: A boolean flag to indicate if the convoy is possible (if all units cooperate)
+ """
+ if unit != 'A':
+ return False
+
+ # Checking in table if there is a valid path and optionally if the convoying loc is in the path
+ self._build_list_possible_convoys()
+ active_paths = self.convoy_paths_dest.get(start, {}).get(end, [])
+ return active_paths and (convoying_loc is None or [1 for path in active_paths if convoying_loc in path])
+
+ def _get_convoying_units_for_path(self, unit, start, end):
+ """ Returns a list of units who have submitted orders to convoy 'unit' from 'start' to 'end'
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :return: A list of convoying units (e.g. ['F NAO', 'F MAO']) having current orders to convoy path
+ """
+ convoying_units = []
+ army = unit != 'F'
+ expected_order = 'C %s %s - %s' % (unit, start[:3], end[:3])
+ for unit_loc, unit_order in list(self.command.items()):
+ if unit_order == expected_order and self._is_convoyer(army, unit_loc[2:]):
+ convoying_units += [unit_loc]
+ return convoying_units
+
+ def _get_convoy_destinations(self, unit, start, unit_is_convoyer=False, exclude_convoy_locs=None):
+ """ Returns a list of possible convoy destinations for a unit
+ :param unit: The unit BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param unit_is_convoyer: Boolean flag. If true, list all the dests that an unit being convoyed by unit
+ could reach
+ :param exclude_convoy_locs: Optional. A list of convoying location that needs to be excluded from all paths.
+ :return: A list of convoying destinations (e.g. ['PAR', 'MAR']) that can be reached from start
+ """
+ if unit == 'A' and unit_is_convoyer:
+ return []
+ if unit == 'F' and not unit_is_convoyer:
+ return []
+
+ # Building cache
+ self._build_list_possible_convoys()
+
+ # If we are moving via convoy, we just read the destinations from the table
+ if not unit_is_convoyer:
+ if not exclude_convoy_locs:
+ return list(self.convoy_paths_dest.get(start, {}).keys())
+
+ # We need to loop to make sure there is a path without the excluded convoyer
+ dests = []
+ for dest, paths in self.convoy_paths_dest.get(start, {}).items():
+ for path in paths:
+ if not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in path]:
+ dests += [dest]
+ break
+ return dests
+
+ # If we are convoying, we need to loop through the possible convoy paths
+ valid_dests = set([])
+ for _, fleets, dests in self.convoy_paths_possible:
+ if start in fleets and (exclude_convoy_locs is None
+ or not [1 for excluded_loc in exclude_convoy_locs if excluded_loc in fleets]):
+ valid_dests |= dests
+ return list(valid_dests)
+
+ def _get_convoy_paths(self, unit_type, start, end, via, convoying_units):
+ """ Return a list of all possible convoy paths (using convoying units) from start to end
+ :param unit_type: The unit type BEING convoyed (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param end: The destination of the unit (e.g. 'MAR')
+ :param via: Boolean flag (0 or 1) to indicate if we want only paths with a local convoyer, or also paths
+ including only foreign convoyers
+ :param convoying_units: The list of units who can convoy the unit
+ :return: A list of paths from start to end using convoying_units
+ """
+ if unit_type != 'A' or not convoying_units:
+ return []
+
+ # Building cache and finding possible paths with convoying units
+ # Adding start and end location to every path
+ self._build_list_possible_convoys()
+ fleets = {loc[2:] for loc in convoying_units}
+ paths = [path for path in self.convoy_paths_dest.get(start, {}).get(end, set([])) if path.issubset(fleets)]
+ paths = [[start] + list(path) + [end] for path in paths]
+ paths.sort(key=len)
+
+ # No paths found
+ if not paths:
+ return []
+
+ # We have intent to convoy, so we can use all paths
+ if via:
+ return paths
+
+ # Assuming intent if end is not reachable from start (i.e. a convoy is required)
+ if not self._abuts(unit_type, start, 'S', end):
+ return paths
+
+ # Otherwise, detecting if we intended to convoy
+ unit_owner = self._unit_owner('%s %s' % (unit_type, start), coast_required=0)
+ for convoyer in convoying_units:
+ convoy_owner = self._unit_owner(convoyer, coast_required=1)
+
+ # We have intent if one of the power's fleet issued a convoyed order
+ # and there was a path using that fleet to move from start to end
+ if unit_owner == convoy_owner and \
+ self._has_convoy_path(unit_type, start, end, convoying_loc=convoyer[2:]):
+ return paths
+
+ # We could not detect intent
+ return []
+
+ def _get_distance_to_home(self, unit_type, start, homes):
+ """ Calculate the distance from unit to one of its homes
+ Armies can move over water (4.D.8 choice d)
+ :param unit_type: The unit type to calculate distance (e.g. 'A' or 'F')
+ :param start: The start location of the unit (e.g. 'LON')
+ :param homes: The list of homes (first one reached calculates the distance)
+ :return: The minimum distance from unit to one of the homes
+ """
+ visited = []
+ if not homes:
+ return 99999
+
+ # Modified Djikstra
+ to_check = PriorityDict()
+ to_check[start] = 0
+ while to_check:
+ distance, current = to_check.smallest()
+ del to_check[current]
+
+ # Found smallest distance
+ if current[:3] in homes:
+ return distance
+
+ # Marking visited
+ if current in visited:
+ continue
+ visited += [current]
+
+ # Finding neighbors and updating distance
+ for loc in self.map.abut_list(current, incl_no_coast=True):
+ loc = loc.upper()
+ if loc in visited:
+ continue
+
+ # Calculating distance for armies over LAND/WATER/COAST and for Fleet over WATER/COAST
+ if unit_type == 'A' or self._abuts(unit_type, current, '-', loc):
+ loc_distance = to_check[loc] if loc in to_check else 99999
+ to_check[loc] = min(distance + 1, loc_distance)
+
+ # Could not find destination
+ return 99999
+
+ # ====================================================================
+ # Private Interface - ORDER Validation Methods
+ # ====================================================================
+ def _valid_order(self, power, unit, order, report=1):
+ """ Determines if an order is valid
+ :param power: The power submitting the order
+ :param unit: The unit being affected by the order (e.g. 'A PAR')
+ :param order: The actual order (e.g. 'H' or 'S A MAR')
+ :param report: Boolean to report errors in self.errors
+ :return: One of the following:
+ None - The order is NOT valid at all
+ -1 - It is NOT valid, BUT it does not get reported because it may be used to signal support
+ 0 - It is valid, BUT some unit mentioned does not exist
+ 1 - It is completed valid
+ """
+ # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
+ # No order
+ if not order:
+ return None
+ word = order.split()
+ owner = self._unit_owner(unit)
+ rules = self.rules
+
+ # No order
+ if not word:
+ return None
+
+ status = 1 if owner is not None else 0
+ unit_type = unit[0]
+ unit_loc = unit[2:]
+ order_type = word[0]
+
+ # Make sure the unit exists (or if the player is in a game in which he can't necessarily know) could exist.
+ # Also make sure any mentioned (supported or conveyed) unit could exists and could reach the listed destination
+ if not self.map.is_valid_unit(unit):
+ if report:
+ self.error.append(err.GAME_ORDER_TO_INVALID_UNIT % unit)
+ return None
+
+ # Support / Convoy - 'S A/F XXX - YYY'
+ if order_type in ('S', 'C') and word[1:]:
+ if word[1] in ('A', 'F'):
+ alter, other = word[1:3]
+ else:
+ alter, other = '?', word[1]
+
+ # Checks if A/F XXX is a valid unit for loc (source)
+ other = alter + ' ' + other
+ if not self.map.is_valid_unit(other, no_coast_ok=1):
+ if report:
+ self.error.append(err.GAME_ORDER_INCLUDES_INVALID_UNIT % other)
+ return None
+
+ # S [A/F] XXX - YYY
+ # Checks if A/F YYY is a valid unit for loc (dest)
+ if len(word) == 5 - (alter == '?'):
+ other = alter + ' ' + word[-1]
+ if not self.map.is_valid_unit(other, no_coast_ok=1):
+ if report:
+ self.error.append(err.GAME_ORDER_INCLUDES_INVALID_DEST % other)
+ return None
+
+ # Check if unit exists
+ # Status - 1 if unit has owner, 0 otherwise (Non-existent unit)
+ if not status:
+ if report:
+ self.error.append(err.GAME_ORDER_NON_EXISTENT_UNIT % unit)
+ return None
+ if power is not owner:
+ if report:
+ self.error.append(err.GAME_ORDER_TO_FOREIGN_UNIT % unit)
+ return None
+
+ # Validate that anything in a SHUT location is only ordered to HOLD
+ if self.map.area_type(unit_loc) == 'SHUT' and order_type != 'H':
+ if report:
+ self.error.append(err.GAME_UNIT_MAY_ONLY_HOLD % unit)
+ return None
+
+ # Validate support and convoy orders
+ # Triggers error if Army trying to convoys
+ if order_type == 'C' and (unit_type != 'F' or (self.map.area_type(unit_loc) not in ('WATER', 'PORT'))):
+ if report:
+ self.error.append(err.GAME_CONVOY_IMPROPER_UNIT % (unit, order))
+ return None
+
+ # -------------------------------------------------------------
+ # SUPPORT OR CONVOY ORDER
+ if order_type in ('C', 'S'):
+
+ # Add the unit type (or '?') if not specified.
+ # Note that the unit type is NOT added to the actual order -- just used during checking.
+ order_text = 'CONVOY' if order_type == 'C' else 'SUPPORT'
+ if len(word) > 1 and word[1] not in ('A', 'F'):
+ terrain = self.map.area_type(word[1])
+ if order_type == 'C':
+ word[1:1] = ['AF'[unit_type == 'A']] # Convoying the opposite unit type A-F and F-A
+ elif terrain == 'WATER':
+ word[1:1] = ['F']
+ elif terrain == 'LAND':
+ word[1:1] = ['A']
+ elif terrain: # Other terrain, trying to determine if XXX exist
+ its_unit_type = [unit_type for unit_type in 'AF' if self._unit_owner(unit_type + ' ' + word[1])]
+ if its_unit_type:
+ word[1:1] = its_unit_type
+ else:
+ if report:
+ self.error.append(err.GAME_INVALID_ORDER_NON_EXISTENT_UNIT % (order_text, unit, order))
+ return None
+ else:
+ if report:
+ self.error.append(err.GAME_INVALID_ORDER_RECIPIENT % (order_text, unit, order))
+ return None
+
+ # Make sure we have enough to work with
+ # e.g. syntax S A XXX - YYY or at least S XXX YYY
+ if len(word) < 3:
+ if report:
+ self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order))
+ return None
+
+ # Check that the recipient of the support or convoy exists
+ rcvr, dest = ' '.join(word[1:3]), word[2]
+ if not self._unit_owner(rcvr, 0):
+ if report:
+ self.error.append(err.GAME_ORDER_RECIPIENT_DOES_NOT_EXIST % (order_text, unit, order))
+ return None
+
+ # Check that the recipient is not the same unit as the supporter
+ if unit_loc == dest:
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_SUPPORT_ITSELF % (unit, order))
+ return None
+
+ # Only units on coasts can be convoyed, or invalid units convoying
+ if order_type == 'C' \
+ and (word[1] != 'AF'[unit_type == 'A'] or self.map.area_type(dest) not in ('COAST', 'PORT')):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_BE_CONVOYED % (unit, order))
+ return None
+
+ # Handle orders of the form C U xxx - xxx and S U xxx - xxx
+ if len(word) == 5:
+ if word[3] != '-':
+ if report:
+ self.error.append(err.GAME_BAD_ORDER_SYNTAX % (order_text, unit, order))
+ return None
+ dest = word[4]
+
+ # Coast is specified in the move, but ignored in the support and convoy order
+ # DATC 6.B.4
+ if '/' in dest:
+ dest = dest[:dest.find('/')]
+
+ # Making sure the dest is COAST,PORT and that the convoyed order can land there
+ if order_type == 'C':
+ if not (self.map.area_type(dest) in ('COAST', 'PORT')
+ and self.map.is_valid_unit(word[1] + ' ' + dest, unit[0] < 'F')):
+ if report:
+ self.error.append(err.GAME_BAD_CONVOY_DESTINATION % (unit, order))
+ return None
+
+ # Checking that support can reach destination...
+ elif (not self._abuts(word[1], word[2], order_type, dest)
+ and (rcvr[0] == 'F'
+ or not self._has_convoy_path(word[1], word[2][:3], dest))):
+ if report:
+ self.error.append(err.GAME_SUPPORTED_UNIT_CANT_REACH_DESTINATION % (unit, order))
+ return None
+
+ # Make sure that a convoy order was formatted as above
+ elif order_type == 'C':
+ if report:
+ self.error.append(err.GAME_IMPROPER_CONVOY_ORDER % (unit, order))
+ return None
+
+ # Make sure a support order was either as above or as S U xxx or as S U xxx H
+ elif len(word) != 3 and (len(word) != 4 or word[-1] != 'H'):
+ if report:
+ self.error.append(err.GAME_IMPROPER_SUPPORT_ORDER % (unit, order))
+ return None
+
+ # Make sure the support destination can be reached...
+ if order_type == 'S':
+ if not self._abuts(unit_type, unit_loc, order_type, dest):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_PROVIDE_SUPPORT_TO_DEST % (unit, order))
+ return None
+
+ # ... or that the fleet can perform the described convoy
+ elif not self._has_convoy_path(rcvr[0], rcvr[2:5], dest, convoying_loc=unit_loc):
+ if report:
+ self.error.append(err.GAME_IMPOSSIBLE_CONVOY_ORDER % (unit, order))
+ return None
+
+ # -------------------------------------------------------------
+ # MOVE order
+ elif order_type == '-':
+ # Expected format '- xxx' or '- xxx - yyy - zzz'
+ if (len(word) & 1 and word[-1] != 'VIA') or (len(word[:-1]) & 1 and word[-1] == 'VIA'):
+ if report:
+ self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order))
+ return None
+
+ # Only a convoying army can give a path
+ if len(word) > 2 and unit_type != 'A' and self.map.area_type(unit_loc) not in ('COAST', 'PORT'):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_CONVOY % (unit, order))
+ return None
+
+ # Step through every "- xxx" in the order and ensure the unit can get there at every step
+ src = unit_loc
+ order_type = 'C-'[len(word) == 2 or (len(word) == 3 and word[-1] == 'VIA')]
+ visit = []
+
+ # Checking that unit is not returning back where it started ...
+ if word[-1] == unit_loc and order_type < 'C':
+ if report:
+ self.error.append(err.GAME_MOVING_UNIT_CANT_RETURN % (unit, order))
+ return None
+
+ # For a multi-step convoy
+ if order_type == 'C':
+
+ # Checking that destination is a COAST or PORT ...
+ if self.map.area_type(word[-1]) not in ('COAST', 'PORT'):
+ if report:
+ self.error.append(err.GAME_CONVOYING_UNIT_MUST_REACH_COST % (unit, order))
+ return None
+
+ # Making sure that army is not having a specific coast as destination ...
+ if unit_type == 'A' and '/' in word[-1]:
+ if report:
+ self.error.append(err.GAME_ARMY_CANT_CONVOY_TO_COAST % (unit, order))
+ return None
+
+ # Making sure that the syntax is '- xxx - yyy - zzz'
+ offset = 1 if word[-1] == 'VIA' else 0
+ if [1 for x in range(0, len(word) - offset, 2) if word[x] != '-']:
+ if report:
+ self.error.append(err.GAME_BAD_MOVE_ORDER % (unit, order))
+ return None
+
+ # For every location touched
+ ride = word[1:len(word) - offset:2]
+ for num, to_loc in enumerate(ride):
+
+ # Checking that ride is not visited twice ...
+ if to_loc in visit and 'CONVOY_BACK' not in rules:
+ if report:
+ self.error.append(err.GAME_CONVOY_UNIT_USED_TWICE % (unit, order))
+ return None
+ visit += [to_loc]
+
+ # Making sure the last 2 locations touch, and that A/F can convoy through them
+ # pylint: disable=too-many-boolean-expressions
+ if (not self._abuts(unit_type, src, order_type, to_loc)
+ and not self._has_convoy_path(unit_type, unit_loc, to_loc)
+ and (len(word) == 2
+ or unit_type == 'A' and (not self._abuts('F', to_loc, 'S', src))
+ or (unit_type == 'F'
+ and (to_loc[:3].upper() not in
+ [abut[:3].upper() for abut in self.map.abut_list(src[:3])])))):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_MOVE_INTO_DEST % (unit, order))
+ return None
+
+ # If VIA flag set, make sure there is at least a possible path
+ if word[-1] == 'VIA' and not self._has_convoy_path(unit_type, unit_loc, to_loc):
+ if report:
+ self.error.append(err.GAME_UNIT_CANT_MOVE_VIA_CONVOY_INTO_DEST % (unit, order))
+ return None
+
+ # If we are at an intermediary location
+ if num < len(ride) - 1:
+
+ # Trying to portage convoy fleet through water
+ # or trying to convoy army through LAND or COAST
+ if (unit_type == 'F'
+ and ((unit_type == 'A' and self.map.area_type(to_loc) not in ('WATER', 'PORT'))
+ or unit_type + self.map.area_type(to_loc) == 'FWATER')):
+ if report:
+ self.error.append(err.GAME_BAD_CONVOY_MOVE_ORDER % (unit, order))
+ return None
+
+ # Making sure there is a unit there to convoy ...
+ if not self._unit_owner('AF'[unit_type == 'A'] + ' ' + to_loc):
+ if report:
+ self.error.append(err.GAME_CONVOY_THROUGH_NON_EXISTENT_UNIT % (unit, order))
+ return None
+
+ # Portaging fleets must finish the turn on a coastal location listed in upper-case
+ elif (num
+ and unit_type == 'F'
+ and (to_loc not in self.map.loc_abut or self.map.area_type(to_loc) not in ('COAST', 'PORT'))):
+ if report:
+ self.error.append(err.GAME_IMPOSSIBLE_CONVOY % (unit, order))
+ return None
+ src = to_loc
+
+ # -------------------------------------------------------------
+ # HOLD order
+ elif order_type == 'H':
+ if len(word) != 1:
+ if report:
+ self.error.append(err.GAME_INVALID_HOLD_ORDER % (unit, order))
+ return None
+
+ else:
+ if report:
+ self.error.append(err.GAME_UNRECOGNIZED_ORDER_TYPE % (unit, order))
+ return None
+
+ # All done
+ return status
+
+ def _expand_order(self, word):
+ """ Detects errors in order, convert to short version, and expand the default coast if necessary
+ :param word: The words (e.g. order.split()) for an order
+ (e.g. ['England:', 'Army', 'Rumania', 'SUPPORT', 'German', 'Army', 'Bulgaria']).
+ :return: The compacted and expanded order (e.g. ['A', 'RUM', 'S', 'A', 'BUL'])
+ """
+ if not word:
+ return word
+
+ result = self.map.compact(' '.join(word))
+ result = self.map.vet(self.map.rearrange(result), 1)
+
+ # Removing errors (Negative values)
+ final, order = [], ''
+ for result_ix, (token, token_type) in enumerate(result):
+ if token_type < 1:
+ if token_type == -1 * POWER:
+ self.error.append(err.GAME_UNKNOWN_POWER % token)
+ continue
+ elif token_type == -1 * UNIT:
+ self.error.append(err.GAME_UNKNOWN_UNIT_TYPE % token)
+ continue
+ elif token_type == -1 * LOCATION:
+ self.error.append(err.GAME_UNKNOWN_LOCATION % token)
+ elif token_type == -1 * COAST:
+ token_without_coast = token.split('/')[0]
+ if token_without_coast in self.map.aliases.values():
+ self.error.append(err.GAME_UNKNOWN_COAST % token)
+ result[result_ix] = token_without_coast, -1 * LOCATION
+ else:
+ self.error.append(err.GAME_UNKNOWN_LOCATION % token)
+ elif token_type == -1 * ORDER:
+ self.error.append(err.GAME_UNKNOWN_ORDER_TYPE % token)
+ continue
+ else:
+ self.error.append(err.GAME_UNRECOGNIZED_ORDER_DATA % token)
+ continue
+ token_type = -1 * token_type
+
+ # Remove power names. Checking ownership of the unit might be better
+ if token_type == POWER:
+ continue
+
+ # Remove the "H" from any order having the form "u xxx S xxx H"
+ # Otherwise storing order
+ elif token_type == ORDER:
+ if order == 'S' and token == 'H':
+ continue
+ order += token
+
+ # Treat each move order the same. Eventually we'd want to distinguish between them
+ elif token_type == MOVE_SEP:
+ result[result_ix] = '-', token_type
+ order += '-'
+
+ elif token_type == OTHER:
+ order = ''
+
+ # Spot ambiguous place names and coasts in support and convoy orders
+ if 'NO_CHECK' in self.rules:
+ if token_type == LOCATION and token in self.map.unclear:
+ self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token)
+ if token_type == COAST and token.split('/')[0] in self.map.unclear:
+ self.error.append(err.GAME_AMBIGUOUS_PLACE_NAME % token)
+
+ final += [token]
+
+ # Default any fleet move's coastal destination, then we're done
+ return self.map.default_coast(final)
+
+ def _expand_coast(self, word):
+ """ Makes sure the correct coast is specified (if any) is specified.
+ For Fleets: Adjust to correct coast if wrong coast is specified
+ For Armies: Removes coast if coast is specified
+ (e.g. if F is on SPA/SC but the order is F SPA/NC - LYO, the coast will be added or corrected)
+ :param word: A list of tokens (e.g. ['F', 'GRE', '-', 'BUL'])
+ :return: The updated list of tokens (e.g. ['F', 'GRE', '-', 'BUL/SC'])
+ """
+ if not word:
+ return word
+
+ unit_type = word[0]
+ loc = word[1]
+ loc_without_coast = loc[:loc.find('/')] if '/' in loc else loc
+
+ # For armies: Removing coast if specified
+ if unit_type == 'A':
+ if '/' in loc:
+ word[1] = loc_without_coast
+ if len(word) == 4 and '/' in word[3]:
+ word[3] = word[3][:word[3].find('/')]
+
+ # For fleets: If there is a unit in the country, but not on the specified coast, we need to correct the coast
+ elif self._unit_owner('%s %s' % (unit_type, loc), coast_required=1) is None \
+ and self._unit_owner('%s %s' % (unit_type, loc_without_coast), coast_required=0) is not None:
+
+ # Finding the correct coast
+ for loc in [l for l in self.map.locs if l[:3] == loc_without_coast]:
+ if self._unit_owner('%s %s' % (word[0], loc), coast_required=1) is not None:
+ word[1] = loc
+ break
+
+ # Removing cost if unit is supporting an army moving to coast
+ # F WES S A MAR - SPA/SC -> F WES S A MAR - SPA
+ if len(word) == 7 and '/' in word[-1] and word[2] == 'S' and word[3] == 'A':
+ dest = word[-1]
+ word[-1] = dest[:dest.find('/')]
+
+ # Adjusting the coast if a fleet is supporting a move to the wrong coast
+ # F WES S F GAS - SPA/SC -> F WES S F GAS - SPA/NC
+ if len(word) == 7 and word[0] == 'F' and word[2] == 'S' and word[3] == 'F' and '/' in word[-1]:
+ word = word[:3] + self.map.default_coast(word[3:6] + [word[6][:3]])
+
+ # Returning with coasts fixed
+ return word
+
+ def _add_unit_types(self, item):
+ """ Adds any missing "A" and "F" designations and (current) coastal locations for fleets.
+ :param item: The words for expand_order() (e.g. ['A', 'RUM', 'S', 'BUL'])
+ :return: The list of items with A/F and coasts added (e.g. ['A', 'RUM', 'S', 'A', 'BUL'])
+ """
+ # dependent is set when A/F is expected afterwards (so at start and after C/S)
+ # had_type indicates that A/F was found
+ word, dependent, had_type = [], 1, 0
+ for token in item:
+ if not dependent:
+ dependent = token in 'CS'
+ elif token in 'AF':
+ had_type = 1
+ elif token in ('RETREAT', 'DISBAND', 'BUILD', 'REMOVE'):
+ pass
+ else:
+ try:
+ # We have a location
+ # Try to find an active or retreating unit at current location
+ unit = [unit for power in self.powers.values()
+ for unit in (power.units, power.retreats.keys())[self.phase_type == 'R']
+ if unit[2:].startswith(token)][0]
+
+ # If A/F is missing, add it
+ if not had_type:
+ word += [unit[0]]
+
+ # Trying to detect if coast is specified in retrieved unit location
+ # If yes, update the token, so it incorporates coastal information
+ if self.map.is_valid_unit(word[-1] + unit[1:]):
+ token = unit[2:]
+ except IndexError:
+ pass
+ dependent = had_type = 0
+
+ # Add token to final list
+ word += [token]
+ return word
+
+ def _add_coasts(self):
+ """ This method adds the matching coast to orders supporting or (portage) convoying a fleet to
+ a multi-coast province.
+ :return: Nothing
+ """
+ # converting to unique format
+ orders = {}
+ for unit, order in self.orders.items():
+ orders[unit] = order
+
+ # Add coasts to support and (portage) convoy orders for fleets moving to a specific coast
+ for unit, order in orders.items():
+ # Only rewriting 'S F XXX - YYY' and 'C F XXX - YYY'
+ if order[:3] not in ('S F', 'C F'):
+ continue
+ word = order.split()
+
+ # rcvr is the unit receiving the support or convoy (e.g. F XXX in S F XXX - BER)
+ # Making sure rcvr has also submitted orders (e.g. F XXX - YYY)
+ rcvr = ' '.join(word[1:3])
+ try:
+ rcvr = [x for x in orders if x.startswith(rcvr)][0]
+ except IndexError:
+ # No orders found
+ continue
+
+ # Updating order to include rcvr full starting position (with coasts)
+ orders[unit] = ' '.join([order[0], rcvr] + word[3:]).strip()
+
+ # Checking if coast is specified in destination position
+ if '-' in order:
+ # his -> '- dest/coast'
+ # updating order if coast is specified in his dest, but not ours
+ his = ' '.join(orders.get(rcvr, '').split()[-2:])
+ if his[0] == '-' and his.split('/')[0] == ' '.join(word[3:]):
+ orders[unit] = order[:2] + rcvr + ' ' + his
+
+ # Updating game.orders object
+ self.orders[unit] = orders[unit]
+
+ def _validate_status(self, reinit_powers=True):
+ """ Validates the status of the game object"""
+ # Loading map and setting victory condition
+ if not self.map:
+ self.load_map(reinit_powers=reinit_powers)
+ self.victory = self.map.victory
+
+ # By default, 50% +1 of the scs
+ # Or for victory homes, half the average number of home centers belonging to other powers plus one
+ if not self.victory:
+ self.victory = [len(self.map.scs) // 2 + 1]
+
+ # Ensure game phase was set
+ if not self.phase:
+ self.phase = self.map.phase
+ apart = self.phase.split()
+ if len(apart) == 3:
+ if '%s %s' % (apart[0], apart[2]) not in self.map.seq:
+ self.error += [err.GAME_BAD_PHASE_NOT_IN_FLOW]
+ self.phase_type = apart[2][0]
+ else:
+ self.phase_type = '-'
+
+ # Validate the BEGIN phase (if one was given)
+ if self.phase == 'FORMING':
+ apart = self.map.phase.split()
+ try:
+ int(apart[1])
+ del apart[1]
+ if ' '.join(apart) not in self.map.seq:
+ raise Exception()
+ except ValueError:
+ self.error += [err.GAME_BAD_BEGIN_PHASE]
+
+ # Set victory condition
+ if self.phase not in ('FORMING', 'COMPLETED'):
+ try:
+ year = abs(int(self.phase.split()[1]) - self.map.first_year)
+ win = self.victory[:]
+ self.win = win[min(year, len(win) - 1)]
+ except ValueError:
+ self.error += [err.GAME_BAD_YEAR_GAME_PHASE]
+
+ # Initialize power data
+ for power in self.powers.values():
+
+ # Initialize homes if needed
+ if power.homes is None:
+ power.homes = []
+ for home in self.map.homes.get(power.name, []):
+ self.update_hash(power.name, loc=home, is_home=True)
+ power.homes.append(home)
+
+ # ====================================================================
+ # Private Interface - Generic methods
+ # ====================================================================
+ def _load_rules(self):
+ """ Loads the list of rules and their forced (+) and denied (!) corresponding rules
+ :return: A tuple of dictionaries: rules, forced, and denied
+ rules = {'NO_CHECK':
+ { 'group': '3 Movement Order',
+ 'variant': 'standard',
+ '!': ['RULE_1', 'RULE_2'],
+ '+': ['RULE_3'] } }
+ forced = {'payola': 'RULE_4'}
+ denied = {'payola': 'RULE_5'}
+ """
+ if self.__class__.rule_cache:
+ return self.__class__.rule_cache
+ group = variant = ''
+ data, forced, denied = {}, {}, {}
+ file_path = os.path.join(settings.PACKAGE_DIR, 'README_RULES.txt')
+
+ if not os.path.exists(file_path):
+ self.error.append(err.GAME_UNABLE_TO_FIND_RULES)
+ return data, forced, denied
+
+ with open(file_path, 'r', encoding='utf-8') as file:
+ for line in file:
+ word = line.strip().split()
+
+ # Rules are in the format <!-- RULE NAME !RULE_1 +RULE_2 -->
+ # Where ! indicates a denied rule, and + indicates a forced rule
+ if word[:2] == ['<!--', 'RULE'] and word[-1][-1] == '>':
+
+ # <!-- RULE GROUP 6 Secrecy -->
+ # group would be '6 Secrecy'
+ if word[2] == 'GROUP':
+ group = ' '.join(word[3:-1])
+
+ # <!-- RULE VARIANT standard -->
+ elif word[2] == 'VARIANT':
+ variant = word[3]
+ forced[variant] = [x[1:] for x in word[4:-1] if x[0] == '+']
+ denied[variant] = [x[1:] for x in word[4:-1] if x[0] == '!']
+
+ # <!-- RULE NAME !RULE_1 +RULE_2 -->
+ elif word[2] != 'END':
+ rule = word[2]
+ if rule not in data:
+ data[rule] = {'group': group, 'variant': variant}
+ for control in word[3:-1]:
+ if control[0] in '-=+!':
+ data[rule].setdefault(control[0], []).append(control[1:])
+
+ self.__class__.rule_cache = (data, forced, denied)
+ return data, forced, denied
+
+ def _build_hash_table(self):
+ """ Builds the Zobrist hash tables """
+ if not self.map or self.map_name in self.__class__.zobrist_tables:
+ return
+
+ # Finding powers and locations
+ map_powers = sorted([power_name for power_name in self.map.powers])
+ map_locs = sorted([loc.upper() for loc in self.map.locs if self.map.area_type(loc) != 'SHUT'])
+ nb_powers = len(map_powers)
+ nb_locs = len(map_locs)
+
+ # Generating a standardized seed
+ np_state = np.random.get_state()
+ np.random.seed(12345 + sum([ord(x) * 7 ** ix for ix, x in enumerate(self.map_name)]) % 2 ** 32)
+ self.__class__.zobrist_tables[self.map_name] = {
+ 'unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]),
+ 'units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'dis_unit_type': np.random.randint(1, sys.maxsize, [2, nb_locs]),
+ 'dis_units': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'centers': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'homes': np.random.randint(1, sys.maxsize, [nb_powers, nb_locs]),
+ 'map_powers': map_powers,
+ 'map_locs': map_locs
+ }
+ np.random.set_state(np_state)
+
+ # ====================================================================
+ # Private Interface - PROCESSING and phase change methods
+ # ====================================================================
+ def _begin(self):
+ """ Called to begin the game and move to the start phase
+ :return: Nothing
+ """
+ self._move_to_start_phase()
+ self.note = ''
+ self.win = self.victory[0]
+ # Create dummy power objects for non-loaded powers.
+ for power_name in self.map.powers:
+ if power_name not in self.powers:
+ self.powers[power_name] = Power(self, power_name, role=self.role)
+ # Initialize all powers.
+ for starter in self.powers.values():
+ # Starter having type won't be initialized.
+ starter.initialize(self)
+
+ def _process(self):
+ """ Processes the current phase of the game """
+ # Convert all raw movement phase "ORDER"s in a NO_CHECK game to standard orders before calling
+ # Game.process(). All "INVALID" and "REORDER" orders are left raw -- the Game.move_results() method
+ # knows how to detect and report them
+ if 'NO_CHECK' in self.rules and self.phase_type == 'M':
+ for power in self.powers.values():
+ orders, power.orders, civil_disorder = power.orders, {}, power.civil_disorder
+ for status, order in orders.items():
+ if status[:5] != 'ORDER':
+ power.orders[status] = order
+ elif order:
+ self._add_order(power, order.split())
+ power.civil_disorder = civil_disorder
+
+ # Processing the game
+ if self.phase_type == 'M':
+ self._determine_orders()
+ self._add_coasts()
+
+ # Resolving orders
+ self._resolve()
+
+ def _advance_phase(self):
+ """ Advance the game to the next phase (skipping phases with no actions)
+ :return: A list of lines to put in the results
+ """
+
+ # Save results for current phase.
+ # NB: result_history is updated here, neither in process() nor in draw(),
+ # unlike order_history, message_history and state_history.
+ self.result_history.put(self._phase_wrapper_type(self.current_short_phase), self.result)
+ self.result = {}
+
+ # For each possible phase
+ for _ in self.map.seq:
+
+ # If game is not yet started, or completed can't advance
+ if self.phase in (None, 'FORMING', 'COMPLETED'):
+ break
+
+ # Finding next phase and setting variables
+ self.phase = self._find_next_phase()
+ self.phase_type = self.phase.split()[-1][0]
+
+ # Check phase determines if we need to process phase (0) or can skip it (1)
+ if not self._check_phase():
+ break
+ else:
+ raise Exception("FailedToAdvancePhase")
+
+ # Rebuilding the convoy cache
+ self._build_list_possible_convoys()
+
+ # Returning
+ return []
+
+ def _move_to_start_phase(self):
+ """ Moves to the map's start phase
+ :return: Nothing, but sets the self.phase and self.phase_type settings
+ """
+ # Retrieve the beginning phase and phase type from the map
+ self.phase = self.map.phase
+ self.phase_type = self.phase.split()[-1][0]
+
+ def _find_next_phase(self, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately after the current phase
+ :param phase_type: The type of phase we are looking for
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the next phase (e.g. FALL 1905 MOVEMENT)
+ """
+ return self.map.find_next_phase(self.phase, phase_type, skip)
+
+ def _find_previous_phase(self, phase_type=None, skip=0):
+ """ Returns the long name of the phase coming immediately before the current phase
+ :param phase_type: The type of phase we are looking for
+ (e.g. 'M' for Movement, 'R' for Retreats, 'A' for Adjust.)
+ :param skip: The number of match to skip (e.g. 1 to find not the next phase, but the one after)
+ :return: The long name of the previous phase (e.g. SPRING 1905 MOVEMENT)
+ """
+ return self.map.find_previous_phase(self.phase, phase_type, skip)
+
+ def _get_start_phase(self):
+ """ Returns the name of the start phase"""
+ cur_phase, cur_phase_type = self.phase, self.phase_type
+ self._move_to_start_phase()
+ phase = self.phase
+ self.phase, self.phase_type = cur_phase, cur_phase_type
+ return phase
+
+ def _check_phase(self):
+ """ Checks if we need to process a phase, or if we can skip it if there are no actions
+ :return: Boolean (0 or 1) - 0 if we need to process phase, 1 if we can skip it
+ """
+ # pylint: disable=too-many-return-statements
+ # Beginning / End of game - Can't skip
+ if self.phase in (None, 'FORMING', 'COMPLETED'):
+ return 0
+
+ # When changing phases, clearing all caches
+ self.clear_cache()
+
+ # Movement phase - Always need to process
+ if self.phase_type == 'M':
+ return 0
+
+ # Retreats phase
+ if self.phase_type == 'R':
+ # We need to process if there are retreats
+ if [1 for x in self.powers.values() if x.retreats]:
+ return 0
+
+ # Otherwise, clearing flags and skipping phase
+ for power in self.powers.values():
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.retreats, power.adjust, power.civil_disorder = {}, [], 0
+ self.result = {}
+ if 'DONT_SKIP_PHASES' in self.rules:
+ return 0
+ return 1
+
+ # Adjustments phase
+ if self.phase_type == 'A':
+ # Capturing supply centers
+ self._capture_centers()
+
+ # If completed, can't skip
+ if self.phase == 'COMPLETED':
+ return 0
+
+ # If we have units to remove or to build, we need to process
+ for power in self.powers.values():
+ units, centers = len(power.units), len(power.centers)
+ if [x for x in power.centers if x in power.homes]:
+ centers += (0 + min(0, len([0 for x in power.units if x[2:5] in power.homes])))
+ if units > centers or (units < centers and self._build_limit(power)):
+ return 0
+
+ # Otherwise, skipping
+ self.result = {}
+ if 'DONT_SKIP_PHASES' in self.rules:
+ return 0
+ return 1
+
+ # Other phases. We need to process manually.
+ return 0
+
+ def _post_move_update(self):
+ """ Deletes orders and removes CD flag after moves """
+ for power in self.powers.values():
+ power.orders, power.civil_disorder = {}, 0
+
+ def _build_sites(self, power):
+ """ Returns a list of sites where power can build units
+ :param power: The power instance to check
+ :return: A list of build sites
+ """
+
+ # Retrieving the list of homes (build sites) for the power, and the list of active powers
+ homes = power.homes
+
+ # Can build on any of his centers
+ # -- BUILD_ANY: Powers may build new units at any owned supply center, not simply at their home supply centers.
+ if 'BUILD_ANY' in self.rules:
+ homes = power.centers
+
+ # Updating homes to only include homes if they are unoccupied,
+ homes = [h for h in homes if h in power.centers and h not in
+ [u[2:5] for p in self.powers.values() for u in p.units]]
+ return homes
+
+ def _build_limit(self, power, sites=None):
+ """ Determines the maximum number of builds a power can do in an adjustment phase
+ Note: This function assumes that one unit can be built per build sites + alternative sites
+ The actual maximum build limit would be less if units are built on alternative sites.
+ :param power: The power instance to check
+ :param sites: The power's build sites (or None to compute them)
+ :return: An integer representing the maximum number of simultaneous builds
+ """
+ # Computing build_sites if not provided
+ if sites is None:
+ sites = self._build_sites(power)
+
+ # Returning number of sites
+ return len(sites)
+
+ def _calculate_victory_score(self):
+ """ Calculates the score to determine win for each power
+ :return: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2})
+ """
+ score = {}
+
+ # Score is the number of supply centers owned
+ for power in self.powers.values():
+ score[power] = len([sc for sc in power.centers])
+ return score
+
+ def _determine_win(self, last_year):
+ """ Determine if we have a win.
+ :param last_year: A dict containing the score for each power (e.g. {'FRANCE': 10, 'ENGLAND': 2})
+ (from the previous year)
+ :return: Nothing
+ """
+ victors, this_year = [], self._calculate_victory_score()
+ year_centers = [this_year[x] for x in self.powers.values()]
+
+ # Determining win
+ for power in self.powers.values():
+ centers = this_year[power]
+
+ # 1) you must have enough centers to win
+ if (centers >= self.win
+
+ # 2) and you must grow or, if "HOLD_WIN", must have had a win
+ and (centers > last_year[power], last_year[power] >= self.win)['HOLD_WIN' in self.rules]
+
+ # 3) and you must be alone in the lead (not required in case of SHARED_VICTORY)
+ and ('SHARED_VICTORY' in self.rules
+ or (centers, year_centers.count(centers)) == (max(year_centers), 1))):
+ victors += [power]
+
+ # We have a winner!
+ if victors:
+ self._finish([victor.name for victor in victors])
+
+ # DRAW if 100 years
+ elif int(self.phase.split()[1]) - self.map.first_year + 1 == 100:
+ self.draw()
+
+ def _capture_centers(self):
+ """ In Adjustment Phase, proceed with the capture of occupied supply centers
+ :return: Nothing
+ """
+ victory_score_prev_year = self._calculate_victory_score()
+
+ # If no power owns centers, initialize them
+ if not [1 for x in self.powers.values() if x.centers]:
+ for power in self.powers.values():
+ for center in power.centers:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers = []
+ for center in power.homes:
+ self.update_hash(power.name, loc=center, is_center=True)
+ power.centers.append(center)
+
+ # Remember the current center count for the various powers, for use in victory condition check,
+ # then go through and see if any centers have been taken over
+ unowned = self.map.scs[:]
+ for power in self.powers.values():
+ for center in power.centers:
+ if center in unowned:
+ unowned.remove(center)
+
+ # Keep track of scs lost
+ self.lost = {}
+ for power in list(self.powers.values()) + [None]:
+ # Centers before takover
+ if power:
+ centers = power.centers
+ else:
+ centers = unowned
+
+ # For each center, check if we took ownership
+ for center in centers[:]:
+ for owner in self.powers.values():
+
+ # 1) If center is unowned, or 2) owned by someone else and that we have a unit on it
+ # Proceed with transfer, and record lost
+ if (not power or owner is not power) and center in [x[2:5] for x in owner.units]:
+ self._transfer_center(power, owner, center)
+ if not power:
+ unowned.remove(center)
+ else:
+ self.lost[center] = power
+ break
+
+ # Determining if we have a winner
+ self._determine_win(victory_score_prev_year)
+
+ def _transfer_center(self, from_power, to_power, center):
+ """ Transfers a supply center from a power to another
+ :param from_power: The power instance from whom the supply center is transfered
+ :param to_power: The power instance to whom the supply center is transferred
+ :param center: The supply center location (e.g. 'PAR')
+ :return: Nothing
+ """
+ if from_power:
+ self.update_hash(from_power.name, loc=center, is_center=True)
+ from_power.centers.remove(center)
+ if center not in to_power.centers:
+ self.update_hash(to_power.name, loc=center, is_center=True)
+ to_power.centers += [center]
+
+ def _finish(self, victors):
+ """ Indicates that a game is finished and has been won by 'victors'
+ :param victors: The list of victors (e.g. ['FRANCE', 'GERMANY'])
+ :return: Nothing
+ """
+ # Setting outcome, and end date. Clearing orders and saving.
+ self.outcome = [self._phase_abbr()] + victors
+ self.note = 'Victory by: ' + ', '.join([vic[:3] for vic in victors])
+ self.phase = 'COMPLETED'
+ self.set_status(strings.COMPLETED)
+ for power in self.powers.values():
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.retreats, power.adjust, power.civil_disorder = {}, [], 0
+
+ def _phase_abbr(self, phase=None):
+ """ Constructs a 5 character representation (S1901M) from a phase (SPRING 1901 MOVEMENT)
+ :param phase: The full phase (e.g. SPRING 1901 MOVEMENT)
+ :return: A 5 character representation of the phase
+ """
+ return self.map.phase_abbr(phase or self.phase)
+
+ # ====================================================================
+ # Private Interface - ORDER Submission methods
+ # ====================================================================
+ def _add_order(self, power, word, expand=True, replace=True):
+ """ Adds an order for a power
+ :param power: The power instance issuing the order
+ :param word: The order (e.g. ['A', 'PAR', '-', 'MAR'])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ if not word:
+ return None
+
+ raw_word = word
+
+ if expand:
+ # Check that the order is valid. If not, self.error will say why.
+ word = self._expand_order(word)
+ word = self._expand_coast(word)
+ word = self._add_unit_types(word)
+ word = self.map.default_coast(word)
+
+ # Last word is '?' - Removing it
+ if word and len(word[-1]) == 1 and not word[-1].isalpha():
+ word = word[:-1]
+ if len(word) < 2:
+ return self.error.append(err.STD_GAME_BAD_ORDER % ' '.join(word))
+
+ # Checking if we can order unit
+ unit, order = ' '.join(word[:2]), ' '.join(word[2:])
+ owner = self._unit_owner(unit)
+ if not owner or owner is not power:
+ self.error += [err.STD_GAME_UNORDERABLE_UNIT % ' '.join(word)]
+
+ # Validating order
+ elif order:
+ valid = self._valid_order(power, unit, order)
+
+ # Valid order. But is it to a unit already ordered? This is okay in a NO_CHECK game, and
+ # we HOLD the unit. If not, pack it back into the power's order list.
+ if valid is not None:
+ power.civil_disorder = 0
+ if valid == -1:
+ order += ' ?'
+ if unit not in power.orders or (replace and 'NO_CHECK' not in self.rules):
+ power.orders[unit] = order
+ elif 'NO_CHECK' in self.rules:
+ count = len(power.orders)
+ if power.orders[unit] not in ('H', order):
+ power.orders['REORDER %d' % count] = power.orders[unit]
+ count += 1
+ power.orders[unit] = 'H'
+ power.orders['REORDER %d' % count] = ' '.join(word)
+ else:
+ self.error += [err.STD_GAME_UNIT_REORDERED % unit]
+
+ # Invalid order in NO_CHECK game
+ elif 'NO_CHECK' in self.rules:
+ count = len(power.orders)
+ power.orders['INVALID %d' % count] = ' '.join(raw_word)
+
+ # Returning nothing
+ return None
+
+ def _update_orders(self, power, orders, expand=True, replace=True):
+ """ Updates the orders of a power
+ :param power: The power instance (or None if updating multiple instances)
+ :param orders: The updated list of orders
+ e.g. ['A MAR - PAR', 'A PAR - BER', ...]
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ cur_power, had_orders, has_orders, powers = power, [], [], []
+
+ # For each order
+ for line in orders:
+
+ word = line.strip().split()
+ who = cur_power
+
+ if not word:
+ continue
+
+ # Checking if the power can order
+ if not hasattr(who, 'orders'):
+ return self.error.append('%s HAS NO UNITS OF ITS OWN TO ORDER' % who.name)
+
+ # NMR = No Moves Received (NMR or CLEAR command)
+ nmr = (len(word) == 1
+ and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR'))
+ if who not in powers:
+
+ # Empty orders before sticking any new orders in it.
+ had_orders += [who.orders]
+ powers += [who]
+ if nmr:
+ continue
+
+ # If CLEAR or NMR, clear orders
+ elif nmr:
+ who.orders = {}
+ has_orders = [x for x in has_orders if x is not who]
+ continue
+
+ # Adds orders
+ if 'NO_CHECK' in self.rules:
+ data = self._expand_order(word)
+ if len(data) < 3 and (len(data) == 1 or data[1] != 'H'):
+ self.error.append(err.STD_GAME_BAD_ORDER % line.upper())
+ continue
+
+ # Voiding previous order on same unit
+ if replace:
+ for order in who.orders:
+ order_parts = who.orders[order].split()
+ if len(order_parts) >= 2 and order_parts[1][:3] == word[1][:3]:
+ who.orders[order] = ''
+
+ # Adding new order
+ who.orders['ORDER %d' % (len(who.orders) + 1)] = ' '.join(word)
+ else:
+ self._add_order(who, word, expand=expand, replace=replace)
+ if who.orders and who not in has_orders:
+ has_orders += [who]
+
+ # Make sure the player can update his orders
+ if not powers:
+ return 1
+ if self.error:
+ return self.error
+
+ # Clear CD flag, even if orders were cleared
+ for who in powers:
+ who.civil_disorder = 0
+
+ # Returning nothing
+ return None
+
+ def _add_retreat_orders(self, power, orders, expand=True, replace=True):
+ """ Adds a retreat order (Retreats Phase)
+ :param power: The power instance who is submitting orders (or None if power is in the orders)
+ :param orders: The list of adjustment orders
+ (format can be [Country: order], [Country, order, order], or [order,order])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ # No orders, returning
+ if not orders:
+ power.adjust, power.civil_disorder = [], 0
+ return
+
+ # Processing each order
+ adjust, retreated = [], []
+ for order in orders:
+ word = order.split()
+ if not word or len(word) < 2:
+ continue
+
+ # Expanding and adding unit types
+ if expand:
+ word = self._expand_order([order])
+ word = self._add_unit_types(word)
+
+ # Add 'R' has order type for Retreat, 'D' for Disband
+ if word[0] == 'R' and len(word) > 3:
+ del word[0]
+ if word[0] in 'RD':
+ word = word[1:] + word[:1]
+
+ # Checking if unit can retreat
+ unit = ' '.join(word[:2])
+ try:
+ unit = [r_unit for r_unit in power.retreats if r_unit == unit or r_unit.startswith(unit + '/')][0]
+ except IndexError:
+ adjust += ['VOID ' + order]
+ self.error.append(err.GAME_UNIT_NOT_IN_RETREAT % unit)
+ continue
+
+ # Checking if unit already retreated
+ if unit in retreated:
+ adjust += ['VOID ' + order]
+ self.error.append(err.GAME_TWO_ORDERS_FOR_RETREATING_UNIT % unit)
+ continue
+ word[1] = unit[2:]
+
+ # Adding Disband for retreats with no destination
+ if len(word) == 3 and word[2] in 'RD':
+ word[2] = 'D'
+
+ # Checking if retreat destination is valid
+ elif len(word) == 4 and word[2] in 'R-':
+ word[2] = 'R'
+ if word[3] not in power.retreats[unit]\
+ or self._unit_owner('A {}'.format(word[3][:3]), coast_required=0) \
+ or self._unit_owner('F {}'.format(word[3][:3]), coast_required=0):
+ self.error.append(err.GAME_INVALID_RETREAT_DEST % ' '.join(word))
+ adjust += ['VOID ' + order]
+ continue
+
+ # Invalid retreat order - Voiding
+ else:
+ self.error.append(err.GAME_BAD_RETREAT_ORDER % ' '.join(word))
+ adjust += ['VOID ' + order]
+ continue
+
+ # Adding retreat order and marking unit as retreated
+ retreated += [unit]
+ adjust += [' '.join(word)]
+
+ # Replacing previous orders
+ if replace:
+ for order in adjust:
+ word = order.split()
+ if len(word) >= 2 and word[0] != 'VOID':
+ power.adjust = [adj_order for adj_order in power.adjust if adj_order.split()[1:2] != word[1:2]]
+
+ # Otherwise, marking re-orders as invalid
+ else:
+ ordered_locs = [adj_order.split()[1] for adj_order in power.adjust]
+ for order in adjust[:]:
+ word = order.split()
+ if len(word) >= 2 and word[1] in ordered_locs:
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])]
+ adjust.remove(order)
+
+ # Finalizing orders
+ power.adjust += adjust
+ power.civil_disorder = 0
+
+ def _update_retreat_orders(self, power, orders, expand=True, replace=True):
+ """ Updates order for Retreats phase
+ :param power: The power instance submitting the orders
+ :param orders: The updated orders
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: List of processing errors
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ for who, adj in self._distribute_orders(power, orders):
+ self._add_retreat_orders(who, adj, expand=expand, replace=replace)
+ return self.error
+
+ def _add_adjust_orders(self, power, orders, expand=True, replace=True):
+ """ Adds an adjustment order (Adjustment Phase)
+ :param power: The power instance who is submitting orders (or None if power is in the orders)
+ :param orders: The list of adjustment orders (format can be [Country: order],
+ [Country, order, order], or [order,order])
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: Nothing, but adds error to self.error
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ # pylint: disable=too-many-branches
+ # No orders submitted, returning
+ if not orders:
+ power.adjust, power.civil_disorder = [], 0
+ return
+
+ # Calculating if the power can build or remove units
+ adjust, places = [], []
+ need, sites = len(power.centers) - len(power.units), []
+ order_type = 'D' if need < 0 else 'B'
+
+ # If we can build, calculating list of possible build locations
+ if need > 0:
+ sites = self._build_sites(power)
+ need = min(need, self._build_limit(power, sites))
+
+ # Processing each order
+ for order in orders:
+ order = order.strip()
+
+ if order == 'WAIVE':
+ # Check WAIVE order immediately and continue to next loop step.
+ if need >= 0:
+ adjust += [order]
+ else:
+ adjust += ['VOID ' + order]
+ self.error += ['WAIVE NOT ALLOWED FOR DISBAND']
+ continue
+
+ if not order or len(order.split()) < 2:
+ continue
+ word = self._expand_order([order]) if expand else order.split()
+
+ # Checking if unit can Build/Disband, otherwise voiding order
+ if word[-1] == order_type:
+ pass
+ elif word[-1] in 'BD':
+ adjust += ['VOID ' + order]
+ self.error += ['ORDER NOT ALLOWED: ' + order]
+ continue
+
+ # Adding unit type
+ if word[-1] == 'D' and expand:
+ word = self._add_unit_types(word)
+
+ # Checking for 'Disband'
+ order = ' '.join(word)
+ if word[-1] == 'D':
+ if len(word) == 3:
+ unit = ' '.join(word[:2])
+
+ # Invalid unit, voiding order
+ if unit not in power.units:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_NO_SUCH_UNIT % unit]
+
+ # Order to remove unit
+ elif order not in adjust:
+ adjust += [order]
+
+ # Invalid order, voiding
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % unit]
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order]
+
+ # Checking for BUILD
+ elif len(word) == 3:
+ site = word[1][:3]
+
+ # Invalid build site
+ if site not in sites:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_INVALID_BUILD_SITE % order]
+
+ # Site already used
+ elif site in places:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_MULT_BUILDS_IN_SITE % order]
+
+ # Unit can't be built there
+ elif not self.map.is_valid_unit(' '.join(word[:2])):
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_INVALID_BUILD_ORDER % order]
+
+ # Valid build sites
+ else:
+ adjust += [order]
+ places += [site]
+
+ # Otherwise, unknown order - Voiding
+ else:
+ adjust += ['VOID ' + order]
+ self.error += [err.GAME_BAD_ADJUSTMENT_ORDER % order]
+
+ # NB: We skip WAIVE orders when checking for replacements.
+ # We will check them later.
+
+ # Replacing previous orders
+ if replace:
+ for order in adjust:
+ word = order.split()
+ if len(word) >= 2 and word[0] != 'VOID':
+ power.adjust = [adj_order for adj_order in power.adjust
+ if adj_order == 'WAIVE' or adj_order.split()[1] != word[1]]
+
+ # Otherwise, marking re-orders as invalid
+ else:
+ ordered_locs = [adj_order.split()[1] for adj_order in power.adjust if adj_order != 'WAIVE']
+ for order in adjust[:]:
+ word = order.split()
+ if len(word) >= 2 and word[1] in ordered_locs:
+ self.error += [err.GAME_MULTIPLE_ORDERS_FOR_UNIT % ' '.join(word[:2])]
+ adjust.remove(order)
+
+ # Finalizing orders
+ power.adjust += adjust
+ power.civil_disorder = 0
+
+ # We check WAIVE orders in power.adjust after updating power.adjust,
+ # as WAIVE orders depend on variable `need`, whom computation is relative to power
+ # (ie. not relative to orders being currently adjusted).
+
+ # Removing extra waive orders
+ while 0 < need < len(power.adjust):
+ if 'WAIVE' in power.adjust:
+ power.adjust.remove('WAIVE')
+ else:
+ break
+
+ # Adding missing waive orders
+ if 'WAIVE' in power.adjust or power.is_dummy():
+ power.adjust.extend(['WAIVE'] * (need - len(power.adjust)))
+
+ def _update_adjust_orders(self, power, orders, expand=True, replace=True):
+ """ Updates order for Adjustment phase
+ :param power: The power instance submitting the orders
+ :param orders: The updated orders
+ :param expand: Boolean. If set, performs order expansion and reformatting (e.g. adding unit type, etc.)
+ If false, expect orders in the following format. False gives a performance improvement.
+ :param replace: Boolean. If set, replace previous orders on same units, otherwise prevents re-orders.
+ :return: List of processing errors
+
+ Expected format:
+ A LON H, F IRI - MAO, A IRI - MAO VIA, A WAL S F LON, A WAL S F MAO - IRI, F NWG C A NWY - EDI
+ A IRO R MAO, A IRO D, A LON B, F LIV B
+ """
+ for who, adj in self._distribute_orders(power, orders):
+ self._add_adjust_orders(who, adj, expand=expand, replace=replace)
+ return self.error
+
+ def _determine_orders(self):
+ """ Builds the self.orders dictionary (i.e. makes sure all orders are legitimate). """
+ self.orders = {}
+
+ # Determine the orders to be issued to each unit, based on unit ownership
+ for power in self.powers.values():
+ for unit, order in power.orders.items():
+ if power is self._unit_owner(unit):
+ self.orders[unit] = order
+
+ # In NO_CHECK games, ensure that orders to other player's units are reported as invalid
+ # if no proxy was given
+ if 'NO_CHECK' in self.rules:
+ for power in self.powers.values():
+ for unit, order in power.orders.items():
+ if unit[0] not in 'RI' and power is not self._unit_owner(unit):
+ order = unit + ' ' + order
+ power.orders['INVALID %d' % len(power.orders)] = order
+
+ def _default_orders(self, power):
+ """ Issues default orders for a power (HOLD)
+ :param power: The power instance
+ :return: Nothing
+ """
+ # Power has no units
+ if not power.units:
+ return
+
+ # Power has not submitted all his orders, checking if we default to HOLD
+ if not [x for x in power.units if self.orders.get(x)]:
+ power.civil_disorder = 1
+ for unit in power.units:
+ self.orders.setdefault(unit, 'H')
+
+ @classmethod
+ def _distribute_orders(cls, power, orders, clear=True):
+ """ For controlling powers, distribute orders to controlled powers
+ :param power: The power instance submitting the orders
+ :param orders: The list of orders submitted
+ :param clear: Boolean flag to indicate to clear order if NMR/CLEAR submitted
+ :return: A list of tuple with (controlled power instance, list of orders for that controlled power instance)
+ """
+ powers, distributor = [power], {power.name: []}
+ cur_power = power
+
+ # For each order submitted
+ for order in orders:
+ # Don't distribute blank order
+ word = order.strip().split()
+ if not word:
+ continue
+
+ who = cur_power
+
+ # Clearing orders for power
+ if (clear
+ and len(word) == 1
+ and word[0][word[0][:1] in '([':len(word[0]) - (word[0][-1:] in '])')].upper() in ('NMR', 'CLEAR')):
+ distributor[who.name] = []
+ # Otherwise, distributing order
+ else:
+ distributor[who.name] += [' '.join(word)]
+
+ # Returning a list of tuples with the power instance and their respective orders.
+ return [(x, distributor[x.name]) for x in powers]
+
+ # ====================================================================
+ # Private Interface - ADJUDICATION Methods
+ # ====================================================================
+ def _abuts(self, unit_type, unit_loc, order_type, other_loc):
+ """ Determines if a order for unit_type from unit_loc to other_loc is adjacent (Support and convoy only)
+ :param unit_type: The type of unit ('A' or 'F')
+ :param unit_loc: The location of the unit ('BUR', 'BUL/EC')
+ :param order_type: The type of order ('S' for Support, 'C' for Convoy', '-' for move)
+ :param other_loc: The location of the other unit
+ :return: 1 if the locations are adjacent for the move, 0 otherwise
+ """
+ # Check if the map says the adjacency is good
+ if not self.map.abuts(unit_type, unit_loc, order_type, other_loc):
+ return 0
+ return 1
+
+ def _unit_owner(self, unit, coast_required=1):
+ """ Finds the power who owns a unit
+ :param unit: The name of the unit to find (e.g. 'A PAR')
+ :param coast_required: Indicates that the coast is in the unit
+ (if 0, you can search for 'F STP' for example, but if 1, you must specify 'F STP/SC')
+ :return: The power instance who owns the unit or None
+ """
+ # If coast_required is 0 and unit does not contain a '/'
+ # return the owner if we find a unit that starts with unit
+ # Don't count the unit if it needs to retreat (i.e. it has been dislodged)
+ for owner in self.powers.values():
+ if unit in owner.units:
+ return owner
+ if not coast_required and '/' not in unit and [1 for x in owner.units if x.find(unit) == 0]:
+ return owner
+ return None
+
+ def _occupant(self, site, any_coast=0):
+ """ Finds the occupant of a site
+ :param site: The site name (e.g. "STP")
+ :param any_coast: Boolean to indicate to return unit on any coast
+ :return: The unit (e.g. "A STP", "F STP/NC") occupying the site, None otherwise
+ """
+ if any_coast:
+ site = site[:3]
+ for power in self.powers.values():
+ for unit in power.units:
+ if unit[2:].startswith(site):
+ return unit
+ return None
+
+ def _strengths(self):
+ """ This function sets self.combat to a dictionary of dictionaries, specifying each potential destination
+ for every piece, with the strengths of each unit's attempt to get (or stay) there, and with the givers
+ of supports that DON'T country dislodgement. (i.e. supports given by the power owning the occupying unit).
+ :return: Nothing, but sets self.combat
+ """
+ # For example, the following orders, all by the same power:
+ # A MUN H, A SIL - MUN, A BOH S A SIL - MUN, A RUH - MUN would result in:
+ # e.g. { 'MUN': { 1 : [ ['A MUN', [] ], ['A RUH', [] ] ], 2 : [ ['A SIL', ['A BOH'] ] ] } }
+ # MUN is holding, being attack without support from RUH and being attacked with support from SIL (S from BOH)
+ self.combat = {}
+
+ # For each order
+ for unit, order in self.command.items():
+ word = order.split()
+
+ # Strength of a non-move or failed move is 1 + support
+ if word[0] != '-' or self.result[unit]:
+ place, strength = unit[2:5], 1
+
+ # Strength of move depends on * and ~ in adjacency list
+ else:
+ offset = 1 if word[-1] == 'VIA' else 0
+ place = word[-1 - offset][:3]
+ strength = 1
+
+ # Adds the list of supporting units
+ # Only adding the support that DOES NOT count toward dislodgment
+ self.combat \
+ .setdefault(place, {}) \
+ .setdefault(strength + self.supports[unit][0], []) \
+ .append([unit, self.supports[unit][1]])
+
+ def _detect_paradox(self, starting_node, paradox_action, paradox_last_words):
+ """ Paradox detection algorithm. Start at starting node and move chain to see if node if performing
+ paradox action
+ :param starting_node: The location (e.g. PAR) where to start the paradox chain
+ :param paradox_action: The action that would cause a paradox in the chain (e.g. 'S')
+ :param paradox_last_words: The last words to detect in a order to cause a paradox (e.g. ['F', 'NTH'])
+ :return: Boolean (1 or 0) to indicate if a paradox action was detected in the chain
+ """
+ visited_units = []
+ current_node = starting_node
+ current_unit = self._occupant(current_node)
+ while current_unit is not None and current_unit not in visited_units:
+ visited_units += [current_unit]
+ current_order = self.command.get(current_unit, 'H')
+
+ # Action and last words detected
+ if (current_order[0] == paradox_action
+ and current_order.split()[-1 * len(paradox_last_words):] == paradox_last_words):
+ return True
+
+ # Continuing chain only if order is Support or Convoy
+ if current_order.split()[0] not in 'SC':
+ break
+ current_node = current_order.split()[-1]
+ current_unit = self._occupant(current_node)
+
+ # No paradox detected
+ return False
+
+ def _check_disruptions(self, may_convoy, result, coresult=None):
+ """ Determines convoy disruptions.
+ :param may_convoy: Contains the dictionary of all convoys that have a chance to succeed
+ (e.g. {'A PAR': ['BER', 'MUN']}
+ :param result: Result to set for the unit if the convoying fleet would be dislodged
+ (e.g. 'maybe', 'no convoy')
+ :param coresult: Result to set for the convoyer if the convoying fleet would be dislodged (e.g. 'dislodged')
+ :return: Nothing
+ """
+ for unit, word in may_convoy.items():
+
+ # Removing '-'
+ word = [w for w in word if w != '-']
+
+ # Checking order of unit at dest
+ offset = 1 if self.command.get(unit, []).split()[-1] == 'VIA' else 0
+ convoy_dest = self.command.get(unit, 'H').split()[-1 - offset]
+ unit_at_dest = self._occupant(convoy_dest)
+ order_unit_at_dest = self.command.get(unit_at_dest, 'H')
+
+ # Looping over all areas where convoys will take place (including destination)
+ for place in word:
+ area, convoyer = place[:3], 'AF'[unit[0] == 'A'] + ' ' + place
+ strongest = self.combat[area][max(self.combat[area])]
+
+ # Checking if the convoy is under attack
+ for strong_unit in strongest:
+ if self._unit_owner(convoyer) != self._unit_owner(strong_unit[0]):
+ break
+ else:
+ continue
+
+ # Paradox Detection #1
+ # [1st and 2nd order] Checking that we are not attacking a chain, with the last unit supporting
+ # the convoy
+ paradox = self._detect_paradox(convoy_dest, 'S', ['S', 'F', area])
+
+ # Checking if the convoy can withstand the attack and there is not active paradox
+ if convoyer in [x[0] for x in strongest] and not paradox:
+ continue
+
+ # For a beleaguered garrison, checking if the destination is attacking / supporting an attack
+ # against convoy
+ if len(strongest) >= 2 and not paradox:
+ if order_unit_at_dest.split()[0] not in '-S' or order_unit_at_dest.split()[-1][:3] != area:
+ continue
+
+ # Removing paths using place
+ self.convoy_paths.setdefault(unit, [])
+ for path in self.convoy_paths[unit]:
+ if place in path:
+ self.convoy_paths[unit].remove(path)
+
+ # Paradox Detection #2 - Can convoyed unit use land route to cut support necessary to attack convoy
+ paradox = False
+ if self._abuts(unit[0], unit[2:], '-', convoy_dest):
+ paradox = self._detect_paradox(convoy_dest, 'S', ['-', area])
+
+ # Setting the result if there is no convoy paths left, and
+ # 1) there is no land route (or there is a paradox through the land route)
+ # or 2) the unit specified 'VIA' and doesn't want to try the land route (4.A.3)
+ if not self.convoy_paths[unit] and (paradox
+ or not self._abuts(unit[0], unit[2:], '-', convoy_dest)
+ or (self._abuts(unit[0], unit[2:], '-', convoy_dest)
+ and self.command[unit].split()[-1] == 'VIA')):
+ self.result[unit] = [result]
+
+ # Setting the result for a would-be dislodged fleet
+ if coresult:
+ self.result[convoyer] = [coresult]
+
+ def _boing(self, unit):
+ """ Mark a unit bounced, and update the combat table to show the unit as
+ having strength one at its current location
+ :param unit: The unit to bounce (e.g. 'A PAR')
+ :return: 1
+ """
+ self.result[unit] += ['bounce']
+ self.combat \
+ .setdefault(unit[2:5], {}) \
+ .setdefault(1, []) \
+ .append([unit, []])
+ return 1
+
+ def _bounce(self):
+ """ This methods marks all units that can't get where they're going as bounced.
+ It loops to handle bounce-chains.
+ """
+ # pylint: disable=too-many-nested-blocks
+ bounced = 1
+ while bounced:
+ bounced = 0
+
+ # STEP 6. MARK (non-convoyed) PLACE-SWAP BOUNCERS
+ for unit, order in self.command.items():
+ word = order.split()
+ if self.result[unit] or word[0] != '-' or self._is_moving_via_convoy(unit):
+ continue
+ crawl_ok, site = False, '- ' + unit[2:]
+ swap = self._occupant(word[1], any_coast=not crawl_ok)
+ if self._is_moving_via_convoy(swap):
+ continue
+ if not (crawl_ok and swap and swap[0] == unit[0] == 'F'):
+ site = site.split('/')[0]
+ if not (self.command.get(swap, '').find(site) or self.result[swap]):
+ my_strength = self.supports[unit][0] - len(self.supports[unit][1])
+ his_strength = self.supports[swap][0] - len(self.supports[swap][1])
+ our_strength = (self._unit_owner(unit) is self._unit_owner(swap)
+ or self.supports[unit][0] == self.supports[swap][0])
+ if our_strength or my_strength <= his_strength:
+ self._boing(unit)
+ if our_strength or his_strength <= my_strength:
+ self._boing(swap)
+
+ # Marking support used for self-dislodgement as void
+ for supporting_unit in self.supports[unit][1]:
+ self.result[supporting_unit] += ['void']
+ for supporting_unit in self.supports[swap][1]:
+ self.result[supporting_unit] += ['void']
+ bounced = 1
+ if bounced:
+ continue
+ # No (more) swap-bouncers
+
+ # STEP 7. MARK OUTGUNNED BOUNCERS
+ for place, conflicts in list(self.combat.items()):
+ strength = sorted(conflicts.keys())
+ for key in strength:
+ if key != strength[-1] or len(conflicts[key]) != 1:
+ for unit, no_help in conflicts[key]:
+ if not self.result[unit] and self.command[unit][0] == '-':
+ bounced = self._boing(unit)
+ if bounced:
+ continue
+ # No (more) outgunned bouncers
+
+ # STEP 8. MARK SELF-DISLODGE BOUNCERS
+ for place, conflicts in list(self.combat.items()):
+ strength = sorted(conflicts.keys())
+ if len(conflicts[strength[-1]]) != 1:
+ continue
+ strongest = conflicts[strength[-1]][0][0]
+ if self.command[strongest][0] != '-' or self.result[strongest]:
+ continue
+ no_help = len(conflicts[strength[-1]][0][1])
+ guy = self._occupant(place)
+ if guy:
+ owner = self._unit_owner(guy)
+ if ((self.command[guy][0] != '-' or self.result[guy])
+ and (owner is self._unit_owner(strongest)
+ or (len(strength) > 1 and strength[-1] - no_help <= strength[-2]))):
+ bounced = self._boing(strongest)
+ for supporting_unit in conflicts[strength[-1]][0][1]:
+ if 'void' not in self.result[supporting_unit]:
+ self.result[supporting_unit] += ['void']
+
+ # No (more) self-dislodge bouncers
+
+ def _cut_support(self, unit, direct=0):
+ """ See if the order made by the unit cuts a support. If so, cut it.
+ :param unit: The unit who is attacking (and cutting support)
+ :param direct: Boolean Flag - If set, the order must not only be a move, but also a non-convoyed move.
+ :return: Nothing
+ """
+ order = self.command[unit]
+ word = order.split()
+ if word[0] != '-' or (direct and self._is_moving_via_convoy(unit)):
+ return
+ dest = word[-1] if word[-1] != 'VIA' else word[-2]
+ other_unit = self._occupant(dest, any_coast=1)
+ coord = self.command.get(other_unit, 'no unit at dest').split()
+ support_target = 'F ' + coord[-1][:3]
+
+ # pylint: disable=too-many-boolean-expressions
+ if (coord[0] == 'S'
+ and 'cut' not in self.result[other_unit]
+ and 'void' not in self.result[other_unit]
+
+ # EXCEPTION A: CANNOT CUT SUPPORT YOU YOURSELF ARE GIVING
+ and (self._unit_owner(unit) is not self._unit_owner(other_unit))
+
+ # EXCEPTION B: CANNOT CUT SUPPORT FOR A MOVE AGAINST YOUR LOCATION
+ and coord[-1][:3] != unit[2:5]
+
+ # EXCEPTION C: OR (IF CONVOYED) FOR OR AGAINST ANY CONVOYING FLEET
+ and (not self._is_moving_via_convoy(unit)
+ or self.command.get(support_target, 'H')[0] != 'C'
+ or 'void' in self.result.get(support_target, [])
+ # EXCEPTION TO EXCEPTION C: IF THERE IS A ALTERNATIVE CONVOY ROUTE
+ or [1 for path in self.convoy_paths[unit] if support_target[2:] not in path])):
+
+ # Okay, the support is cut.
+ self.result[other_unit] += ['cut']
+ affected = ' '.join(coord[1:3]) # Unit being supported
+ self.supports[affected][0] -= 1
+ if other_unit in self.supports[affected][1]:
+ self.supports[affected][1].remove(other_unit)
+
+ def _no_effect(self, unit, site):
+ """ Removes a unit from the combat list of an attack site
+ :param unit: The unit attacking the site (e.g. ['A PAR', []])
+ :param site: The site being attacked (e.g. 'MAR')
+ :return: Nothing
+ """
+ sups = [strength for strength, attack_unit in self.combat[site].items() if unit in attack_unit][0]
+ self.combat[site][sups].remove(unit)
+ if not self.combat[site][sups]:
+ del self.combat[site][sups]
+ if not self.combat[site]:
+ del self.combat[site]
+
+ def _unbounce(self, site):
+ """ Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger.
+ :param site: The site being attacked
+ :return: Nothing
+ """
+ # Detecting if there is only one attack winning at site
+ most = max(self.combat[site])
+ if len(self.combat[site][most]) > 1:
+ return None
+
+ # Unbouncing the winner of the attack at site
+ unbouncer = self.combat[site][most][0][0]
+ if 'bounce' in self.result[unbouncer]:
+ self.result[unbouncer].remove('bounce')
+ if unbouncer in self.dislodged:
+ del self.dislodged[unbouncer]
+ return self.result[unbouncer].remove('dislodged')
+
+ next_site = unbouncer[2:5]
+ self._no_effect([unbouncer, []], next_site)
+ if next_site in self.combat:
+ self._unbounce(next_site)
+ return None
+
+ def _resolve_moves(self):
+ """ Resolves the list of orders """
+ # pylint: disable=too-many-statements,too-many-branches
+
+ # -----------------------------------------------------------
+ # STEP 0: DECLARE ALL RESULTS AS YET UNKNOWN
+ self.result, self.supports, self.convoy_paths, may_convoy = {}, {}, {}, {}
+
+ # Fill self.command from the self.orders dictionary
+ # Fill self.ordered_units from the powers.units list
+ # Default order is to hold
+ self.command = {}
+ self.ordered_units = {}
+ for power in self.powers.values():
+ self.ordered_units[power.name] = [unit for unit in power.units if unit in self.orders]
+ for unit in power.units:
+ self.command[unit] = self.orders.get(unit, 'H')
+ if 'NO_CHECK' in self.rules:
+ for order in [order for key, order in power.orders.items() if key.startswith('INVALID')]:
+ unit = ' '.join(order.split()[:2])
+ self.ordered_units[power.name] += [unit]
+ self.command[unit] = 'H'
+ self.result[unit] = ['void']
+ self._default_orders(power)
+
+ for unit in self.command:
+ self.result.setdefault(unit, [])
+ self.supports.setdefault(unit, [0, []])
+
+ # -----------------------------------------------------------
+ # STEP 1A. CANCEL ALL INVALID ORDERS GIVEN TO UNITS ATTEMPTING TO MOVE BY CONVOY
+ for unit, order in list(self.command.items()):
+ word = order.split()
+ if word[0] != '-':
+ continue
+
+ # Full convoy path has been specified (e.g. 'A PAR - MAR - NAO - MAO - LON')
+ offset = 1 if word[-1] == 'VIA' else 0
+ if len(word) - offset > 2:
+ for convoyer in range(1, len(word) - 1, 2):
+ convoy_order = self.command.get('AF'[unit[0] == 'A'] + ' ' + word[convoyer])
+ if convoy_order not in ['C %s - ' % x + word[-1] for x in (unit, unit[2:])]:
+ if convoy_order:
+ self.result[unit] += ['no convoy']
+ else:
+ self.command[unit] = 'H'
+ break
+ # List the valid convoys
+ else:
+ may_convoy[unit] = order.split()
+ self.convoy_paths[unit] = [[unit[2:]] + word[1::2]]
+
+ # Only src and dest provided
+ else:
+ def flatten(nested_list):
+ """ Flattens a sublist """
+ return [list_item for sublist in nested_list for list_item in sublist]
+
+ has_via_convoy_flag = 1 if word[-1] == 'VIA' else 0
+ convoying_units = self._get_convoying_units_for_path(unit[0], unit[2:], word[1])
+ possible_paths = self._get_convoy_paths(unit[0],
+ unit[2:],
+ word[1],
+ has_via_convoy_flag,
+ convoying_units)
+
+ # No convoy path - Removing VIA and checking if adjacent
+ if not possible_paths:
+ if has_via_convoy_flag:
+ self.command[unit] = ' '.join(word[:-1])
+ if not self._abuts(unit[0], unit[2:], 'S', word[1]):
+ self.result[unit] += ['no convoy']
+
+ # There is a convoy path, remembering the convoyers
+ else:
+ self.convoy_paths[unit] = possible_paths
+ may_convoy.setdefault(unit, [])
+ for convoyer in convoying_units:
+ if convoyer[2:] in flatten(possible_paths) and convoyer[2:] not in may_convoy[unit]:
+ may_convoy[unit] += [convoyer[2:]]
+
+ # Marking all convoys that are not in any path
+ invalid_convoys = convoying_units[:]
+ all_path_locs = list(set(flatten(possible_paths)))
+ for convoy in convoying_units:
+ if convoy[2:] in all_path_locs:
+ invalid_convoys.remove(convoy)
+ for convoy in invalid_convoys:
+ self.result[convoy] = ['no convoy']
+
+ # -----------------------------------------------------------
+ # STEP 1B. CANCEL ALL INVALID CONVOY ORDERS
+ for unit, order in self.command.items():
+ if order[0] != 'C':
+ continue
+ # word = ['C', 'PAR', 'MAR'] -> ['C', 'A', 'PAR', 'MAR']
+ word, mover_type = order.split(), 'AF'[unit[0] == 'A']
+ if word[1] != mover_type:
+ word[1:1] = [mover_type]
+ mover = '%s %s' % (mover_type, word[2])
+ if self._unit_owner(mover):
+ convoyer = may_convoy.get(mover, [])
+ offset = 1 if self.command.get(mover, '').split()[-1] == 'VIA' else 0
+ mover_dest = self.command.get(mover, '').split()[-1 - offset]
+ if unit[2:] not in convoyer or word[-1] != mover_dest:
+ self.result[unit] += ['void']
+ else:
+ self.command[unit] = 'H'
+
+ # -----------------------------------------------------------
+ # STEP 2. CANCEL INCONSISTENT SUPPORT ORDERS AND COUNT OTHERS
+ for unit, order in self.command.items():
+ if order[0] != 'S':
+ continue
+ word, signal = order.split(), 0
+
+ # Remove any trailing "H" from a support-in-place order.
+ if word[-1] == 'H':
+ del word[-1]
+ self.command[unit] = ' '.join(word)
+
+ # Stick the proper unit type (A or F) into the order;
+ # All supports will have it from here on
+ where = 1 + (word[1] in 'AF')
+ guy = self._occupant(word[where])
+
+ # See if there is a unit to receive the support
+ if not guy:
+ self.command[unit] = 'H'
+ if not signal:
+ self.result[unit] += ['void']
+ continue
+ word[1:where + 1] = guy.split()
+ self.command[unit] = ' '.join(word)
+
+ # See if the unit's order matches the supported order
+ if signal:
+ continue
+ coord = self.command[guy].split()
+
+ # 1) Void if support is for hold and guy is moving
+ # 2) Void if support is for move and guy isn't going where support is given
+ # 3) Void if support is give, but move over convoy failed
+ offset = 1 if coord[-1] == 'VIA' else 0
+ if ((len(word) < 5 and coord[0] == '-')
+ or (len(word) > 4 and (coord[0], coord[-1 - offset]) != ('-', word[4]))
+ or 'no convoy' in self.result[guy]):
+ self.result[unit] += ['void']
+ continue
+
+ # Okay, the support is valid
+ self.supports[guy][0] += 1
+
+ # If the unit is owned by the owner of the piece being attacked, add the unit to those
+ # whose supports are not counted toward dislodgment.
+ if coord[0] != '-':
+ continue
+ owner = self._unit_owner(unit)
+ other = self._unit_owner(self._occupant(coord[-1], any_coast=1))
+ if owner is other:
+ self.supports[guy][1] += [unit]
+
+ # -----------------------------------------------------------
+ # STEP 3. LET DIRECT (NON-CONVOYED) ATTACKS CUT SUPPORTS
+ for unit in self.command:
+ if not self.result[unit]:
+ self._cut_support(unit, direct=1)
+
+ # -----------------------------------------------------------
+ # STEPS 4 AND 5. DETERMINE CONVOY DISRUPTIONS
+ cut, cutters = 1, []
+ while cut:
+ cut = 0
+ self._strengths()
+
+ # STEP 4. CUT SUPPORTS MADE BY (non-maybe) CONVOYED ATTACKS
+ self._check_disruptions(may_convoy, 'maybe')
+ for unit in may_convoy:
+ if self.result[unit] or unit in cutters:
+ continue
+ self._cut_support(unit)
+ cutters += [unit]
+ cut = 1
+ if cut:
+ continue
+
+ # STEP 5. LOCATE NOW-DEFINITE CONVOY DISRUPTIONS, VOID SUPPORTS
+ # THESE CONVOYERS WERE GIVEN, AND ALLOW CONVOYING UNITS TO CUT SUPPORT
+ self._check_disruptions(may_convoy, 'no convoy', 'disrupted')
+ for unit in may_convoy:
+ if 'no convoy' in self.result[unit]:
+ for sup, help_unit in self.command.items():
+ if not (help_unit.find('S %s' % unit) or self.result[sup]):
+ self.result[sup] = ['no convoy']
+ if not (help_unit.find('C %s' % unit) or self.result[sup]):
+ self.result[sup] = ['no convoy']
+ self.supports[unit] = [0, []]
+ elif 'maybe' in self.result[unit] and unit not in cutters:
+ self.result[unit], cut = [], 1
+ self._cut_support(unit)
+ cutters += [unit]
+
+ # Recalculate strengths now that some are reduced by cuts
+ self._strengths()
+
+ # Mark bounces, then dislodges, and if any dislodges caused a cut
+ # loop over this whole kaboodle again
+ self.dislodged, cut = {}, 1
+ while cut: # pylint: disable=too-many-nested-blocks
+ # -----------------------------------------------------------
+ # STEPS 6-8. MARK BOUNCERS
+ self._bounce()
+
+ # STEP 9. MARK SUPPORTS CUT BY DISLODGES
+ cut = 0
+ for unit, order in self.command.items():
+ if order[0] != '-' or self.result[unit]:
+ continue
+ attack_order = order.split()
+ offset = 1 if attack_order[-1] == 'VIA' else 0
+ victim = self._occupant(attack_order[-1 - offset], any_coast=1)
+ if victim and self.command[victim][0] == 'S' and not self.result[victim]:
+ word = self.command[victim].split()
+ supported, sup_site = self._occupant(word[2]), word[-1][:3]
+
+ # This next line is the key. Convoyed attacks can dislodge, but even when doing so, they cannot cut
+ # supports offered for or against a convoying fleet
+ # (They can cut supports directed against the original position of the army, though.)
+ if len(attack_order) > 2 and sup_site != unit[2:5]:
+ continue
+ self.result[victim] += ['cut']
+ cut = 1
+ for sups in self.combat.get(sup_site, {}):
+ for guy, no_help in self.combat[sup_site][sups]:
+ if guy != supported:
+ continue
+ self.combat[sup_site][sups].remove([guy, no_help])
+ if not self.combat[sup_site][sups]:
+ del self.combat[sup_site][sups]
+ sups -= 1
+ if victim in no_help:
+ no_help.remove(victim)
+ self.combat[sup_site].setdefault(sups, []).append([guy, no_help])
+ break
+ else:
+ continue
+ break
+
+ # -----------------------------------------------------------
+ # STEP 10. MARK DISLODGEMENTS AND UNBOUNCE ALL MOVES THAT LEAD TO DISLODGING UNITS
+ for unit, order in self.command.items():
+ if order[0] != '-' or self.result[unit]:
+ continue
+ site = unit[2:5]
+ offset = 1 if order.split()[-1] == 'VIA' else 0
+ loser = self._occupant(order.split()[-1 - offset], any_coast=1)
+ if loser and (self.command[loser][0] != '-' or self.result[loser]):
+ self.result[loser] = [res for res in self.result[loser] if res != 'disrupted'] + ['dislodged']
+ self.dislodged[loser] = site
+
+ # Check for a dislodged swapper (attacker and dislodged units must not be convoyed.)
+ # If found, remove the swapper from the combat list of the attacker's space
+ head_to_head_battle = not self._is_moving_via_convoy(unit) and not self._is_moving_via_convoy(loser)
+ if self.command[loser][2:5] == site and head_to_head_battle:
+ for sups, items in self.combat.get(site, {}).items():
+ item = [x for x in items if x[0] == loser]
+ if item:
+ self._no_effect(item[0], site)
+ break
+
+ # Marking support for self-dislodgement as void
+ for supporting_unit in self.supports[unit][1]:
+ self.result[supporting_unit] += ['void']
+
+ # Unbounce any powerful-enough move that can now take the spot being vacated by the dislodger.
+ if site in self.combat:
+ self._unbounce(site)
+
+ # Done :-)
+
+ def _move_results(self):
+ """ Resolves moves (Movement phase) and returns a list of messages explaining what happened
+ :return: A list of lines for the results file explaining what happened during the phase
+ """
+ # Resolving moves
+ self._resolve_moves()
+
+ # Determine any retreats
+ for power in self.powers.values():
+ for unit in [u for u in power.units if u in self.dislodged]:
+ if unit not in power.retreats:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ power.retreats.setdefault(unit, [])
+ attacker_site, site = self.dislodged[unit], unit[2:]
+ attacker = self._occupant(attacker_site)
+ if self.map.loc_abut.get(site):
+ pushee = site
+ else:
+ pushee = site.lower()
+ for abut in self.map.loc_abut[pushee]:
+ abut = abut.upper()
+ where = abut[:3]
+ if ((self._abuts(unit[0], site, '-', abut) or self._abuts(unit[0], site, '-', where))
+ and (not self.combat.get(where)
+ and where != attacker_site or self._is_moving_via_convoy(attacker))):
+
+ # Armies cannot retreat to specific coasts
+ if unit[0] == 'F':
+ power.retreats[unit] += [abut]
+ elif where not in power.retreats[unit]:
+ power.retreats[unit] += [where]
+
+ # List all possible retreats
+ destroyed, self.popped = {}, []
+ if self.dislodged:
+ for power in self.powers.values():
+ for unit in [u for u in power.units if u in self.dislodged]:
+
+ # Removing unit
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(unit)
+ to_where = power.retreats.get(unit)
+
+ # Describing what it can do
+ if to_where:
+ pass
+ else:
+ destroyed[unit] = power
+ self.popped += [unit]
+
+ # Now (finally) actually move the units that succeeded in moving
+ for power in self.powers.values():
+ for unit in power.units[:]:
+ if self.command[unit][0] == '-' and not self.result[unit]:
+ offset = 1 if self.command[unit].split()[-1] == 'VIA' else 0
+
+ # Removing
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(unit)
+
+ # Adding
+ new_unit = unit[:2] + self.command[unit].split()[-1 - offset]
+ self.update_hash(power.name, unit_type=new_unit[0], loc=new_unit[2:])
+ power.units += [new_unit]
+
+ # Setting influence
+ for influence_power in self.powers.values():
+ if new_unit[2:5] in influence_power.influence:
+ influence_power.influence.remove(new_unit[2:5])
+ power.influence.append(new_unit[2:5])
+
+ # If units were destroyed, other units may go out of sight
+ if destroyed:
+ for unit, power in destroyed.items():
+ if unit in power.retreats:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ del power.retreats[unit]
+
+ # All finished
+ self._post_move_update()
+ return []
+
+ def _other_results(self):
+ """ Resolves moves (Retreat and Adjustment phase) and returns a list of messages explaining what happened
+ :return: A list of lines for the results file explaining what happened during the phase
+ """
+ # pylint: disable=too-many-statements,too-many-branches,too-many-nested-blocks
+ self.command = {}
+ self.ordered_units = {}
+ conflicts = {}
+
+ # Adjustments
+ if self.phase_type == 'A':
+ self.result = {}
+
+ # Emptying the results for the Adjustments Phase
+ for power in self.powers.values():
+ self.ordered_units.setdefault(power.name, [])
+ for order in power.adjust[:]:
+
+ # Void order - Marking it as such in results
+ if order.split()[0] == 'VOID':
+ word = order.split()[1:]
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Valid order - Marking as unprocessed
+ else:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, [])
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # CIVIL DISORDER
+ for power in self.powers.values():
+ diff = len(power.units) - len(power.centers)
+
+ # Detecting missing orders
+ for order in power.adjust[:]:
+ if diff == 0:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ # Looking for builds
+ elif diff < 0:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ if word[-1] == 'B':
+ diff += 1
+ else:
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ # Looking for removes
+ else:
+ word = order.split()
+ unit = ' '.join(word[:2])
+ if word[-1] == 'D':
+ diff -= 1
+ else:
+ self.result.setdefault(unit, []).append('void')
+ power.adjust.remove(order)
+
+ if not diff:
+ continue
+
+ power.civil_disorder = 1
+
+ # Need to remove units
+ if diff > 0:
+ fleets = PriorityDict()
+ armies = PriorityDict()
+
+ # Calculating distance to home
+ for unit in power.units:
+ distance = self._get_distance_to_home(unit[0], unit[2:], power.homes)
+ if unit[0] == 'F':
+ fleets[unit] = -1 * distance
+ else:
+ armies[unit] = -1 * distance
+
+ # Removing units
+ for unit in range(diff):
+ goner_distance, goner = 99999, None
+
+ # Removing units with largest distance (using fleets if they are equal)
+ # (using alpha name if multiple units)
+ if fleets:
+ goner_distance, goner = fleets.smallest()
+ if armies and armies.smallest()[0] < goner_distance:
+ goner_distance, goner = armies.smallest()
+ if goner is None:
+ break
+ if goner[0] == 'F':
+ del fleets[goner]
+ else:
+ del armies[goner]
+ power.adjust += ['%s D' % goner]
+ self.result.setdefault(goner, [])
+
+ # Need to build units
+ else:
+ sites = self._build_sites(power)
+ need = min(self._build_limit(power, sites), -diff)
+ power.adjust += ['WAIVE'] * need
+
+ # Retreats phase
+ elif self.phase_type == 'R':
+ self.result = {}
+
+ # Emptying the results for the Retreats Phase
+ for power in self.powers.values():
+ self.ordered_units.setdefault(power.name, [])
+ for retreats in power.retreats:
+ self.result[retreats] = []
+
+ # Emptying void orders - And marking them as such
+ for power in self.powers.values():
+ for order in power.adjust[:]:
+ if order.split()[0] == 'VOID':
+ word = order.split()[1:]
+ unit = ' '.join(word[:2])
+ self.result[unit] = ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+ power.adjust.remove(order)
+
+ # Disband units with no retreats
+ for power in self.powers.values():
+ if power.retreats and not power.adjust:
+ power.civil_disorder = 1
+ power.adjust = ['%s D' % r_unit for r_unit in power.retreats]
+
+ # Determine multiple retreats to the same location.
+ for power in self.powers.values():
+ for order in power.adjust or []:
+ word = order.split()
+ if len(word) == 4:
+ conflicts.setdefault(word[3][:3], []).append(' '.join(word[:2]))
+
+ # Determine retreat conflict (*bounce, destroyed*)
+ # When finished, "self.popped" will be a list of all retreaters who didn't make it.
+ for retreaters in conflicts.values():
+ if len(retreaters) > 1:
+ for retreater in retreaters:
+ if 'void' in self.result[retreater]:
+ self.result[retreater].remove('void')
+ self.result[retreater] += ['bounce', 'disband']
+ self.popped += retreaters
+
+ # Processing Build and Disband
+ for power in self.powers.values():
+ diff = len(power.units) - len(power.centers)
+
+ # For each order
+ for order in power.adjust or []:
+ word = order.split()
+ unit = ' '.join(word[:2])
+
+ # Build
+ if word[-1] == 'B' and len(word) > 2:
+ if diff < 0:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units += [' '.join(word[:2])]
+ diff += 1
+ self.result[unit] += ['']
+ else:
+ self.result[unit] += ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Disband
+ elif word[-1] == 'D' and self.phase_type == 'A':
+ if diff > 0 and ' '.join(word[:2]) in power.units:
+ self.update_hash(power.name, unit_type=unit[0], loc=unit[2:])
+ power.units.remove(' '.join(word[:2]))
+ diff -= 1
+ self.result[unit] += ['']
+ else:
+ self.result[unit] += ['void']
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ # Retreat
+ elif len(word) == 4:
+ if unit not in self.popped:
+ self.update_hash(power.name, unit_type=word[0], loc=word[-1])
+ power.units += [word[0] + ' ' + word[-1]]
+ if unit in self.dislodged:
+ del self.dislodged[unit]
+
+ # Update influence
+ for influence_power in self.powers.values():
+ if word[-1] in influence_power.influence:
+ influence_power.influence.remove(word[-1])
+ power.influence.append(word[-1])
+
+ if unit not in self.ordered_units[power.name]:
+ self.ordered_units[power.name] += [unit]
+
+ for dis_unit in power.retreats:
+ self.update_hash(power.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ power.adjust, power.retreats, power.civil_disorder = [], {}, 0
+
+ # Disbanding
+ for unit in [u for u in self.dislodged]:
+ self.result.setdefault(unit, [])
+ if 'disband' not in self.result[unit]:
+ self.result[unit] += ['disband']
+ del self.dislodged[unit]
+ if unit not in self.popped:
+ self.popped += [unit]
+
+ return []
+
+ def _resolve(self):
+ """ Resolve the current phase
+ :return: A list of strings for the results file explaining how the phase was resolved.
+ """
+ this_phase = self.phase_type
+
+ # This method knows how to process movement, retreat, and adjustment phases.
+ # For others, implement resolve_phase()
+ if this_phase == 'M':
+ self._move_results()
+ elif this_phase in 'RA':
+ self._other_results()
+ self._advance_phase()
+
+ def _clear_history(self):
+ """ Clear all game history fields. """
+ self.state_history.clear()
+ self.order_history.clear()
+ self.result_history.clear()
+ self.message_history.clear()
+ self.clear_orders()
+ self.clear_vote()