aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/engine/power.py
diff options
context:
space:
mode:
Diffstat (limited to 'diplomacy/engine/power.py')
-rw-r--r--diplomacy/engine/power.py392
1 files changed, 392 insertions, 0 deletions
diff --git a/diplomacy/engine/power.py b/diplomacy/engine/power.py
new file mode 100644
index 0000000..570359b
--- /dev/null
+++ b/diplomacy/engine/power.py
@@ -0,0 +1,392 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Power
+ - Contains the power object representing a power in the game
+"""
+from copy import deepcopy
+from diplomacy.utils import parsing, strings
+from diplomacy.utils.exceptions import DiplomacyException
+from diplomacy.utils.jsonable import Jsonable
+from diplomacy.utils.sorted_dict import SortedDict
+from diplomacy.utils import common, constants
+from diplomacy.utils.constants import OrderSettings
+
+class Power(Jsonable):
+ """ Power Class
+
+ Properties:
+ - abbrev - Contains the abbrev of the power (usually the first letter of the power name) (e.g. 'F' for FRANCE)
+ - adjust - List of pending adjustment orders
+ (e.g. ['A PAR B', 'A PAR R MAR', 'A MAR D', 'WAIVE'])
+ - centers - Contains the list of supply centers currently controlled by the power ['MOS', 'SEV', 'STP', 'WAR']
+ - civil_disorder - Boolean flag to indicate that the power has been put in CIVIL_DISORDER (e.g. True or False)
+ - controller - Sorted dictionary mapping timestamp to controller (either dummy or a user ID) who takes
+ control of power at this timestamp.
+ - game - Contains a reference to the game object
+ - goner - Boolean to indicate that this power doesn't control any SCs any more (e.g. True or False)
+ - homes - Contains a list of homes supply centers (where you can build)
+ e.g. ['PAR', 'MAR', ... ] or None if empty
+ - influence - Contains a list of locations influenced by this power
+ Note: To influence a location, the power must have visited it last.
+ e.g ['PAR', 'MAR', ... ]
+ - name - Contains the name of the power
+ - orders - Contains a dictionary of units and their orders.
+ For NO_CHECK games, unit is 'ORDER 1', 'ORDER 2', ...
+ - e.g. {'A PAR': '- MAR' } or {'ORDER 1': 'A PAR - MAR', 'ORDER 2': '...', ... }
+ - Can also be {'REORDER 1': 'A PAR - MAR', 'INVALID 1': 'A PAR - MAR', ... } after validation
+ - retreats - Contains the list of units that need to retreat with their possible retreat locations
+ (e.g. {'A PAR': ['MAR', 'BER']})
+ - role - Power type (observer, omniscient, player or server power).
+ Either the power name (for a player power) or a value in diplomacy.utils.strings.ALL_ROLE_TYPES
+ - tokens - Only for server power: set of tokens of current power controlled (if not None).
+ - units - Contains the list of units (e.g. ['A PAR', 'A MAR', ...]
+ - vote - Only for omniscient, player and server power: power vote ('yes', 'no' or 'neutral').
+ """
+ __slots__ = ['game', 'name', 'abbrev', 'adjust', 'centers', 'units', 'influence', 'homes',
+ 'retreats', 'goner', 'civil_disorder', 'orders', 'role', 'controller', 'vote',
+ 'order_is_set', 'wait', 'tokens']
+ model = {
+ strings.ABBREV: parsing.OptionalValueType(str),
+ strings.ADJUST: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.CENTERS: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.CIVIL_DISORDER: parsing.DefaultValueType(int, 0),
+ strings.CONTROLLER: parsing.DefaultValueType(parsing.DictType(int, str, SortedDict.builder(int, str)), {}),
+ strings.HOMES: parsing.OptionalValueType(parsing.SequenceType(str)),
+ strings.INFLUENCE: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.NAME: parsing.PrimitiveType(str),
+ strings.ORDER_IS_SET: parsing.DefaultValueType(OrderSettings.ALL_SETTINGS, OrderSettings.ORDER_NOT_SET),
+ strings.ORDERS: parsing.DefaultValueType(parsing.DictType(str, str), {}),
+ strings.RETREATS: parsing.DefaultValueType(parsing.DictType(str, parsing.SequenceType(str)), {}),
+ strings.ROLE: parsing.DefaultValueType(str, strings.SERVER_TYPE),
+ strings.TOKENS: parsing.DefaultValueType(parsing.SequenceType(str, set), ()),
+ strings.UNITS: parsing.DefaultValueType(parsing.SequenceType(str), []),
+ strings.VOTE: parsing.DefaultValueType(parsing.EnumerationType(strings.ALL_VOTE_DECISIONS), strings.NEUTRAL),
+ strings.WAIT: parsing.DefaultValueType(bool, True),
+ }
+
+ def __init__(self, game=None, name=None, **kwargs):
+ """ Constructor """
+ self.game = game
+ self.abbrev = None
+ self.adjust, self.centers, self.units, self.influence = [], [], [], []
+ self.homes = None
+ self.retreats = {}
+ self.goner = self.civil_disorder = 0
+ self.orders = {}
+
+ self.name = ''
+ self.role = ''
+ self.controller = SortedDict(int, str)
+ self.vote = ''
+ self.order_is_set = 0
+ self.wait = False
+ self.tokens = set()
+ super(Power, self).__init__(name=name, **kwargs)
+ assert self.role in strings.ALL_ROLE_TYPES or self.role == self.name
+ if not self.controller:
+ self.controller.put(common.timestamp_microseconds(), strings.DUMMY)
+
+ def __str__(self):
+ """ Returns a representation of the power instance """
+ show_cd = self.civil_disorder
+ show_inhabits = self.homes is not None
+ show_owns = self.centers
+ show_retreats = len(self.retreats) > 0
+
+ text = ''
+ text += '\n%s (%s)' % (self.name, self.role)
+ text += '\nPLAYER %s' % self.controller.last_value()
+ text += '\nCD' if show_cd else ''
+ text += '\nINHABITS %s' % ' '.join(self.homes) if show_inhabits else ''
+ text += '\nOWNS %s' % ' '.join(self.centers) if show_owns else ''
+ if show_retreats:
+ text += '\n'.join([''] + [' '.join([unit, '-->'] + places) for unit, places in self.retreats.items()])
+ text = '\n'.join([text] + self.units + self.adjust)
+
+ # Orders - RIO is for REORDER, INVALID, ORDER (in NO_CHECK games)
+ text_order = '\nORDERS\n'
+ for unit, order in self.orders.items():
+ if unit[0] not in 'RIO':
+ text_order += '%s ' % unit
+ text_order += order + '\n'
+
+ text += text_order if self.orders else ''
+ return text
+
+ def __deepcopy__(self, memo):
+ """ Fast deep copy implementation
+ - (Not setting the game object)
+ """
+ cls = self.__class__
+ result = cls.__new__(cls)
+
+ # Deep copying
+ for key in self.__slots__:
+ if key not in ['game']:
+ setattr(result, key, deepcopy(getattr(self, key)))
+
+ # Game
+ setattr(result, 'game', None)
+ return result
+
+ def reinit(self, include_flags=6):
+ """ Performs a reinitialization of some of the parameters
+ :param include_flags: Bit mask to indicate which params to reset
+ (bit 1 = orders, 2 = persistent, 4 = transient)
+ :return: None
+ """
+ reinit_persistent = include_flags & 2
+ reinit_transient = include_flags & 4
+ reinit_orders = include_flags & 1
+
+ # Initialize the persistent parameters
+ if reinit_persistent:
+ self.abbrev = None
+
+ # Initialize the transient parameters
+ if reinit_transient:
+ for home in self.homes:
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ for center in self.centers:
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ for unit in self.units:
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ for dis_unit in self.retreats:
+ self.game.update_hash(self.name, unit_type=dis_unit[0], loc=dis_unit[2:], is_dislodged=True)
+ self.homes = None
+ self.centers, self.units, self.influence = [], [], []
+ self.retreats = {}
+
+ # Initialize the order-related parameters
+ if reinit_orders:
+ self.civil_disorder = 0
+ self.adjust = []
+ self.orders = {}
+ if self.is_eliminated():
+ self.order_is_set = OrderSettings.ORDER_SET_EMPTY
+ self.wait = False
+ else:
+ self.order_is_set = OrderSettings.ORDER_NOT_SET
+ self.wait = True if self.is_dummy() else (not self.game.real_time)
+ self.goner = 0
+
+ @staticmethod
+ def compare(power_1, power_2):
+ """ Comparator object - Compares two Power objects
+ :param power_1: The first Power object to compare
+ :param power_2: The second Power object to compare
+ :return: 1 if self is greater, -1 if other is greater, 0 if they are equal
+ """
+ cmp = lambda power_1, power_2: ((power_1 > power_2) - (power_1 < power_2))
+ xstr = lambda string: string or '' # To avoid comparing with None
+ cmp_type = cmp(xstr(power_1.role), xstr(power_2.role))
+ cmp_name = cmp(xstr(power_1.name), xstr(power_2.name))
+ return cmp_type or cmp_name
+
+ def initialize(self, game):
+ """ Initializes a game and resets home, centers and units
+ :param game: The game to use for initialization
+ :type game: diplomacy.Game
+ """
+
+ # Not initializing observers and monitors
+ assert self.is_server_power()
+
+ self.game = game
+ self.order_is_set = OrderSettings.ORDER_NOT_SET
+ self.wait = True if self.is_dummy() else (not self.game.real_time)
+
+ # Get power abbreviation.
+ self.abbrev = self.game.map.abbrev.get(self.name, self.name[0])
+
+ # Resets homes
+ if self.homes is None:
+ self.homes = []
+ for home in game.map.homes.get(self.name, []):
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ self.homes.append(home)
+
+ # Resets the centers and units
+ if not self.centers:
+ for center in game.map.centers.get(self.name, []):
+ game.update_hash(self.name, loc=center, is_center=True)
+ self.centers.append(center)
+ if not self.units:
+ for unit in game.map.units.get(self.name, []):
+ game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.units.append(unit)
+ self.influence.append(unit[2:5])
+
+ def merge(self, other_power):
+ """ Transfer all units, centers, and homes of the other_power to this power
+ :param other_power: The other power (will be empty after the merge)
+ """
+ # Regular units
+ for unit in list(other_power.units):
+ self.units.append(unit)
+ other_power.units.remove(unit)
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:])
+
+ # Dislodged units
+ for unit in list(other_power.retreats.keys()):
+ self.retreats[unit] = other_power.retreats[unit]
+ del other_power.retreats[unit]
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+ self.game.update_hash(other_power.name, unit_type=unit[0], loc=unit[2:], is_dislodged=True)
+
+ # Influence
+ for loc in list(other_power.influence):
+ self.influence.append(loc)
+ other_power.influence.remove(loc)
+
+ # Supply centers
+ for center in list(other_power.centers):
+ self.centers.append(center)
+ other_power.centers.remove(center)
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ self.game.update_hash(other_power.name, loc=center, is_center=True)
+
+ # Homes
+ for home in list(other_power.homes):
+ self.homes.append(home)
+ other_power.homes.remove(home)
+ self.game.update_hash(self.name, loc=home, is_home=True)
+ self.game.update_hash(other_power.name, loc=home, is_home=True)
+
+ # Clearing state cache
+ self.game.clear_cache()
+
+ def clear_units(self):
+ """ Removes all units from the map """
+ for unit in self.units:
+ self.game.update_hash(self.name, unit_type=unit[0], loc=unit[2:])
+ self.units = []
+ self.influence = []
+ self.game.clear_cache()
+
+ def clear_centers(self):
+ """ Removes ownership of all supply centers """
+ for center in self.centers:
+ self.game.update_hash(self.name, loc=center, is_center=True)
+ self.centers = []
+ self.game.clear_cache()
+
+ def is_dummy(self):
+ """ Indicates if the power is a dummy
+ :return: Boolean flag to indicate if the power is a dummy
+ """
+ return self.controller.last_value() == strings.DUMMY
+
+ def is_eliminated(self):
+ """ Returns a flag to show if player is eliminated
+ :return: If the current power is eliminated
+ """
+ # Not eliminated if has units left
+ if self.units or self.centers or self.retreats:
+ return False
+ return True
+
+ def clear_orders(self):
+ """ Clears the power's orders """
+ self.reinit(include_flags=1)
+
+ def moves_submitted(self):
+ """ Returns a boolean to indicate if moves has been submitted
+ :return: 1 if not in Movement phase, or orders submitted, or no more units lefts
+ """
+ if self.game.phase_type != 'M':
+ return 1
+ return self.orders or not self.units
+
+ # ==============================================================
+ # Application/network methods (mainly used for connected games).
+ # ==============================================================
+
+ def is_observer_power(self):
+ """ Return True if this power is an observer power. """
+ return self.role == strings.OBSERVER_TYPE
+
+ def is_omniscient_power(self):
+ """ Return True if this power is an omniscient power. """
+ return self.role == strings.OMNISCIENT_TYPE
+
+ def is_player_power(self):
+ """ Return True if this power is a player power. """
+ return self.role == self.name
+
+ def is_server_power(self):
+ """ Return True if this power is a server power. """
+ return self.role == strings.SERVER_TYPE
+
+ def is_controlled(self):
+ """ Return True if this power is controlled. """
+ return self.controller.last_value() != strings.DUMMY
+
+ def does_not_wait(self):
+ """ Return True if this power does not wait (ie. if we could already process orders of this power). """
+ return self.order_is_set and not self.wait
+
+ def update_controller(self, username, timestamp):
+ """ Update controller with given username and timestamp. """
+ self.controller.put(timestamp, username)
+
+ def set_controlled(self, username):
+ """ Control power with given username. Username may be None (meaning no controller). """
+ if username is None or username == strings.DUMMY:
+ if self.controller.last_value() != strings.DUMMY:
+ self.controller.put(common.timestamp_microseconds(), strings.DUMMY)
+ self.tokens.clear()
+ self.wait = True
+ self.vote = strings.NEUTRAL
+ elif self.controller.last_value() == strings.DUMMY:
+ self.controller.put(common.timestamp_microseconds(), username)
+ self.wait = not self.game.real_time
+ elif self.controller.last_value() != username:
+ raise DiplomacyException('Power already controlled by someone else. Kick previous controller before.')
+
+ def get_controller(self):
+ """ Return current power controller name ('dummy' if power is not controlled). """
+ return self.controller.last_value()
+
+ def get_controller_timestamp(self):
+ """ Return timestamp when current controller took control of this power. """
+ return self.controller.last_key()
+
+ def is_controlled_by(self, username):
+ """ Return True if this power is controlled by given username. """
+ if username == constants.PRIVATE_BOT_USERNAME:
+ # Bot is connected if power is dummy and has some associated tokens.
+ return self.is_dummy() and bool(self.tokens)
+ return self.controller.last_value() == username
+
+ # Server-only methods.
+
+ def has_token(self, token):
+ """ Return True if this power has given token. """
+ assert self.is_server_power()
+ return token in self.tokens
+
+ def add_token(self, token):
+ """ Add given token to this power. """
+ assert self.is_server_power()
+ self.tokens.add(token)
+
+ def remove_tokens(self, tokens):
+ """ Remove sequence of tokens from this power. """
+ assert self.is_server_power()
+ self.tokens.difference_update(tokens)