diff options
Diffstat (limited to 'diplomacy')
-rw-r--r-- | diplomacy/daide/responses.py | 753 | ||||
-rw-r--r-- | diplomacy/daide/tests/test_responses.py | 339 |
2 files changed, 1092 insertions, 0 deletions
diff --git a/diplomacy/daide/responses.py b/diplomacy/daide/responses.py new file mode 100644 index 0000000..735ebe1 --- /dev/null +++ b/diplomacy/daide/responses.py @@ -0,0 +1,753 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" DAIDE Responses - Contains a list of responses sent by the server to the client """ +from diplomacy import Map +from diplomacy.communication.responses import _AbstractResponse +from diplomacy.daide.clauses import String, Power, Province, Turn, Unit, add_parentheses, strip_parentheses, \ + parse_string +from diplomacy.daide import tokens +from diplomacy.daide.tokens import Token +from diplomacy.daide.utils import bytes_to_str +from diplomacy.utils.splitter import OrderSplitter + +class DaideResponse(_AbstractResponse): + """ Represents a DAIDE response. """ + def __init__(self, **kwargs): + """ Constructor """ + self._bytes = b'' + super(DaideResponse, self).__init__(**kwargs) + + def __bytes__(self): + """ Returning the bytes representation of the response """ + return self._bytes + + def __str__(self): + """ Returning the string representation of the response """ + return bytes_to_str(self._bytes) + +class MapNameResponse(DaideResponse): + """ Represents a MAP DAIDE response. Sends the name of the current map to the client. + Syntax: + MAP ('name') + """ + def __init__(self, map_name, **kwargs): + """ Builds the response + :param map_name: String. The name of the current map. + """ + super(MapNameResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.MAP) \ + + bytes(parse_string(String, map_name)) + +class MapDefinitionResponse(DaideResponse): + """ Represents a MDF DAIDE response. Sends configuration of a map to a client + Syntax: + MDF (powers) (provinces) (adjacencies) + powers syntax: + power power ... + power syntax: + AUS # Austria + ENG # England + FRA # France + GER # Germany + ITA # Italy + RUS # Russia + TUR # Turkey + provinces syntax: + (supply_centres) (non_supply_centres) + supply_centres syntax: + (power centre centre ...) (power centre centre ...) ... + supply_centres power syntax: + (power power ...) # This is currently not supported + AUS # Austria + ENG # England + FRA # France + GER # Germany + ITA # Italy + RUS # Russia + TUR # Turkey + UNO # Unknown power + non_supply_centres syntax: + province province ... # List of provinces + adjacencies syntax: + (prov_adjacencies) (prov_adjacencies) ... + prov_adjacencies syntax: + province (unit_type adjacent_prov adjacent_prov ...) (unit_type adjacent_prov adjacent_prov ...) ... + unit_type syntax: + AMY # List of provinces an army can move to + FLT # List of provinces a fleet can move to + (FLT coast) # List of provinces a fleet can move to from the given coast + adjacent_prov syntax: + province # A province which can be moved to + (province coast) # A coast of a province that can be moved to + """ + def __init__(self, map_name, **kwargs): + """ Builds the response + :param map_name: The name of the map + """ + super(MapDefinitionResponse, self).__init__(**kwargs) + game_map = Map(map_name) + + # (Powers): (power power ...) + # (Provinces): ((supply_centers) (non_supply_centres)) + # (Adjacencies): ((prov_adjacencies) (prov_adjacencies) ...) + powers_clause = self._build_powers_clause(game_map) + provinces_clause = self._build_provinces_clause(game_map) + adjacencies_clause = self._build_adjacencies_clause(game_map) + + self._bytes = bytes(tokens.MDF) \ + + powers_clause \ + + provinces_clause \ + + adjacencies_clause + + @staticmethod + def _build_powers_clause(game_map): + """ Build the powers clause + Syntax: + (powers) + powers syntax: + power power ... + power syntax: + AUS # Austria + ENG # England + FRA # France + GER # Germany + ITA # Italy + RUS # Russia + TUR # Turkey + """ + power_names = game_map.powers[:] + power_names.sort() + + # (Powers): (power power ...) + powers_clause = [bytes(parse_string(Power, power_name)) for power_name in power_names] + powers_clause = add_parentheses(b''.join(powers_clause)) + return powers_clause + + @staticmethod + def _build_provinces_clause(game_map): + """ Build the provinces clause + Syntax: + (provinces) + provinces syntax: + (supply_centres) (non_supply_centres) + supply_centres syntax: + (power centre centre ...) (power centre centre ...) ... + supply_centres power syntax: + (power power ...) # This is currently not supported + AUS # Austria + ENG # England + FRA # France + GER # Germany + ITA # Italy + RUS # Russia + TUR # Turkey + UNO # Unknown power + non_supply_centres syntax: + province province ... # List of provinces + """ + unowned_scs = game_map.scs[:] + unowned_scs.sort() + + # (Supply centers): ((power centre centre ...) (power centre centre ...) ...) + # (Non supply centres): (province province ...) + scs_clause = [] + non_scs_clause = [] + + power_names_centers = [(power_name, centers[:]) for power_name, centers in game_map.centers.items()] + power_names_centers.sort(key=lambda power_name_center: power_name_center[0]) + + # Parsing each power centers + for power_name, centers in power_names_centers: + centers.sort() + + power_scs_clause = [bytes(parse_string(Power, power_name))] + for center in centers: + power_scs_clause.append(bytes(parse_string(Province, center))) + unowned_scs.remove(center) + + # (Power supply centers): (power centre centre ...) + power_scs_clause = add_parentheses(b''.join(power_scs_clause)) + scs_clause.append(power_scs_clause) + + # (Power supply centers): (power centre centre ...) + power_scs_clause = [bytes(tokens.UNO)] + power_scs_clause += [bytes(parse_string(Province, center)) for center in unowned_scs] + power_scs_clause = add_parentheses(b''.join(power_scs_clause)) + + # (Supply centers): ((power centre centre ...) (power centre centre ...) ...) + scs_clause.append(power_scs_clause) + scs_clause = add_parentheses(b''.join(scs_clause)) + + provinces = game_map.locs[:] + provinces.sort() + for province in provinces: + if game_map.area_type(province) == 'SHUT': + continue + + province = province[:3].upper() + province_clause = bytes(parse_string(Province, province)) + if province_clause not in non_scs_clause and province not in game_map.scs: + non_scs_clause.append(province_clause) + + # (Non supply centres): (province province ...) + non_scs_clause = add_parentheses(b''.join(non_scs_clause)) + + # (Provinces): ((supply_centers) (non_supply_centres)) + provinces_clause = [scs_clause, non_scs_clause] + provinces_clause = add_parentheses(b''.join(provinces_clause)) + + return provinces_clause + + @staticmethod + def _build_adjacencies_clause(game_map): + """ Build the adjacencies clause + Syntax: + (adjacencies) + adjacencies syntax: + (prov_adjacencies) (prov_adjacencies) ... + prov_adjacencies syntax: + province (unit_type adjacent_prov adjacent_prov ...) (unit_type adjacent_prov adjacent_prov ...) ... + unit_type syntax: + AMY # List of provinces an army can move to + FLT # List of provinces a fleet can move to + (FLT coast) # List of provinces a fleet can move to from the given coast + adjacent_prov syntax: + province # A province which can be moved to + (province coast) # A coast of a province that can be moved to + """ + adjacencies = {} # {province: {'A': [], 'F': [], '/': []} army abuts, fleet abuts, / abuts + + # For each province + for province in sorted([loc.upper() for loc in game_map.locs if '/' not in loc]): + province_type = game_map.area_type(province) + + if province_type == 'SHUT': + continue + + # Creating empty list of adjacent provinces + adjacencies.setdefault(province, {}) + adjacencies[province].setdefault('A', []) # List of adjacent provinces where armies can move + for province_w_coast in sorted(game_map.find_coasts(province)): + coast = province_w_coast[3:] + adjacencies[province].setdefault(coast, []) # List of adjacent provinces where fleets can move + + # Building list of adjacent provinces + for coast in adjacencies[province]: # 'A', '', '/NC', '/SC', '/EC', '/WC' + + # Army adjacencies + if coast == 'A': + for dest in sorted(game_map.dest_with_coasts[province]): + if game_map.abuts('A', province, '-', dest): + adjacencies[province]['A'].append(bytes(parse_string(Province, dest))) + + # Fleet adjacencies + else: + for dest in sorted(game_map.dest_with_coasts[province + coast]): + if game_map.abuts('F', province + coast, '-', dest): + adjacencies[province][coast].append(bytes(parse_string(Province, dest))) + + # If province has coasts ('/NC', '/SC'), removing the adjacency for fleets without coast + if len(adjacencies[province]) > 2: + del adjacencies[province][''] + + # Building adjacencies clause + adjacencies_clause = [] + for province in adjacencies: + prov_adjacencies_clause = [bytes(parse_string(Province, province))] + + for coast in ('A', '', '/EC', '/NC', '/SC', '/WC'): + if coast not in adjacencies[province]: + continue + if not adjacencies[province][coast]: + continue + + # (Army adjacencies): (AMY adjacent_prov adjacent_prov ...) + if coast == 'A': + amy_adjacencies_clause = [bytes(tokens.AMY)] + adjacencies[province][coast] + amy_adjacencies_clause = add_parentheses(b''.join(amy_adjacencies_clause)) + prov_adjacencies_clause.append(amy_adjacencies_clause) + + # (Fleet provinces): (FLT adjacent_prov adjacent_prov ...) + elif coast == '': + flt_adjacencies_clause = [bytes(tokens.FLT)] + adjacencies[province][coast] + flt_adjacencies_clause = add_parentheses(b''.join(flt_adjacencies_clause)) + prov_adjacencies_clause.append(flt_adjacencies_clause) + + # (Fleet coast): (FLT coast) + # (Fleet coast provinces): ((FLT coast) adjacent_prov adjacent_prov ...) + else: + flt_clause = bytes(tokens.FLT) + coast_clause = bytes(parse_string(Province, coast)) + coast_flt_adjacencies_clause = [add_parentheses(flt_clause + coast_clause)] \ + + adjacencies[province][coast] + coast_flt_adjacencies_clause = add_parentheses(b''.join(coast_flt_adjacencies_clause)) + prov_adjacencies_clause.append(coast_flt_adjacencies_clause) + + # (Province adjacencies): (province (unit_type adjacent_prov adjacent_prov ...) + # (unit_type adjacent_prov adjacent_prov ...) ...) + prov_adjacencies_clause = add_parentheses(b''.join(prov_adjacencies_clause)) + adjacencies_clause.append(prov_adjacencies_clause) + + # (Adjacencies): ((prov_adjacencies) (prov_adjacencies) ...) + adjacencies_clause = add_parentheses(b''.join(adjacencies_clause)) + return adjacencies_clause + +class HelloResponse(DaideResponse): + """ Represents a HLO DAIDE response. Sends the power to be played by the client with the passcode to rejoin the + game and the details of the game. + Syntax: + HLO (power) (passcode) (variant) (variant) ... + Variant syntax: + LVL n # Level of the syntax accepted + MTL seconds # Movement time limit + RTL seconds # Retreat time limit + BTL seconds # Build time limit + DSD # Disables the time limit when a client disconects + AOA # Any orders accepted + LVL 10: + Variant syntax: + PDA # Accept partial draws + NPR # No press during retreat phases + NPB # No press during build phases + PTL seconds # Press time limit + """ + def __init__(self, power_name, passcode, level, deadline, rules, **kwargs): + """ Builds the response + :param power_name: The name of the power being played. + :param passcode: Integer. A passcode to rejoin the game. + :param level: Integer. The daide syntax level of the game + :param deadline: Integer. The number of seconds per turn (0 to disable) + :param rules: The list of game rules. + """ + super(HelloResponse, self).__init__(**kwargs) + power = parse_string(Power, power_name) + passcode = Token(from_int=passcode) + + if 'NO_PRESS' in rules: + level = 0 + variants = add_parentheses(bytes(tokens.LVL) + bytes(Token(from_int=level))) + + if deadline > 0: + variants += add_parentheses(bytes(tokens.MTL) + bytes(Token(from_int=deadline))) + variants += add_parentheses(bytes(tokens.RTL) + bytes(Token(from_int=deadline))) + variants += add_parentheses(bytes(tokens.BTL) + bytes(Token(from_int=deadline))) + + if 'NO_CHECK' in rules: + variants += add_parentheses(bytes(tokens.AOA)) + + self._bytes = bytes(tokens.HLO) \ + + add_parentheses(bytes(power)) \ + + add_parentheses(bytes(passcode)) \ + + add_parentheses(bytes(variants)) + +class SupplyCenterResponse(DaideResponse): + """ Represents a SCO DAIDE response. Sends the current supply centre ownership. + Syntax: + SCO (power centre centre ...) (power centre centre ...) ... + """ + def __init__(self, powers_centers, map_name, **kwargs): + """ Builds the response + :param powers_centers: A dict of {power_name: centers} objects + :param map_name: The name of the map + """ + super(SupplyCenterResponse, self).__init__(**kwargs) + remaining_scs = Map(map_name).scs[:] + all_powers_bytes = [] + + # Parsing each power + for power_name, centers in sorted(powers_centers.items()): + power_clause = parse_string(Power, power_name) + power_bytes = bytes(power_clause) + + for center in centers: + sc_clause = parse_string(Province, center) + power_bytes += bytes(sc_clause) + remaining_scs.remove(center) + + all_powers_bytes += [power_bytes] + + # Parsing unowned center + uno_token = tokens.UNO + power_bytes = bytes(uno_token) + + for center in remaining_scs: + sc_clause = parse_string(Province, center) + power_bytes += bytes(sc_clause) + + all_powers_bytes += [power_bytes] + + # Storing full response + self._bytes = bytes(tokens.SCO) \ + + b''.join([add_parentheses(power_bytes) for power_bytes in all_powers_bytes]) + +class CurrentPositionResponse(DaideResponse): + """ Represents a NOW DAIDE response. Sends the current turn, and the current unit positions. + Syntax: + NOW (turn) (unit) (unit) ... + Unit syntax: + power unit_type province + power unit_type province MRT (province province ...) + """ + + def __init__(self, phase_name, powers_units, powers_retreats, **kwargs): + """ Builds the response + :param phase_name: The name of the current phase (e.g. 'S1901M') + :param powers: A list of `diplomacy.engine.power.Power` objects + """ + super(CurrentPositionResponse, self).__init__(**kwargs) + units_bytes_buffer = [] + + # Turn + turn_clause = parse_string(Turn, phase_name) + + # Units + for power_name, units in sorted(powers_units.items()): + # Regular units + for unit in units: + unit_clause = parse_string(Unit, '%s %s' % (power_name, unit)) + units_bytes_buffer += [bytes(unit_clause)] + + # Dislodged units + for unit, retreat_provinces in powers_retreats[power_name].items(): + unit_clause = parse_string(Unit, '%s %s' % (power_name, unit)) + retreat_clauses = [parse_string(Province, province) for province in retreat_provinces] + units_bytes_buffer += [add_parentheses(strip_parentheses(bytes(unit_clause)) + + bytes(tokens.MRT) + + add_parentheses(b''.join([bytes(province) + for province in retreat_clauses])))] + + # Storing full response + self._bytes = bytes(tokens.NOW) + bytes(turn_clause) + b''.join(units_bytes_buffer) + +class ThanksResponse(DaideResponse): + """ Represents a THX DAIDE response. Sends the result of an order after submission. + Syntax: + THX (order) (note) + Note syntax: + MBV # Order is OK. + FAR # Not adjacent. + NSP # No such province + NSU # No such unit + NAS # Not at sea (for a convoying fleet) + NSF # No such fleet (in VIA section of CTO or the unit performing a CVY) + NSA # No such army (for unit being ordered to CTO or for unit being CVYed) + NYU # Not your unit + NRN # No retreat needed for this unit + NVR # Not a valid retreat space + YSC # Not your supply centre + ESC # Not an empty supply centre + HSC # Not a home supply centre + NSC # Not a supply centre + CST # No coast specified for fleet build in StP, or an attempt + to build a fleet inland, or an army at sea. + NMB # No more builds allowed + NMR # No more removals allowed + NRS # Not the right season + """ + def __init__(self, order_bytes, results, **kwargs): + """ Builds the response + :param order_bytes: The bytes received for the order + :param results: An array containing the error codes. + """ + super(ThanksResponse, self).__init__(**kwargs) + if not results or 0 in results: # Order success response + note_clause = tokens.MBV + else: # Generic order failure response + note_clause = tokens.NYU + + # Storing full response + self._bytes = bytes(tokens.THX) + order_bytes + add_parentheses(bytes(note_clause)) + +class MissingOrdersResponse(DaideResponse): + """ Represents a MIS DAIDE response. Sends the list of unit for which an order is missing or indication about + required disbands or builds. + Syntax: + MIS (unit) (unit) ... + MIS (unit MRT (province province ...)) (unit MRT (province province ...)) ... + MIS (number) + """ + def __init__(self, phase_name, power, **kwargs): + """ Builds the response + :param phase_name: The name of the current phase (e.g. 'S1901M') + :param power: The power to check for missing orders + :type power: diplomacy.engine.power.Power + """ + super(MissingOrdersResponse, self).__init__(**kwargs) + assert phase_name[-1] in 'MRA', 'Invalid phase "%s"' & phase_name + {'M': self._build_movement_phase, + 'R': self._build_retreat_phase, + 'A': self._build_adjustment_phase}[phase_name[-1]](power) + + def _build_movement_phase(self, power): + """ Builds the missing orders response for a movement phase """ + units_with_no_order = [unit for unit in power.units] + + # Removing units for which we have orders + for key, value in power.orders.items(): + unit = key # Regular game {e.g. 'A PAR': '- BUR') + if key[0] in 'RIO': # No-check game (key is INVALID, ORDER x, REORDER x) + unit = ' '.join(value.split()[:2]) + if unit in units_with_no_order: + units_with_no_order.remove(unit) + + # Storing full response + self._bytes = bytes(tokens.MIS) + \ + b''.join([bytes(parse_string(Unit, '%s %s' % (power.name, unit))) + for unit in units_with_no_order]) + + def _build_retreat_phase(self, power): + """ Builds the missing orders response for a retreat phase """ + units_bytes_buffer = [] + + units_with_no_order = {unit: retreat_provinces for unit, retreat_provinces in power.retreats.items()} + + # Removing units for which we have orders + for key, value in power.orders.items(): + unit = key # Regular game {e.g. 'A PAR': '- BUR') + if key[0] in 'RIO': # No-check game (key is INVALID, ORDER x, REORDER x) + unit = ' '.join(value.split()[:2]) + if unit in units_with_no_order: + del units_with_no_order[unit] + + # Sorting by the unit's province ASC so results are deterministic + for unit, retreat_provinces in sorted(units_with_no_order.items(), + key=lambda key_val: key_val[0].split()[-1]): + unit_clause = parse_string(Unit, '%s %s' % (power.name, unit)) + retreat_clauses = [parse_string(Province, province) for province in retreat_provinces] + units_bytes_buffer += [add_parentheses(strip_parentheses(bytes(unit_clause)) + + bytes(tokens.MRT) + + add_parentheses(b''.join([bytes(province) + for province in retreat_clauses])))] + + self._bytes = bytes(tokens.MIS) + b''.join(units_bytes_buffer) + + def _build_adjustment_phase(self, power): + """ Builds the missing orders response for a build phase """ + adjusts = [OrderSplitter(adjust) for adjust in power.adjust] + build_cnt = sum(1 for adjust in adjusts if adjust.order_type == 'B') + disband_cnt = sum(1 for adjust in adjusts if adjust.order_type == 'D') + disbands_status = (len(power.units) + build_cnt) - (len(power.centers) + disband_cnt) + + if disbands_status < 0: + available_homes = power.homes[:] + + # Removing centers for which it's impossible to build + for unit in [unit.split() for unit in power.units]: + province = unit[1] + if province in available_homes: + available_homes.remove(province) + + disbands_status = max(-len(available_homes), disbands_status) + + self._bytes += bytes(tokens.MIS) + add_parentheses(bytes(Token(from_int=disbands_status))) + +class OrderResultResponse(DaideResponse): + """ Represents a ORD DAIDE response. Sends the result of an order after the turn has been processed. + Syntax: + ORD (turn) (order) (result) + ORD (turn) (order) (result RET) + Result syntax: + SUC # Order succeeded (can apply to any order). + BNC # Move bounced (only for MTO, CTO or RTO orders). + CUT # Support cut (only for SUP orders). + DSR # Move via convoy failed due to dislodged convoying fleet (only for CTO orders). + NSO # No such order (only for SUP, CVY or CTO orders). + RET # Unit was dislodged and must retreat. + """ + def __init__(self, phase_name, order_bytes, results, **kwargs): + """ Builds the response + :param phase_name: The name of the current phase (e.g. 'S1901M') + :param order_bytes: The bytes received for the order + :param results: An array containing the error codes. + """ + super(OrderResultResponse, self).__init__(**kwargs) + turn_clause = parse_string(Turn, phase_name) + if not results or 0 in results: # Order success response + result_clause = tokens.SUC + else: # Generic order failure response + result_clause = tokens.NSO + + self._bytes = bytes(tokens.ORD) + bytes(turn_clause) + add_parentheses(order_bytes) + \ + add_parentheses(bytes(result_clause)) + +class TimeToDeadlineResponse(DaideResponse): + """ Represents a TME DAIDE response. Sends the time to the next deadline. + Syntax: + TME (seconds) + """ + def __init__(self, seconds, **kwargs): + """ Builds the response + :param seconds: Integer. The number of seconds before deadline + """ + super(TimeToDeadlineResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.TME) + add_parentheses(bytes(Token(from_int=seconds))) + +class AcceptResponse(DaideResponse): + """ Represents a YES DAIDE request. + Syntax: + YES (TME (seconds)) # Accepts to set the time when a + TME message will be sent + YES (NOT (TME)) # Accepts to cancel all requested time messages + YES (NOT (TME (seconds))) # Accepts to cancel a specific requested time message + YES (GOF) # Accepts to wait until the deadline before processing + the orders for the turn + YES (NOT (GOF)) # Accepts to cancel to wait until the deadline before + processing the orders for the turn + YES (DRW) # Accepts to draw + YES (NOT (DRW)) # Accepts to cancel a draw request + LVL 10: + YES (DRW (power power ...)) # Accepts a partial draw + YES (NOT (DRW (power power ...))) # Accepts to cancel a partial draw request + (? not mentinned in the DAIDE doc) + YES (SND (power power ...) (press_message)) # Accepts a press message + YES (SND (turn) (power power ...) (press_message)) # Accepts a press message + """ + def __init__(self, request_bytes, **kwargs): + """ Builds the response + :param request_bytes: The bytes received for the request + """ + super(AcceptResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.YES) + add_parentheses(request_bytes) + +class RejectResponse(DaideResponse): + """ Represents a REJ DAIDE request. + Syntax: + REJ (NME ('name') ('version')) # Rejects a client in the game + REJ (IAM (power) (passcode)) # Rejects a client to rejoin the game + REJ (HLO) # Rejects to send the HLO message + REJ (HST (turn)) # Rejects to send a copy of a previous + ORD, SCO and NOW messages + REJ (SUB (order) (order)) # Rejects a submition of orders + REJ (SUB (turn) (order) (order)) # Rejects a submition of orders + REJ (NOT (SUB (order))) # Rejects a cancellation of a submitted order + REJ (MIS) # Rejects to send a copy of the current MIS message + REJ (GOF) # Rejects to wait until the deadline before processing + the orders for the turn + REJ (NOT (GOF)) # Rejects to cancel to wait until the deadline before + processing the orders for the turn + REJ (TME (seconds)) # Rejects to set the time when a + TME message will be sent + REJ (NOT (TME)) # Rejects to cancel all requested time messages + REJ (NOT (TME (seconds))) # Rejects to cancel a specific requested time message + REJ (ADM ('name') ('message') # Rejects the admin message + REJ (DRW) # Rejects to draw + REJ (NOT (DRW)) # Rejects to cancel a draw request + LVL 10: + REJ (DRW (power power ...)) # Rejects to partially draw + REJ (NOT (DRW (power power ...))) # Rejects to cancel a partial draw request + REJ (SND (power power ...) (press_message)) # Rejects a press message + REJ (SND (turn) (power power ...) (press_message)) # Rejects a press message + """ + def __init__(self, request_bytes, **kwargs): + """ Builds the response + :param request_bytes: The bytes received for the request + """ + super(RejectResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.REJ) + add_parentheses(request_bytes) + +class NotResponse(DaideResponse): + """ Represents a NOT DAIDE response. + Syntax: + NOT (CCD (power)) + """ + def __init__(self, response_bytes, **kwargs): + """ Builds the response + :param response_bytes: The bytes received for the request + """ + super(NotResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.NOT) + add_parentheses(response_bytes) + +class PowerInCivilDisorderResponse(DaideResponse): + """ Represents a CCD DAIDE response. Sends the name of the power in civil disorder. + Syntax: + CCD (power) + """ + def __init__(self, power_name, **kwargs): + """ Builds the response + :param power_name: The name of the power being played. + """ + super(PowerInCivilDisorderResponse, self).__init__(**kwargs) + power = parse_string(Power, power_name) + self._bytes = bytes(tokens.CCD) + add_parentheses(bytes(power)) + +class PowerIsEliminatedResponse(DaideResponse): + """ Represents a OUT DAIDE response. Sends the name of the power eliminated. + Syntax: + OUT (power) + """ + def __init__(self, power_name, **kwargs): + """ Builds the response + :param power_name: The name of the power being played. + """ + super(PowerIsEliminatedResponse, self).__init__(**kwargs) + power = parse_string(Power, power_name) + self._bytes = bytes(tokens.OUT) + add_parentheses(bytes(power)) + +class ParenthesisErrorResponse(DaideResponse): + """ Represents a PRN DAIDE response. + Syntax: + PRN (message) + """ + def __init__(self, request_bytes, **kwargs): + """ Builds the response + :param request_bytes: The bytes received for the request + """ + super(ParenthesisErrorResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.PRN) + add_parentheses(request_bytes) + +class SyntaxErrorResponse(DaideResponse): + """ Represents a HUH DAIDE response. + Syntax: + HUH (message) + """ + def __init__(self, request_bytes, error_index, **kwargs): + """ Builds the response + :param request_bytes: The bytes received for the request + :param error_index: The index of the faulty token + """ + super(SyntaxErrorResponse, self).__init__(**kwargs) + message_with_err = request_bytes[:error_index] + bytes(tokens.ERR) + request_bytes[error_index:] + self._bytes = bytes(tokens.HUH) + add_parentheses(message_with_err) + +class TurnOffResponse(DaideResponse): + """ Represents an OFF DAIDE response. Requests a client to exit + Syntax: + OFF + """ + def __init__(self, **kwargs): + """ Builds the response + """ + super(TurnOffResponse, self).__init__(**kwargs) + self._bytes = bytes(tokens.OFF) + +MAP = MapNameResponse +MDF = MapDefinitionResponse +HLO = HelloResponse +SCO = SupplyCenterResponse +NOW = CurrentPositionResponse +THX = ThanksResponse +MIS = MissingOrdersResponse +ORD = OrderResultResponse +TME = TimeToDeadlineResponse +YES = AcceptResponse +REJ = RejectResponse +NOT = NotResponse +CCD = PowerInCivilDisorderResponse +OUT = PowerIsEliminatedResponse +OFF = TurnOffResponse +PRN = ParenthesisErrorResponse +HUH = SyntaxErrorResponse diff --git a/diplomacy/daide/tests/test_responses.py b/diplomacy/daide/tests/test_responses.py new file mode 100644 index 0000000..407b96c --- /dev/null +++ b/diplomacy/daide/tests/test_responses.py @@ -0,0 +1,339 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Tests for request objects """ +from diplomacy import Game +from diplomacy.daide import responses +from diplomacy.daide.utils import str_to_bytes +import diplomacy.utils.errors as err +from diplomacy.utils.order_results import OK, BOUNCE, DISLODGED + +def test_map(): + """ Tests the MAP response """ + daide_str = 'MAP ( s t a n d a r d )' + response = responses.MAP('standard') + assert isinstance(response, responses.MAP), 'Expected a MAP response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_hlo(): + """ Tests the HLO response """ + daide_str = 'HLO ( FRA ) ( #1234 ) ( ( LVL #0 ) ( MTL #1200 ) ( RTL #1200 ) ( BTL #1200 ) ( AOA ) )' + response = responses.HLO('FRANCE', 1234, 0, 1200, ['NO_CHECK']) + assert isinstance(response, responses.HLO), 'Expected a HLO response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_hlo_no_deadline(): + """ Tests the HLO response """ + daide_str = 'HLO ( FRA ) ( #1234 ) ( ( LVL #0 ) ( AOA ) )' + response = responses.HLO('FRANCE', 1234, 0, 0, ['NO_CHECK']) + assert isinstance(response, responses.HLO), 'Expected a HLO response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_sco(): + """ Tests the SCO response """ + daide_str = 'SCO ( AUS BUD TRI VIE ) ( ENG EDI LON LVP ) ( FRA BRE MAR PAR ) ' \ + '( GER BER KIE MUN ) ( ITA NAP ROM VEN ) ( RUS MOS SEV STP WAR ) ' \ + '( TUR ANK CON SMY ) ( UNO BEL BUL DEN GRE HOL NWY POR RUM SER SPA SWE TUN )' + game = Game(map_name='standard') + power_centers = {power.name: power.centers for power in game.powers.values()} + response = responses.SCO(power_centers, map_name='standard') + assert isinstance(response, responses.SCO), 'Expected a SCO response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_now(): + """ Tests the NOW response """ + daide_str = 'NOW ( SPR #1901 ) ( AUS AMY BUD ) ( AUS AMY VIE ) ( AUS FLT TRI ) ( ENG FLT EDI )' \ + ' ( ENG FLT LON ) ( ENG AMY LVP ) ( FRA FLT BRE ) ( FRA AMY MAR ) ( FRA AMY PAR )' \ + ' ( GER FLT KIE ) ( GER AMY BER ) ( GER AMY MUN ) ( ITA FLT NAP ) ( ITA AMY ROM )' \ + ' ( ITA AMY VEN ) ( RUS AMY WAR ) ( RUS AMY MOS ) ( RUS FLT SEV )' \ + ' ( RUS FLT ( STP SCS ) ) ( TUR FLT ANK ) ( TUR AMY CON ) ( TUR AMY SMY )' + game = Game(map_name='standard') + phase_name = game.get_current_phase() + units = {power.name: power.units for power in game.powers.values()} + retreats = {power.name: power.retreats for power in game.powers.values()} + response = responses.NOW(phase_name=phase_name, powers_units=units, powers_retreats=retreats) + assert isinstance(response, responses.NOW), 'Expected a NOW response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_thx_001(): + """ Tests the THX response """ + daide_str = 'THX ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( MBV )' + order_daide_str = '( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY )' + response = responses.THX(order_bytes=str_to_bytes(order_daide_str), results=[]) + assert isinstance(response, responses.THX), 'Expected a THX response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_thx_002(): + """ Tests the THX response """ + daide_str = 'THX ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( NYU )' + order_daide_str = '( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY )' + response = responses.THX(order_bytes=str_to_bytes(order_daide_str), + results=[error.code for error in [err.GAME_ORDER_TO_FOREIGN_UNIT % 'A MAR']]) + assert isinstance(response, responses.THX), 'Expected a THX response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_thx_003(): + """ Tests the THX response """ + daide_str = 'THX ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( MBV )' + order_daide_str = '( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY )' + response = responses.THX(order_bytes=str_to_bytes(order_daide_str), + results=[error.code for error in [OK, err.MAP_LEAST_TWO_POWERS]]) + assert isinstance(response, responses.THX), 'Expected a THX response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_001(): + """ Tests the MIS response """ + daide_str = 'MIS ( FRA FLT BRE ) ( FRA AMY MAR ) ( FRA AMY PAR )' + game = Game(map_name='standard') + phase_name = 'S1901M' + power = game.get_power('FRANCE') + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_002(): + """ Tests the MIS response """ + daide_str = 'MIS ( TUR FLT ANK MRT ( ARM ) ) ' \ + '( TUR FLT CON MRT ( BLA SMY ( BUL ECS ) ( BUL SCS ) ) ) ' \ + '( TUR AMY SMY MRT ( SYR ) )' + game = Game(map_name='standard') + phase_name = 'S1901R' + power = game.get_power('TURKEY') + power.units = ['F ANK', 'F CON', 'A SMY'] + power.retreats['F ANK'] = ['ARM'] + power.retreats['F CON'] = ['BLA', 'SMY', 'BUL/EC', 'BUL/SC'] + power.retreats['A SMY'] = ['SYR'] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_003(): + """ Tests the MIS response """ + daide_str = 'MIS ( #0 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_004(): + """ Tests the MIS response """ + daide_str = 'MIS ( #1 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + power.centers = power.centers[:-1] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_005(): + """ Tests the MIS response """ + daide_str = 'MIS ( #-1 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + power.units = power.units[:-1] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_006(): + """ Tests the MIS response """ + daide_str = 'MIS ( #1 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + power.units = power.units + ['F LON'] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_007(): + """ Tests the MIS response """ + daide_str = 'MIS ( FRA FLT BRE ) ( FRA AMY MAR )' + game = Game(map_name='standard') + game.set_orders('FRANCE', ['A PAR - BUR']) + phase_name = 'S1901M' + power = game.get_power('FRANCE') + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_008(): + """ Tests the MIS response """ + daide_str = 'MIS ( FRA FLT BRE ) ( FRA AMY MAR )' + game = Game(map_name='standard') + game.add_rule('NO_CHECK') + game.set_orders('FRANCE', ['A PAR - BUR']) + phase_name = 'S1901M' + power = game.get_power('FRANCE') + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_009(): + """ Tests the MIS response """ + daide_str = 'MIS ( FRA FLT BRE ) ( FRA AMY MAR )' + game = Game(map_name='standard') + phase_name = 'S1901M' + power = game.get_power('FRANCE') + power.orders['REORDER 1'] = 'A PAR - BUR' + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_010(): + """ Tests the MIS response """ + daide_str = 'MIS ( FRA FLT BRE ) ( FRA AMY MAR )' + game = Game(map_name='standard') + phase_name = 'S1901M' + power = game.get_power('FRANCE') + power.orders['INVALID'] = 'A PAR - BUR' + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_011(): + """ Tests the MIS response """ + daide_str = 'MIS ( #0 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + power.centers += ['LON'] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_mis_012(): + """ Tests the MIS response """ + daide_str = 'MIS ( #-1 )' + game = Game(map_name='standard') + phase_name = 'W1901A' + power = game.get_power('FRANCE') + power.centers += ['LON'] + power.units = power.units[:2] + response = responses.MIS(phase_name=phase_name, power=power) + assert isinstance(response, responses.MIS), 'Expected a MIS response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_ord_001(): + """ Tests the ORD response """ + daide_str = 'ORD ( SPR #1901 ) ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( SUC )' + order_daide_str = '( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY' + game = Game(map_name='standard') + phase_name = game.map.phase_abbr(game.phase) + response = responses.ORD(phase_name=phase_name, order_bytes=str_to_bytes(order_daide_str), results=[]) + assert isinstance(response, responses.ORD), 'Expected a ORD response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_ord_002(): + """ Tests the ORD response """ + daide_str = 'ORD ( SPR #1901 ) ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( NSO )' + order_daide_str = '( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY' + game = Game(map_name='standard') + phase_name = game.map.phase_abbr(game.phase) + response = responses.ORD(phase_name=phase_name, order_bytes=str_to_bytes(order_daide_str), + results=[BOUNCE.code]) + assert isinstance(response, responses.ORD), 'Expected a ORD response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_ord_003(): + """ Tests the ORD response """ + daide_str = 'ORD ( SPR #1901 ) ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( NSO )' + order_daide_str = '( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY' + game = Game(map_name='standard') + phase_name = game.map.phase_abbr(game.phase) + response = responses.ORD(phase_name=phase_name, order_bytes=str_to_bytes(order_daide_str), + results=[DISLODGED]) + assert isinstance(response, responses.ORD), 'Expected a ORD response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_ord_004(): + """ Tests the ORD response """ + daide_str = 'ORD ( SPR #1901 ) ( ( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY ) ( NSO )' + order_daide_str = '( ENG FLT NWG ) SUP ( ENG AMY YOR ) MTO NWY' + game = Game(map_name='standard') + phase_name = game.map.phase_abbr(game.phase) + response = responses.ORD(phase_name=phase_name, order_bytes=str_to_bytes(order_daide_str), + results=[BOUNCE.code, DISLODGED]) + assert isinstance(response, responses.ORD), 'Expected a ORD response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_tme(): + """ Tests the TME response """ + daide_str = 'TME ( #60 )' + response = responses.TME(seconds=60) + assert isinstance(response, responses.TME), 'Expected a TME response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_yes(): + """ Tests the YES response """ + daide_str = 'YES ( TME ( #60 ) )' + request_daide_str = 'TME ( #60 )' + response = responses.YES(request_bytes=str_to_bytes(request_daide_str)) + assert isinstance(response, responses.YES), 'Expected a YES response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_rej(): + """ Tests the REJ response """ + daide_str = 'REJ ( TME ( #60 ) )' + request_daide_str = 'TME ( #60 )' + response = responses.REJ(request_bytes=str_to_bytes(request_daide_str)) + assert isinstance(response, responses.REJ), 'Expected a REJ response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_not(): + """ Tests the NOT response """ + daide_str = 'NOT ( CCD ( FRA ) )' + response_daide_str = 'CCD ( FRA )' + response = responses.NOT(response_bytes=str_to_bytes(response_daide_str)) + assert isinstance(response, responses.NOT), 'Expected a NOT response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_ccd(): + """ Tests the CCD response """ + daide_str = 'CCD ( AUS )' + response = responses.CCD(power_name='AUSTRIA') + assert isinstance(response, responses.CCD), 'Expected a CCD response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_out(): + """ Tests the OUT response """ + daide_str = 'OUT ( AUS )' + response = responses.OUT(power_name='AUSTRIA') + assert isinstance(response, responses.OUT), 'Expected a OUT response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_prn(): + """ Tests the PRN response """ + daide_str = 'PRN ( SUB ( ( ENG AMY LVP ) HLD ) ' \ + '( ( ENG FLT LON ) MTO NTH ) ' \ + '( ( ENG FLT EDI ) SUP ( ENG FLT LON ) MTO NTH )' + request_daide_str = 'SUB ( ( ENG AMY LVP ) HLD ) ' \ + '( ( ENG FLT LON ) MTO NTH ) ' \ + '( ( ENG FLT EDI ) SUP ( ENG FLT LON ) MTO NTH' + response = responses.PRN(request_bytes=str_to_bytes(request_daide_str)) + assert isinstance(response, responses.PRN), 'Expected a PRN response' + assert bytes(response) == str_to_bytes(daide_str) + +def test_huh(): + """ Tests the HUH response """ + daide_str = 'HUH ( ERR )' + response = responses.HUH(b'', 0) + assert isinstance(response, responses.HUH), 'Expected a HUH response' + assert bytes(response) == str_to_bytes(daide_str) |