diff options
-rw-r--r-- | diplomacy/client/connection.py | 8 | ||||
-rw-r--r-- | diplomacy/client/response_managers.py | 1 | ||||
-rw-r--r-- | diplomacy/communication/requests.py | 11 | ||||
-rw-r--r-- | diplomacy/communication/responses.py | 7 | ||||
-rw-r--r-- | diplomacy/daide/connection_handler.py | 200 | ||||
-rw-r--r-- | diplomacy/daide/server.py | 73 | ||||
-rw-r--r-- | diplomacy/server/connection_handler.py | 16 | ||||
-rw-r--r-- | diplomacy/server/notifier.py | 5 | ||||
-rw-r--r-- | diplomacy/server/request_managers.py | 18 | ||||
-rw-r--r-- | diplomacy/server/server.py | 68 | ||||
-rw-r--r-- | diplomacy/utils/exceptions.py | 3 |
11 files changed, 406 insertions, 4 deletions
diff --git a/diplomacy/client/connection.py b/diplomacy/client/connection.py index f9eb628..b2e6cbc 100644 --- a/diplomacy/client/connection.py +++ b/diplomacy/client/connection.py @@ -437,6 +437,14 @@ class Connection(): self._on_socket_message(msg) # Public methods. + @gen.coroutine + def get_daide_port(self, game_id): + """ Send a GetDaidePort request. + :param game_id: game id + :return: int. the game DAIDE port + """ + request = requests.GetDaidePort(game_id=game_id) + return (yield self.send(request)) @gen.coroutine def authenticate(self, username, password, create_user=False): diff --git a/diplomacy/client/response_managers.py b/diplomacy/client/response_managers.py index 5183ac4..991586f 100644 --- a/diplomacy/client/response_managers.py +++ b/diplomacy/client/response_managers.py @@ -298,6 +298,7 @@ MAPPING = { requests.DeleteGame: on_delete_game, requests.GetAllPossibleOrders: default_manager, requests.GetAvailableMaps: default_manager, + requests.GetDaidePort: default_manager, requests.GetDummyWaitingPowers: default_manager, requests.GetPlayablePowers: default_manager, requests.GetPhaseHistory: on_get_phase_history, diff --git a/diplomacy/communication/requests.py b/diplomacy/communication/requests.py index b7f7671..a6bb622 100644 --- a/diplomacy/communication/requests.py +++ b/diplomacy/communication/requests.py @@ -114,6 +114,17 @@ class _AbstractGameRequest(_AbstractChannelRequest): # Connection requests. # ==================== +class GetDaidePort(_AbstractRequest): + """ Get game DAIDE port """ + __slots__ = ['game_id'] + params = { + strings.GAME_ID: str + } + + def __init__(self, **kwargs): + self.game_id = None + super(GetDaidePort, self).__init__(**kwargs) + class SignIn(_AbstractRequest): """ SignIn request. Expected response: responses.DataToken diff --git a/diplomacy/communication/responses.py b/diplomacy/communication/responses.py index a928720..58edd6f 100644 --- a/diplomacy/communication/responses.py +++ b/diplomacy/communication/responses.py @@ -161,6 +161,13 @@ class DataGames(UniqueData): strings.DATA: parsing.SequenceType(parsing.JsonableClassType(DataGameInfo)) # list of game info. } +class DataPort(UniqueData): + """ Unique data containing a DAIDE port. """ + __slots__ = [] + params = { + strings.DATA: int # DAIDE port + } + class DataTimeStamp(UniqueData): """ Unique data containing a timestamp. """ __slots__ = [] diff --git a/diplomacy/daide/connection_handler.py b/diplomacy/daide/connection_handler.py new file mode 100644 index 0000000..0b606bf --- /dev/null +++ b/diplomacy/daide/connection_handler.py @@ -0,0 +1,200 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Tornado stream wrapper, used internally to abstract a DAIDE stream connection from a WebSocketConnection. """ +import logging +from tornado import gen +from tornado.concurrent import Future +from tornado.iostream import StreamClosedError +from diplomacy.daide import notifications, request_managers, responses +from diplomacy.daide.messages import DiplomacyMessage, DaideMessage, ErrorMessage, RepresentationMessage, MessageType +from diplomacy.daide.notification_managers import translate_notification +from diplomacy.daide.requests import RequestBuilder +from diplomacy.daide.utils import bytes_to_str +from diplomacy.utils import exceptions + +# Constants +LOGGER = logging.getLogger(__name__) + +class ConnectionHandler(): + """ ConnectionHandler class. Properties: + - server: server object representing running server. + """ + _NAME_VARIANT_PREFIX = 'DAIDE' + _NAME_VARIANTS_POOL = [] + _USED_NAME_VARIANTS = [] + + def __init__(self): + self.stream = None + self.server = None + self.game_id = None + self.token = None + self._name_variant = None + self._socket_no = None + self._local_addr = ('::1', 0, 0, 0) + self._remote_addr = ('::1', 0, 0, 0) + + self.message_mapping = {MessageType.INITIAL: self._on_initial_message, + MessageType.DIPLOMACY: self._on_diplomacy_message, + MessageType.FINAL: self._on_final_message, + MessageType.ERROR: self._on_error_message} + + def initialize(self, stream, server, game_id): + """ Initialize the connection handler. + :param server: a Server object. + :type server: diplomacy.Server + """ + self.stream = stream + self.server = server + self.game_id = game_id + stream.set_close_callback(self.on_connection_close) + self._socket_no = self.stream.socket.fileno() + self._local_addr = stream.socket.getsockname() + self._remote_addr = stream.socket.getpeername() + + @property + def local_addr(self): + """ Return the address of the local endpoint """ + return self._local_addr + + @property + def remote_addr(self): + """ Return the address of the remote endpoint """ + return self._remote_addr + + def get_name_variant(self): + """ Return the address of the remote endpoint """ + if self._name_variant is None: + self._name_variant = self._NAME_VARIANTS_POOL.pop(0) if self._NAME_VARIANTS_POOL \ + else len(self._USED_NAME_VARIANTS) + self._USED_NAME_VARIANTS.append(self._name_variant) + return self._NAME_VARIANT_PREFIX + str(self._name_variant) + + def release_name_variant(self): + """ Return the next available user name variant """ + self._USED_NAME_VARIANTS.remove(self._name_variant) + self._NAME_VARIANTS_POOL.append(self._name_variant) + self._name_variant = None + + @gen.coroutine + def close_connection(self): + """ Close the connection with the client """ + try: + message = DiplomacyMessage() + message.content = bytes(responses.TurnOffResponse()) + yield self.write_message(message) + self.stream.close() + except StreamClosedError: + LOGGER.error('Stream is closed.') + + def on_connection_close(self): + """ Invoked when the socket is closed (see parent method). + Detach this connection handler from server users. + """ + self.release_name_variant() + self.server.users.remove_connection(self, remove_tokens=False) + LOGGER.info('Removed connection. Remaining %d connection(s).', self.server.users.count_connections()) + + @gen.coroutine + def read_stream(self): + """ Read the next message from the stream """ + messages = [] + in_message = yield DaideMessage.from_stream(self.stream) + + if in_message and in_message.is_valid: + message_handler = self.message_mapping.get(in_message.message_type, None) + if not message_handler: + raise RuntimeError('Unrecognized DAIDE message type [{}]'.format(in_message.message_type)) + + if gen.is_coroutine_function(message_handler): + messages = yield message_handler(in_message) + else: + messages = message_handler(in_message) + elif in_message: + err_message = ErrorMessage() + err_message.error_code = in_message.error_code + messages = [err_message] + + for message in messages: + yield self.write_message(message) + + # Added for compatibility with WebSocketHandler interface + def write_message(self, message, binary=True): + """ Write a message into the stream """ + if binary and isinstance(message, bytes): + future = self.stream.write(message) + else: + if isinstance(message, notifications.DaideNotification): + LOGGER.info('[%d] notification:[%s]', self._socket_no, bytes_to_str(bytes(message))) + notification = message + message = DiplomacyMessage() + message.content = bytes(notification) + + if isinstance(message, DaideMessage): + future = self.stream.write(bytes(message)) + else: + future = Future() + future.set_result(None) + return future + + def translate_notification(self, notification): + """ Translate a notification to a DAIDE notification. + :param notification: a notification object to pass to handler function. + See diplomacy.communication.notifications for possible notifications. + :return: either None or an array of daide notifications. + See module diplomacy.daide.notifications for possible daide notifications. + """ + return translate_notification(self.server, notification, self) + + def _on_initial_message(self, _): + """ Handle an initial message """ + LOGGER.info('[%d] initial message', self._socket_no) + return [RepresentationMessage()] + + @gen.coroutine + def _on_diplomacy_message(self, in_message): + """ Handle a diplomacy message """ + messages = [] + request = RequestBuilder.from_bytes(in_message.content) + + try: + LOGGER.info('[%d] request:[%s]', self._socket_no, bytes_to_str(in_message.content)) + request.game_id = self.game_id + message_responses = yield request_managers.handle_request(self.server, request, self) + except exceptions.ResponseException: + message_responses = [responses.REJ(bytes(request))] + + if message_responses: + for response in message_responses: + response_bytes = bytes(response) + LOGGER.info('[%d] response:[%s]', self._socket_no, bytes_to_str(response_bytes) \ + if response_bytes else None) + message = DiplomacyMessage() + message.content = response_bytes + messages.append(message) + + return messages + + def _on_final_message(self, _): + """ Handle a final message """ + LOGGER.info('[%d] final message', self._socket_no) + self.stream.close() + return [] + + def _on_error_message(self, in_message): + """ Handle an error message """ + LOGGER.error('[%d] error [%d]', self._socket_no, in_message.error_code) + return [] diff --git a/diplomacy/daide/server.py b/diplomacy/daide/server.py new file mode 100644 index 0000000..ceca122 --- /dev/null +++ b/diplomacy/daide/server.py @@ -0,0 +1,73 @@ +# ============================================================================== +# Copyright (C) 2019 - Philip Paquette +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see <https://www.gnu.org/licenses/>. +# ============================================================================== +""" Parallel server to receive DAIDE communications """ +import logging +from tornado import gen +from tornado.iostream import StreamClosedError +from tornado.tcpserver import TCPServer +from diplomacy.daide.connection_handler import ConnectionHandler + +# Constants +LOGGER = logging.getLogger(__name__) + +class Server(TCPServer): + """ Represents a server to receive DAIDE communications """ + def __init__(self, master_server, game_id): + """ Contructor + :param master_server: the internal server + :param game_id: the game id for which this server will receive communications + """ + super(Server, self).__init__() + self._master_server = master_server + self._game_id = game_id + self._registered_connections = {} + + @property + def master_server(self): + """ Return the master server """ + return self._master_server + + @property + def game_id(self): + """ Return the game id associated with the server """ + return self._game_id + + def stop(self): + """ Stop the server and close all connections """ + for connection_handler in self._registered_connections.values(): + connection_handler.close_connection() + super(Server, self).stop() + + @gen.coroutine + def handle_stream(self, stream, address): + """ Handle an open stream + :param stream: the stream to handle + :param address: the address of the client + """ + LOGGER.info('Connection from client [%s]', str(address)) + + handler = ConnectionHandler() + handler.initialize(stream, self._master_server, self._game_id) + self._registered_connections[stream] = handler + + try: + while not handler.stream.closed(): + yield handler.read_stream() + except StreamClosedError: + LOGGER.error('[%s] disconnected', str(address)) + + del self._registered_connections[stream] diff --git a/diplomacy/server/connection_handler.py b/diplomacy/server/connection_handler.py index 0089db3..6c1351f 100644 --- a/diplomacy/server/connection_handler.py +++ b/diplomacy/server/connection_handler.py @@ -26,6 +26,7 @@ import ujson as json from diplomacy.communication import responses, requests from diplomacy.server import request_managers from diplomacy.utils import exceptions, strings +from diplomacy.utils.network_data import NetworkData LOGGER = logging.getLogger(__name__) @@ -79,6 +80,21 @@ class ConnectionHandler(WebSocketHandler): self.server.users.remove_connection(self, remove_tokens=False) LOGGER.info("Removed connection. Remaining %d connection(s).", self.server.users.count_connections()) + def write_message(self, message, binary=False): + """ Sends the given message to the client of this Web Socket. """ + if isinstance(message, NetworkData): + message = message.json() + return super(ConnectionHandler, self).write_message(message, binary) + + @staticmethod + def translate_notification(notification): + """ Translate a notification to an array of notifications. + :param notification: a notification object to pass to handler function. + See diplomacy.communication.notifications for possible notifications. + :return: An array of notifications containing a single notification. + """ + return [notification] + @gen.coroutine def on_message(self, message): """ Parse given message and manage parsed data (expected a string representation of a request). """ diff --git a/diplomacy/server/notifier.py b/diplomacy/server/notifier.py index 4bd64d3..2f8761c 100644 --- a/diplomacy/server/notifier.py +++ b/diplomacy/server/notifier.py @@ -75,7 +75,10 @@ class Notifier(): """ connection_handler = self.server.users.get_connection_handler(notification.token) if not self.ignores(notification) and connection_handler: - yield self.server.notifications.put((connection_handler, notification)) + translated_notifications = connection_handler.translate_notification(notification) + if translated_notifications: + for translated_notification in translated_notifications: + yield self.server.notifications.put((connection_handler, translated_notification)) @gen.coroutine def _notify_game(self, server_game, notification_class, **kwargs): diff --git a/diplomacy/server/request_managers.py b/diplomacy/server/request_managers.py index d17a77b..6819421 100644 --- a/diplomacy/server/request_managers.py +++ b/diplomacy/server/request_managers.py @@ -149,6 +149,7 @@ def on_create_game(server, request, connection_handler): # Register game on server. server.add_new_game(server_game) + server.start_new_daide_server(game_id) # Start game immediately if possible (e.g. if it's a solitaire game). if server_game.game_can_start(): @@ -216,6 +217,7 @@ def on_delete_game(server, request, connection_handler): level = verify_request(server, request, connection_handler, observer_role=False, power_role=False) server.delete_game(level.game) server.unschedule_game(level.game) + server.stop_daide_server(level.game.game_id) Notifier(server, ignore_tokens=[request.token]).notify_game_deleted(level.game) def on_get_dummy_waiting_powers(server, request, connection_handler): @@ -256,6 +258,21 @@ def on_get_available_maps(server, request, connection_handler): verify_request(server, request, connection_handler) return responses.DataMaps(data=server.available_maps, request_id=request.request_id) +def on_get_daide_port(server, request, connection_handler): + """ Manage request GetDaidePort. + :param server: server which receives the request. + :param request: request to manage. + :param connection_handler: connection handler from which the request was sent. + :return: None + :type server: diplomacy.Server + :type request: diplomacy.communication.requests.GetDaidePort + """ + del connection_handler + daide_port = server.get_daide_port(request.game_id) + if daide_port is None: + raise exceptions.DaidePortException('Invalid game id or game\'s DAIDE server is not started for that game') + return responses.DataPort(data=daide_port, request_id=request.request_id) + def on_get_playable_powers(server, request, connection_handler): """ Manage request GetPlayablePowers. :param server: server which receives the request. @@ -1133,6 +1150,7 @@ MAPPING = { requests.GetDummyWaitingPowers: on_get_dummy_waiting_powers, requests.GetAllPossibleOrders: on_get_all_possible_orders, requests.GetAvailableMaps: on_get_available_maps, + requests.GetDaidePort: on_get_daide_port, requests.GetPlayablePowers: on_get_playable_powers, requests.GetPhaseHistory: on_get_phase_history, requests.JoinGame: on_join_game, diff --git a/diplomacy/server/server.py b/diplomacy/server/server.py index fed4eb6..d1f044d 100644 --- a/diplomacy/server/server.py +++ b/diplomacy/server/server.py @@ -50,12 +50,15 @@ import atexit import base64 import logging import os +from random import randint +import socket import signal import tornado import tornado.web from tornado import gen from tornado.ioloop import IOLoop +from tornado.iostream import StreamClosedError from tornado.queues import Queue from tornado.websocket import WebSocketClosedError @@ -63,6 +66,7 @@ import ujson as json import diplomacy.settings from diplomacy.communication import notifications +from diplomacy.daide.server import Server as DaideServer from diplomacy.server.connection_handler import ConnectionHandler from diplomacy.server.notifier import Notifier from diplomacy.server.scheduler import Scheduler @@ -73,6 +77,16 @@ from diplomacy.utils import common, exceptions, strings, constants LOGGER = logging.getLogger(__name__) +def is_port_opened(port, hostname='127.0.0.1'): + """ Checks if the specified port is opened + :param port: The port to check + :param hostname: The hostname to check, defaults to '127.0.0.1' + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if sock.connect_ex((hostname, port)) == 0: + return True + return False + def get_absolute_path(directory=None): """ Return absolute path of given directory. If given directory is None, return absolute path of current directory. @@ -136,6 +150,7 @@ class InterruptionHandler(): :param frame: frame received """ if signum == signal.SIGINT: + self.server.stop_daide_server(None) self.server.backup_now(force=True) if self.previous_handler: self.previous_handler(signum, frame) @@ -162,7 +177,7 @@ class Server(): """ Server class. """ __slots__ = ['data_path', 'games_path', 'available_maps', 'maps_mtime', 'notifications', 'games_scheduler', 'allow_registrations', 'max_games', 'remove_canceled_games', 'users', 'games', - 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds', + 'daide_servers', 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds', 'interruption_handler', 'backend', 'games_with_dummy_powers', 'dispatched_dummy_powers'] # Servers cache. @@ -228,6 +243,9 @@ class Server(): # If there is no bot token associated, couple is (None, None). self.dispatched_dummy_powers = {} # type: dict{str, tuple} + # DAIDE TCP servers listening to a game's dedicated port. + self.daide_servers = {} # {port: daide_server} + # Load data on memory. self._load() @@ -405,9 +423,11 @@ class Server(): while True: connection_handler, notification = yield self.notifications.get() try: - yield connection_handler.write_message(notification.json()) + yield connection_handler.write_message(notification) except WebSocketClosedError: LOGGER.error('Websocket was closed while sending a notification.') + except StreamClosedError: + LOGGER.error('Stream was closed while sending a notification.') finally: self.notifications.task_done() @@ -452,7 +472,8 @@ class Server(): self.backend.port = port self.set_tasks(io_loop) LOGGER.info('Running on port %d', self.backend.port) - io_loop.start() + if not io_loop.asyncio_loop.is_running(): + io_loop.start() def get_game_indices(self): """ Iterate over all game indices in server database. @@ -795,3 +816,44 @@ class Server(): def get_map(self, map_name): """ Return map power names for given map name. """ return self.available_maps.get(map_name, None) + + def start_new_daide_server(self, game_id, port=None): + """ Start a new DAIDE TCP server to handle DAIDE clients connections + :param game_id: game id to pass to the DAIDE server + :param port: the port to use. If None, an available random prot will be used + """ + if port in self.daide_servers: + raise RuntimeError('Port already in used by a DAIDE server') + + for server in self.daide_servers.values(): + if server.game_id == game_id: + return None + + while port is None or is_port_opened(port): + port = randint(8000, 8999) + + # Create DAIDE TCP server + daide_server = DaideServer(self, game_id) + daide_server.listen(port) + self.daide_servers[port] = daide_server + LOGGER.info('DAIDE server running on port %d', port) + return port + + def stop_daide_server(self, game_id): + """ Stop one or all DAIDE TCP server + :param game_id: game id of the DAIDE server. If None, all servers will be stopped + """ + for port in list(self.daide_servers.keys()): + server = self.daide_servers[port] + if game_id is None or server.game_id == game_id: + server.stop() + del self.daide_servers[port] + + def get_daide_port(self, game_id): + """ Get the DAIDE port opened for a specific game_id + :param game_id: game id of the DAIDE server. + """ + for port, server in self.daide_servers.items(): + if server.game_id == game_id: + return port + return None diff --git a/diplomacy/utils/exceptions.py b/diplomacy/utils/exceptions.py index 5cf4384..138bed9 100644 --- a/diplomacy/utils/exceptions.py +++ b/diplomacy/utils/exceptions.py @@ -86,6 +86,9 @@ class RequestException(ResponseException): class AdminTokenException(ResponseException): """ Invalid token for admin operations. """ +class DaidePortException(ResponseException): + """ Daide server not started for the game """ + class GameCanceledException(ResponseException): """ Game was cancelled. """ |