aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--diplomacy/client/connection.py8
-rw-r--r--diplomacy/client/response_managers.py1
-rw-r--r--diplomacy/communication/requests.py11
-rw-r--r--diplomacy/communication/responses.py7
-rw-r--r--diplomacy/daide/connection_handler.py200
-rw-r--r--diplomacy/daide/server.py73
-rw-r--r--diplomacy/server/connection_handler.py16
-rw-r--r--diplomacy/server/notifier.py5
-rw-r--r--diplomacy/server/request_managers.py18
-rw-r--r--diplomacy/server/server.py68
-rw-r--r--diplomacy/utils/exceptions.py3
11 files changed, 406 insertions, 4 deletions
diff --git a/diplomacy/client/connection.py b/diplomacy/client/connection.py
index f9eb628..b2e6cbc 100644
--- a/diplomacy/client/connection.py
+++ b/diplomacy/client/connection.py
@@ -437,6 +437,14 @@ class Connection():
self._on_socket_message(msg)
# Public methods.
+ @gen.coroutine
+ def get_daide_port(self, game_id):
+ """ Send a GetDaidePort request.
+ :param game_id: game id
+ :return: int. the game DAIDE port
+ """
+ request = requests.GetDaidePort(game_id=game_id)
+ return (yield self.send(request))
@gen.coroutine
def authenticate(self, username, password, create_user=False):
diff --git a/diplomacy/client/response_managers.py b/diplomacy/client/response_managers.py
index 5183ac4..991586f 100644
--- a/diplomacy/client/response_managers.py
+++ b/diplomacy/client/response_managers.py
@@ -298,6 +298,7 @@ MAPPING = {
requests.DeleteGame: on_delete_game,
requests.GetAllPossibleOrders: default_manager,
requests.GetAvailableMaps: default_manager,
+ requests.GetDaidePort: default_manager,
requests.GetDummyWaitingPowers: default_manager,
requests.GetPlayablePowers: default_manager,
requests.GetPhaseHistory: on_get_phase_history,
diff --git a/diplomacy/communication/requests.py b/diplomacy/communication/requests.py
index b7f7671..a6bb622 100644
--- a/diplomacy/communication/requests.py
+++ b/diplomacy/communication/requests.py
@@ -114,6 +114,17 @@ class _AbstractGameRequest(_AbstractChannelRequest):
# Connection requests.
# ====================
+class GetDaidePort(_AbstractRequest):
+ """ Get game DAIDE port """
+ __slots__ = ['game_id']
+ params = {
+ strings.GAME_ID: str
+ }
+
+ def __init__(self, **kwargs):
+ self.game_id = None
+ super(GetDaidePort, self).__init__(**kwargs)
+
class SignIn(_AbstractRequest):
""" SignIn request.
Expected response: responses.DataToken
diff --git a/diplomacy/communication/responses.py b/diplomacy/communication/responses.py
index a928720..58edd6f 100644
--- a/diplomacy/communication/responses.py
+++ b/diplomacy/communication/responses.py
@@ -161,6 +161,13 @@ class DataGames(UniqueData):
strings.DATA: parsing.SequenceType(parsing.JsonableClassType(DataGameInfo)) # list of game info.
}
+class DataPort(UniqueData):
+ """ Unique data containing a DAIDE port. """
+ __slots__ = []
+ params = {
+ strings.DATA: int # DAIDE port
+ }
+
class DataTimeStamp(UniqueData):
""" Unique data containing a timestamp. """
__slots__ = []
diff --git a/diplomacy/daide/connection_handler.py b/diplomacy/daide/connection_handler.py
new file mode 100644
index 0000000..0b606bf
--- /dev/null
+++ b/diplomacy/daide/connection_handler.py
@@ -0,0 +1,200 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Tornado stream wrapper, used internally to abstract a DAIDE stream connection from a WebSocketConnection. """
+import logging
+from tornado import gen
+from tornado.concurrent import Future
+from tornado.iostream import StreamClosedError
+from diplomacy.daide import notifications, request_managers, responses
+from diplomacy.daide.messages import DiplomacyMessage, DaideMessage, ErrorMessage, RepresentationMessage, MessageType
+from diplomacy.daide.notification_managers import translate_notification
+from diplomacy.daide.requests import RequestBuilder
+from diplomacy.daide.utils import bytes_to_str
+from diplomacy.utils import exceptions
+
+# Constants
+LOGGER = logging.getLogger(__name__)
+
+class ConnectionHandler():
+ """ ConnectionHandler class. Properties:
+ - server: server object representing running server.
+ """
+ _NAME_VARIANT_PREFIX = 'DAIDE'
+ _NAME_VARIANTS_POOL = []
+ _USED_NAME_VARIANTS = []
+
+ def __init__(self):
+ self.stream = None
+ self.server = None
+ self.game_id = None
+ self.token = None
+ self._name_variant = None
+ self._socket_no = None
+ self._local_addr = ('::1', 0, 0, 0)
+ self._remote_addr = ('::1', 0, 0, 0)
+
+ self.message_mapping = {MessageType.INITIAL: self._on_initial_message,
+ MessageType.DIPLOMACY: self._on_diplomacy_message,
+ MessageType.FINAL: self._on_final_message,
+ MessageType.ERROR: self._on_error_message}
+
+ def initialize(self, stream, server, game_id):
+ """ Initialize the connection handler.
+ :param server: a Server object.
+ :type server: diplomacy.Server
+ """
+ self.stream = stream
+ self.server = server
+ self.game_id = game_id
+ stream.set_close_callback(self.on_connection_close)
+ self._socket_no = self.stream.socket.fileno()
+ self._local_addr = stream.socket.getsockname()
+ self._remote_addr = stream.socket.getpeername()
+
+ @property
+ def local_addr(self):
+ """ Return the address of the local endpoint """
+ return self._local_addr
+
+ @property
+ def remote_addr(self):
+ """ Return the address of the remote endpoint """
+ return self._remote_addr
+
+ def get_name_variant(self):
+ """ Return the address of the remote endpoint """
+ if self._name_variant is None:
+ self._name_variant = self._NAME_VARIANTS_POOL.pop(0) if self._NAME_VARIANTS_POOL \
+ else len(self._USED_NAME_VARIANTS)
+ self._USED_NAME_VARIANTS.append(self._name_variant)
+ return self._NAME_VARIANT_PREFIX + str(self._name_variant)
+
+ def release_name_variant(self):
+ """ Return the next available user name variant """
+ self._USED_NAME_VARIANTS.remove(self._name_variant)
+ self._NAME_VARIANTS_POOL.append(self._name_variant)
+ self._name_variant = None
+
+ @gen.coroutine
+ def close_connection(self):
+ """ Close the connection with the client """
+ try:
+ message = DiplomacyMessage()
+ message.content = bytes(responses.TurnOffResponse())
+ yield self.write_message(message)
+ self.stream.close()
+ except StreamClosedError:
+ LOGGER.error('Stream is closed.')
+
+ def on_connection_close(self):
+ """ Invoked when the socket is closed (see parent method).
+ Detach this connection handler from server users.
+ """
+ self.release_name_variant()
+ self.server.users.remove_connection(self, remove_tokens=False)
+ LOGGER.info('Removed connection. Remaining %d connection(s).', self.server.users.count_connections())
+
+ @gen.coroutine
+ def read_stream(self):
+ """ Read the next message from the stream """
+ messages = []
+ in_message = yield DaideMessage.from_stream(self.stream)
+
+ if in_message and in_message.is_valid:
+ message_handler = self.message_mapping.get(in_message.message_type, None)
+ if not message_handler:
+ raise RuntimeError('Unrecognized DAIDE message type [{}]'.format(in_message.message_type))
+
+ if gen.is_coroutine_function(message_handler):
+ messages = yield message_handler(in_message)
+ else:
+ messages = message_handler(in_message)
+ elif in_message:
+ err_message = ErrorMessage()
+ err_message.error_code = in_message.error_code
+ messages = [err_message]
+
+ for message in messages:
+ yield self.write_message(message)
+
+ # Added for compatibility with WebSocketHandler interface
+ def write_message(self, message, binary=True):
+ """ Write a message into the stream """
+ if binary and isinstance(message, bytes):
+ future = self.stream.write(message)
+ else:
+ if isinstance(message, notifications.DaideNotification):
+ LOGGER.info('[%d] notification:[%s]', self._socket_no, bytes_to_str(bytes(message)))
+ notification = message
+ message = DiplomacyMessage()
+ message.content = bytes(notification)
+
+ if isinstance(message, DaideMessage):
+ future = self.stream.write(bytes(message))
+ else:
+ future = Future()
+ future.set_result(None)
+ return future
+
+ def translate_notification(self, notification):
+ """ Translate a notification to a DAIDE notification.
+ :param notification: a notification object to pass to handler function.
+ See diplomacy.communication.notifications for possible notifications.
+ :return: either None or an array of daide notifications.
+ See module diplomacy.daide.notifications for possible daide notifications.
+ """
+ return translate_notification(self.server, notification, self)
+
+ def _on_initial_message(self, _):
+ """ Handle an initial message """
+ LOGGER.info('[%d] initial message', self._socket_no)
+ return [RepresentationMessage()]
+
+ @gen.coroutine
+ def _on_diplomacy_message(self, in_message):
+ """ Handle a diplomacy message """
+ messages = []
+ request = RequestBuilder.from_bytes(in_message.content)
+
+ try:
+ LOGGER.info('[%d] request:[%s]', self._socket_no, bytes_to_str(in_message.content))
+ request.game_id = self.game_id
+ message_responses = yield request_managers.handle_request(self.server, request, self)
+ except exceptions.ResponseException:
+ message_responses = [responses.REJ(bytes(request))]
+
+ if message_responses:
+ for response in message_responses:
+ response_bytes = bytes(response)
+ LOGGER.info('[%d] response:[%s]', self._socket_no, bytes_to_str(response_bytes) \
+ if response_bytes else None)
+ message = DiplomacyMessage()
+ message.content = response_bytes
+ messages.append(message)
+
+ return messages
+
+ def _on_final_message(self, _):
+ """ Handle a final message """
+ LOGGER.info('[%d] final message', self._socket_no)
+ self.stream.close()
+ return []
+
+ def _on_error_message(self, in_message):
+ """ Handle an error message """
+ LOGGER.error('[%d] error [%d]', self._socket_no, in_message.error_code)
+ return []
diff --git a/diplomacy/daide/server.py b/diplomacy/daide/server.py
new file mode 100644
index 0000000..ceca122
--- /dev/null
+++ b/diplomacy/daide/server.py
@@ -0,0 +1,73 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Parallel server to receive DAIDE communications """
+import logging
+from tornado import gen
+from tornado.iostream import StreamClosedError
+from tornado.tcpserver import TCPServer
+from diplomacy.daide.connection_handler import ConnectionHandler
+
+# Constants
+LOGGER = logging.getLogger(__name__)
+
+class Server(TCPServer):
+ """ Represents a server to receive DAIDE communications """
+ def __init__(self, master_server, game_id):
+ """ Contructor
+ :param master_server: the internal server
+ :param game_id: the game id for which this server will receive communications
+ """
+ super(Server, self).__init__()
+ self._master_server = master_server
+ self._game_id = game_id
+ self._registered_connections = {}
+
+ @property
+ def master_server(self):
+ """ Return the master server """
+ return self._master_server
+
+ @property
+ def game_id(self):
+ """ Return the game id associated with the server """
+ return self._game_id
+
+ def stop(self):
+ """ Stop the server and close all connections """
+ for connection_handler in self._registered_connections.values():
+ connection_handler.close_connection()
+ super(Server, self).stop()
+
+ @gen.coroutine
+ def handle_stream(self, stream, address):
+ """ Handle an open stream
+ :param stream: the stream to handle
+ :param address: the address of the client
+ """
+ LOGGER.info('Connection from client [%s]', str(address))
+
+ handler = ConnectionHandler()
+ handler.initialize(stream, self._master_server, self._game_id)
+ self._registered_connections[stream] = handler
+
+ try:
+ while not handler.stream.closed():
+ yield handler.read_stream()
+ except StreamClosedError:
+ LOGGER.error('[%s] disconnected', str(address))
+
+ del self._registered_connections[stream]
diff --git a/diplomacy/server/connection_handler.py b/diplomacy/server/connection_handler.py
index 0089db3..6c1351f 100644
--- a/diplomacy/server/connection_handler.py
+++ b/diplomacy/server/connection_handler.py
@@ -26,6 +26,7 @@ import ujson as json
from diplomacy.communication import responses, requests
from diplomacy.server import request_managers
from diplomacy.utils import exceptions, strings
+from diplomacy.utils.network_data import NetworkData
LOGGER = logging.getLogger(__name__)
@@ -79,6 +80,21 @@ class ConnectionHandler(WebSocketHandler):
self.server.users.remove_connection(self, remove_tokens=False)
LOGGER.info("Removed connection. Remaining %d connection(s).", self.server.users.count_connections())
+ def write_message(self, message, binary=False):
+ """ Sends the given message to the client of this Web Socket. """
+ if isinstance(message, NetworkData):
+ message = message.json()
+ return super(ConnectionHandler, self).write_message(message, binary)
+
+ @staticmethod
+ def translate_notification(notification):
+ """ Translate a notification to an array of notifications.
+ :param notification: a notification object to pass to handler function.
+ See diplomacy.communication.notifications for possible notifications.
+ :return: An array of notifications containing a single notification.
+ """
+ return [notification]
+
@gen.coroutine
def on_message(self, message):
""" Parse given message and manage parsed data (expected a string representation of a request). """
diff --git a/diplomacy/server/notifier.py b/diplomacy/server/notifier.py
index 4bd64d3..2f8761c 100644
--- a/diplomacy/server/notifier.py
+++ b/diplomacy/server/notifier.py
@@ -75,7 +75,10 @@ class Notifier():
"""
connection_handler = self.server.users.get_connection_handler(notification.token)
if not self.ignores(notification) and connection_handler:
- yield self.server.notifications.put((connection_handler, notification))
+ translated_notifications = connection_handler.translate_notification(notification)
+ if translated_notifications:
+ for translated_notification in translated_notifications:
+ yield self.server.notifications.put((connection_handler, translated_notification))
@gen.coroutine
def _notify_game(self, server_game, notification_class, **kwargs):
diff --git a/diplomacy/server/request_managers.py b/diplomacy/server/request_managers.py
index d17a77b..6819421 100644
--- a/diplomacy/server/request_managers.py
+++ b/diplomacy/server/request_managers.py
@@ -149,6 +149,7 @@ def on_create_game(server, request, connection_handler):
# Register game on server.
server.add_new_game(server_game)
+ server.start_new_daide_server(game_id)
# Start game immediately if possible (e.g. if it's a solitaire game).
if server_game.game_can_start():
@@ -216,6 +217,7 @@ def on_delete_game(server, request, connection_handler):
level = verify_request(server, request, connection_handler, observer_role=False, power_role=False)
server.delete_game(level.game)
server.unschedule_game(level.game)
+ server.stop_daide_server(level.game.game_id)
Notifier(server, ignore_tokens=[request.token]).notify_game_deleted(level.game)
def on_get_dummy_waiting_powers(server, request, connection_handler):
@@ -256,6 +258,21 @@ def on_get_available_maps(server, request, connection_handler):
verify_request(server, request, connection_handler)
return responses.DataMaps(data=server.available_maps, request_id=request.request_id)
+def on_get_daide_port(server, request, connection_handler):
+ """ Manage request GetDaidePort.
+ :param server: server which receives the request.
+ :param request: request to manage.
+ :param connection_handler: connection handler from which the request was sent.
+ :return: None
+ :type server: diplomacy.Server
+ :type request: diplomacy.communication.requests.GetDaidePort
+ """
+ del connection_handler
+ daide_port = server.get_daide_port(request.game_id)
+ if daide_port is None:
+ raise exceptions.DaidePortException('Invalid game id or game\'s DAIDE server is not started for that game')
+ return responses.DataPort(data=daide_port, request_id=request.request_id)
+
def on_get_playable_powers(server, request, connection_handler):
""" Manage request GetPlayablePowers.
:param server: server which receives the request.
@@ -1133,6 +1150,7 @@ MAPPING = {
requests.GetDummyWaitingPowers: on_get_dummy_waiting_powers,
requests.GetAllPossibleOrders: on_get_all_possible_orders,
requests.GetAvailableMaps: on_get_available_maps,
+ requests.GetDaidePort: on_get_daide_port,
requests.GetPlayablePowers: on_get_playable_powers,
requests.GetPhaseHistory: on_get_phase_history,
requests.JoinGame: on_join_game,
diff --git a/diplomacy/server/server.py b/diplomacy/server/server.py
index fed4eb6..d1f044d 100644
--- a/diplomacy/server/server.py
+++ b/diplomacy/server/server.py
@@ -50,12 +50,15 @@ import atexit
import base64
import logging
import os
+from random import randint
+import socket
import signal
import tornado
import tornado.web
from tornado import gen
from tornado.ioloop import IOLoop
+from tornado.iostream import StreamClosedError
from tornado.queues import Queue
from tornado.websocket import WebSocketClosedError
@@ -63,6 +66,7 @@ import ujson as json
import diplomacy.settings
from diplomacy.communication import notifications
+from diplomacy.daide.server import Server as DaideServer
from diplomacy.server.connection_handler import ConnectionHandler
from diplomacy.server.notifier import Notifier
from diplomacy.server.scheduler import Scheduler
@@ -73,6 +77,16 @@ from diplomacy.utils import common, exceptions, strings, constants
LOGGER = logging.getLogger(__name__)
+def is_port_opened(port, hostname='127.0.0.1'):
+ """ Checks if the specified port is opened
+ :param port: The port to check
+ :param hostname: The hostname to check, defaults to '127.0.0.1'
+ """
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if sock.connect_ex((hostname, port)) == 0:
+ return True
+ return False
+
def get_absolute_path(directory=None):
""" Return absolute path of given directory.
If given directory is None, return absolute path of current directory.
@@ -136,6 +150,7 @@ class InterruptionHandler():
:param frame: frame received
"""
if signum == signal.SIGINT:
+ self.server.stop_daide_server(None)
self.server.backup_now(force=True)
if self.previous_handler:
self.previous_handler(signum, frame)
@@ -162,7 +177,7 @@ class Server():
""" Server class. """
__slots__ = ['data_path', 'games_path', 'available_maps', 'maps_mtime', 'notifications',
'games_scheduler', 'allow_registrations', 'max_games', 'remove_canceled_games', 'users', 'games',
- 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds',
+ 'daide_servers', 'backup_server', 'backup_games', 'backup_delay_seconds', 'ping_seconds',
'interruption_handler', 'backend', 'games_with_dummy_powers', 'dispatched_dummy_powers']
# Servers cache.
@@ -228,6 +243,9 @@ class Server():
# If there is no bot token associated, couple is (None, None).
self.dispatched_dummy_powers = {} # type: dict{str, tuple}
+ # DAIDE TCP servers listening to a game's dedicated port.
+ self.daide_servers = {} # {port: daide_server}
+
# Load data on memory.
self._load()
@@ -405,9 +423,11 @@ class Server():
while True:
connection_handler, notification = yield self.notifications.get()
try:
- yield connection_handler.write_message(notification.json())
+ yield connection_handler.write_message(notification)
except WebSocketClosedError:
LOGGER.error('Websocket was closed while sending a notification.')
+ except StreamClosedError:
+ LOGGER.error('Stream was closed while sending a notification.')
finally:
self.notifications.task_done()
@@ -452,7 +472,8 @@ class Server():
self.backend.port = port
self.set_tasks(io_loop)
LOGGER.info('Running on port %d', self.backend.port)
- io_loop.start()
+ if not io_loop.asyncio_loop.is_running():
+ io_loop.start()
def get_game_indices(self):
""" Iterate over all game indices in server database.
@@ -795,3 +816,44 @@ class Server():
def get_map(self, map_name):
""" Return map power names for given map name. """
return self.available_maps.get(map_name, None)
+
+ def start_new_daide_server(self, game_id, port=None):
+ """ Start a new DAIDE TCP server to handle DAIDE clients connections
+ :param game_id: game id to pass to the DAIDE server
+ :param port: the port to use. If None, an available random prot will be used
+ """
+ if port in self.daide_servers:
+ raise RuntimeError('Port already in used by a DAIDE server')
+
+ for server in self.daide_servers.values():
+ if server.game_id == game_id:
+ return None
+
+ while port is None or is_port_opened(port):
+ port = randint(8000, 8999)
+
+ # Create DAIDE TCP server
+ daide_server = DaideServer(self, game_id)
+ daide_server.listen(port)
+ self.daide_servers[port] = daide_server
+ LOGGER.info('DAIDE server running on port %d', port)
+ return port
+
+ def stop_daide_server(self, game_id):
+ """ Stop one or all DAIDE TCP server
+ :param game_id: game id of the DAIDE server. If None, all servers will be stopped
+ """
+ for port in list(self.daide_servers.keys()):
+ server = self.daide_servers[port]
+ if game_id is None or server.game_id == game_id:
+ server.stop()
+ del self.daide_servers[port]
+
+ def get_daide_port(self, game_id):
+ """ Get the DAIDE port opened for a specific game_id
+ :param game_id: game id of the DAIDE server.
+ """
+ for port, server in self.daide_servers.items():
+ if server.game_id == game_id:
+ return port
+ return None
diff --git a/diplomacy/utils/exceptions.py b/diplomacy/utils/exceptions.py
index 5cf4384..138bed9 100644
--- a/diplomacy/utils/exceptions.py
+++ b/diplomacy/utils/exceptions.py
@@ -86,6 +86,9 @@ class RequestException(ResponseException):
class AdminTokenException(ResponseException):
""" Invalid token for admin operations. """
+class DaidePortException(ResponseException):
+ """ Daide server not started for the game """
+
class GameCanceledException(ResponseException):
""" Game was cancelled. """