From 08b9469e2c71e06fdd70d607f281686746755073 Mon Sep 17 00:00:00 2001 From: Satya Ortiz-Gagne Date: Mon, 10 Jun 2019 10:20:39 -0400 Subject: DAIDE - Added connection_handler and server - Ability to open and close port when DAIDE games are started and stopped - Can get the DAIDE port using a request --- diplomacy/server/connection_handler.py | 16 ++++++++ diplomacy/server/notifier.py | 5 ++- diplomacy/server/request_managers.py | 18 +++++++++ diplomacy/server/server.py | 68 ++++++++++++++++++++++++++++++++-- 4 files changed, 103 insertions(+), 4 deletions(-) (limited to 'diplomacy/server') diff --git a/diplomacy/server/connection_handler.py b/diplomacy/server/connection_handler.py index 0089db3..6c1351f 100644 --- a/diplomacy/server/connection_handler.py +++ b/diplomacy/server/connection_handler.py @@ -26,6 +26,7 @@ import ujson as json from diplomacy.communication import responses, requests from diplomacy.server import request_managers from diplomacy.utils import exceptions, strings +from diplomacy.utils.network_data import NetworkData LOGGER = logging.getLogger(__name__) @@ -79,6 +80,21 @@ class ConnectionHandler(WebSocketHandler): self.server.users.remove_connection(self, remove_tokens=False) LOGGER.info("Removed connection. Remaining %d connection(s).", self.server.users.count_connections()) + def write_message(self, message, binary=False): + """ Sends the given message to the client of this Web Socket. """ + if isinstance(message, NetworkData): + message = message.json() + return super(ConnectionHandler, self).write_message(message, binary) + + @staticmethod + def translate_notification(notification): + """ Translate a notification to an array of notifications. + :param notification: a notification object to pass to handler function. + See diplomacy.communication.notifications for possible notifications. + :return: An array of notifications containing a single notification. + """ + return [notification] + @gen.coroutine def on_message(self, message): """ Parse given message and manage parsed data (expected a string representation of a request). """ diff --git a/diplomacy/server/notifier.py b/diplomacy/server/notifier.py index 4bd64d3..2f8761c 100644 --- a/diplomacy/server/notifier.py +++ b/diplomacy/server/notifier.py @@ -75,7 +75,10 @@ class Notifier(): """ connection_handler = self.server.users.get_connection_handler(notification.token) if not self.ignores(notification) and connection_handler: - yield self.server.notifications.put((connection_handler, notification)) + translated_notifications = connection_handler.translate_notification(notification) + if translated_notifications: + for translated_notification in translated_notifications: + yield self.server.notifications.put((connection_handler, translated_notification)) @gen.coroutine def _notify_game(self, server_game, notification_class, **kwargs): diff --git a/diplomacy/server/request_managers.py b/diplomacy/server/request_managers.py index d17a77b..6819421 100644 --- a/diplomacy/server/request_managers.py +++ b/diplomacy/server/request_managers.py @@ -149,6 +149,7 @@ def on_create_game(server, request, connection_handler): # Register game on server. server.add_new_game(server_game) + server.start_new_daide_server(game_id) # Start game immediately if possible (e.g. if it's a solitaire game). if server_game.game_can_start(): @@ -216,6 +217,7 @@ def on_delete_game(server, request, connection_handler): level = verify_request(server, request, connection_handler, observer_role=False, power_role=False) server.delete_game(level.game) server.unschedule_game(level.game) + server.stop_daide_server(level.game.game_id) Notifier(server, ignore_tokens=[request.token]).notify_game_deleted(level.game) def on_get_dummy_waiting_powers(server, request, connection_handler): @@ -256,6 +258,21 @@ def on_get_available_maps(server, request, connection_handler): verify_request(server, request, connection_handler) return responses.DataMaps(data=server.available_maps, request_id=request.request_id) +def on_get_daide_port(server, request, connection_handler): + """ Manage request GetDaidePort. + :param server: server which receives the request. + :param request: request to manage. + :param connection_handler: connection handler from which the request was sent. + :return: None + :type server: diplomacy.Server + :type request: diplomacy.communication.requests.GetDaidePort + """ + del connection_handler + daide_port = server.get_daide_port(request.game_id) + if daide_port is None: + raise exceptions.DaidePortException('Invalid game id or game\'s DAIDE server is not started for that game') + return responses.DataPort(data=daide_port, request_id=request.request_id) + def on_get_playable_powers(server, request, connection_handler): """ Manage request GetPlayablePowers. :param server: server which receives the request. @@ -1133,6 +1150,7 @@ MAPPING = { requests.GetDummyWaitingPowers: on_get_dummy_waiting_powers, requests.GetAllPossibleOrders: on_get_all_possible_orders, requests.GetAvailableMaps: on_get_available_maps, + requests.GetDaidePort: on_get_daide_port, requests.GetPlayablePowers: on_get_playable_powers, requests.GetPhaseHistory: on_get_phase_history, requests.JoinGame: on_join_game, diff --git a/diplomacy/server/server.py b/diplomacy/server/server.py index fed4eb6..d1f044d 100644 --- a/diplomacy/server/server.py +++ b/diplomacy/server/server.py @@ -50,12 +50,15 @@ import atexit import base64 import logging import os +from random import randint +import socket import signal import tornado import tornado.web from tornado import gen from tornado.ioloop import IOLoop +from tornado.iostream import StreamClosedError from tornado.queues import Queue from tornado.websocket import WebSocketClosedError @@ -63,6 +66,7 @@ import ujson as json import diplomacy.settings from diplomacy.communication import notifications +from diplomacy.daide.server import Server as DaideServer from diplomacy.server.connection_handler import ConnectionHandler from diplomacy.server.notifier import Notifier from diplomacy.server.scheduler import Scheduler @@ -73,6 +77,16 @@ from diplomacy.utils import common, exceptions, strings, constants LOGGER = logging.getLogger(__name__) +def is_port_opened(port, hostname='127.0.0.1'): + """ Checks if the specified port is opened + :param port: The port to check + :param hostname: The hostname to check, defaults to '127.0.0.1' + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if sock.connect_ex((hostname, port)) == 0: + return True + return False + def get_absolute_path(directory=None): """ Return absolute path of given directory. If given directory is None, return absolute path of current directory. @@ -136,6 +150,7 @@ class InterruptionHandler(): :param frame: frame received """ if signum == signal.SIGINT: + self.server.stop_daide_server(None) self.server.backup_now(force=True) if self.previous_handler: self.previous_handler(signum, frame) @@ -162,7 +177,7 @@ class Server(): """ Server class. """ __slots__ = ['data_path', 'games_path', 'available_maps', 'maps_mtime', 'notifications', 'games_scheduler', 'allow_registrations', 'max_games', 'remove_canceled_games', 'users', 'games', - 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds', + 'daide_servers', 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds', 'interruption_handler', 'backend', 'games_with_dummy_powers', 'dispatched_dummy_powers'] # Servers cache. @@ -228,6 +243,9 @@ class Server(): # If there is no bot token associated, couple is (None, None). self.dispatched_dummy_powers = {} # type: dict{str, tuple} + # DAIDE TCP servers listening to a game's dedicated port. + self.daide_servers = {} # {port: daide_server} + # Load data on memory. self._load() @@ -405,9 +423,11 @@ class Server(): while True: connection_handler, notification = yield self.notifications.get() try: - yield connection_handler.write_message(notification.json()) + yield connection_handler.write_message(notification) except WebSocketClosedError: LOGGER.error('Websocket was closed while sending a notification.') + except StreamClosedError: + LOGGER.error('Stream was closed while sending a notification.') finally: self.notifications.task_done() @@ -452,7 +472,8 @@ class Server(): self.backend.port = port self.set_tasks(io_loop) LOGGER.info('Running on port %d', self.backend.port) - io_loop.start() + if not io_loop.asyncio_loop.is_running(): + io_loop.start() def get_game_indices(self): """ Iterate over all game indices in server database. @@ -795,3 +816,44 @@ class Server(): def get_map(self, map_name): """ Return map power names for given map name. """ return self.available_maps.get(map_name, None) + + def start_new_daide_server(self, game_id, port=None): + """ Start a new DAIDE TCP server to handle DAIDE clients connections + :param game_id: game id to pass to the DAIDE server + :param port: the port to use. If None, an available random prot will be used + """ + if port in self.daide_servers: + raise RuntimeError('Port already in used by a DAIDE server') + + for server in self.daide_servers.values(): + if server.game_id == game_id: + return None + + while port is None or is_port_opened(port): + port = randint(8000, 8999) + + # Create DAIDE TCP server + daide_server = DaideServer(self, game_id) + daide_server.listen(port) + self.daide_servers[port] = daide_server + LOGGER.info('DAIDE server running on port %d', port) + return port + + def stop_daide_server(self, game_id): + """ Stop one or all DAIDE TCP server + :param game_id: game id of the DAIDE server. If None, all servers will be stopped + """ + for port in list(self.daide_servers.keys()): + server = self.daide_servers[port] + if game_id is None or server.game_id == game_id: + server.stop() + del self.daide_servers[port] + + def get_daide_port(self, game_id): + """ Get the DAIDE port opened for a specific game_id + :param game_id: game id of the DAIDE server. + """ + for port, server in self.daide_servers.items(): + if server.game_id == game_id: + return port + return None -- cgit v1.2.3