aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/client/connection.py
diff options
context:
space:
mode:
authorPhilip Paquette <pcpaquette@gmail.com>2019-09-11 12:58:45 -0400
committerPhilip Paquette <pcpaquette@gmail.com>2019-09-14 18:18:53 -0400
commitabb42dcd4886705d6ba8af27f68ef605218ac67c (patch)
tree9ae16f7a09fff539fa72e65198e284bca6ac3376 /diplomacy/client/connection.py
parenta954a00d263750c279dbb2c0a9ae85707022bcd7 (diff)
Added ReadtheDocs documentation for the public API
- Reformatted the docstring to be compatible - Added tests to make sure the documentation compiles properly - Added sphinx as a pip requirement Co-authored-by: Philip Paquette <pcpaquette@gmail.com> Co-authored-by: notoraptor <stevenbocco@gmail.com>
Diffstat (limited to 'diplomacy/client/connection.py')
-rw-r--r--diplomacy/client/connection.py668
1 files changed, 355 insertions, 313 deletions
diff --git a/diplomacy/client/connection.py b/diplomacy/client/connection.py
index d0d5902..f81d5b6 100644
--- a/diplomacy/client/connection.py
+++ b/diplomacy/client/connection.py
@@ -18,7 +18,7 @@
import logging
import weakref
from datetime import timedelta
-
+from typing import Dict
from tornado import gen, ioloop
from tornado.concurrent import Future
from tornado.iostream import StreamClosedError
@@ -34,245 +34,89 @@ from diplomacy.utils import exceptions, strings, constants
LOGGER = logging.getLogger(__name__)
-class MessageWrittenCallback():
- """ Helper class representing callback to call on a connection when a request is written in a websocket. """
- __slots__ = ['request_context']
-
- def __init__(self, request_context):
- """ Initialize the callback object.
- :param request_context: a request context
- :type request_context: RequestFutureContext
- """
- self.request_context = request_context
-
- def callback(self, msg_future):
- """ Called when request is effectively written on socket, and move the request
- from `request to send` to `request assumed sent`.
- """
- # Remove request context from `requests to send` in any case.
- connection = self.request_context.connection # type: Connection
- request_id = self.request_context.request_id
- exception = msg_future.exception()
- if exception is not None:
- if isinstance(exception, (WebSocketClosedError, StreamClosedError)):
- # Connection suddenly closed.
- # Request context was stored in connection.requests_to_send
- # and will be re-sent when reconnection succeeds.
- # For more details, see method Connection.write_request().
- LOGGER.error('Connection was closed when sending a request. Silently waiting for a reconnection.')
- else:
- LOGGER.error('Fatal error occurred while writing a request.')
- self.request_context.future.set_exception(exception)
- else:
- connection.requests_waiting_responses[request_id] = self.request_context
-
-class Reconnection():
- """ Class performing reconnection work for a given connection.
-
- Class properties:
- =================
-
- - connection: Connection object to reconnect.
-
- - games_phases: dictionary mapping each game address (game ID + game role) to server game info:
- {game ID => {game role => responses.DataGamePhase}}
- Server game info is a DataGamePhase response sent by server as response to a Synchronize request.
- It contains 3 fields: game ID, current server game phase and current server game timestamp.
- We currently use only game phase.
-
- - n_expected_games: number of games registered in games_phases.
-
- - n_synchronized_games: number of games already synchronized.
-
- Reconnection procedure:
- =======================
-
- - Mark all waiting responses as `re-sent` (may be useful on server-side) and
- move them back to responses_to_send.
-
- - Remove all previous synchronization requests that are not yet sent. We will send new synchronization
- requests with latest games timestamps. Future associated to removed requests will raise an exception.
-
- - Initialize games_phases associating None to each game object currently opened in connection.
-
- - Send synchronization request for each game object currently opened in connection. For each game:
-
- - server will send a response describing current server game phase (current phase and timestamp). This info
- will be used to check local requests to send. Note that concrete synchronization is done via notifications.
- Thus, when server responses is received, game synchronization may not be yet terminated, but at least
- we will now current server game phase.
-
- - Server response is saved in games_phases (replacing None associated to game object).
-
- - n_synchronized_games is incremented.
-
- - When sync responses are received for all games registered in games_phases
- (n_expected_games == n_synchronized_games), we can finalize reconnection:
-
- - Remove every phase-dependent game request not yet sent for which phase does not match
- server game phase. Futures associated to removed request will raise an exception.
-
- - Finally send all remaining requests.
-
- These requests may be marked as re-sent.
- For these requests, server is (currently) responsible for checking if they don't represent
- a duplicated query.
+@gen.coroutine
+def connect(hostname, port):
+ """ Connect to given hostname and port.
+ :param hostname: a hostname
+ :param port: a port
+ :return: a Connection object connected.
+ :type hostname: str
+ :type port: int
+ :rtype: Connection
"""
+ connection = Connection(hostname, port)
+ yield connection._connect('Trying to connect.') # pylint: disable=protected-access
+ return connection
- __slots__ = ['connection', 'games_phases', 'n_expected_games', 'n_synchronized_games']
-
- def __init__(self, connection):
- """ Initialize reconnection data/
- :param connection: connection to reconnect.
- :type connection: Connection
- """
- self.connection = connection
- self.games_phases = {}
- self.n_expected_games = 0
- self.n_synchronized_games = 0
-
- def reconnect(self):
- """ Perform concrete reconnection work. """
+class Connection:
+ """ Connection class.
+
+ The connection class should not be initiated directly, but through the connect method
+
+ .. code-block:: python
+
+ >>> from diplomacy.client.connection import connect
+ >>> connection = await connect(hostname, port)
+
+ Properties:
+
+ - **hostname**: :class:`str` hostname to connect (e.g. 'localhost')
+ - **port**: :class:`int` port to connect (e.g. 8888)
+ - **use_ssl**: :class:`bool` telling if connection should be securized (True) or not (False).
+ - **url**: (property) :class:`str` websocket url to connect (generated with hostname and port)
+ - **connection**: :class:`tornado.websocket.WebSocketClientConnection` a tornado websocket connection object
+ - **connection_count**: :class:`int` number of successful connections from this Connection object.
+ Used to check if message callbacks is already launched (if count > 0).
+ - **is_connecting**: :class:`tornado.locks.Event` a tornado Event used to keep connection status.
+ No request can be sent while is_connecting.
+ If connected, Synchronize requests can be sent immediately even if is_reconnecting.
+ Other requests must wait full reconnection.
+ - **is_reconnecting**: :class:`tornado.locks.Event` a tornado Event used to keep re-connection status.
+ Non-synchronize request cannot be sent while is_reconnecting.
+ If reconnected, all requests can be sent.
+ - **channels**: a :class:`weakref.WeakValueDictionary` mapping channel token to :class:`.Channel` object.
+ - **requests_to_send**: a :class:`Dict` mapping a request ID to the context of a request
+ **not sent**. If we are disconnected when trying to send a request, then request
+ context is added to this dictionary to be send later once reconnected.
+ - **requests_waiting_responses**: a :class:`Dict` mapping a request ID to the context of a
+ request **sent**. Contains requests that are waiting for a server response.
+ - **unknown_tokens**: :class:`set` a set of unknown tokens. We can safely ignore them, as the server has been
+ notified.
+ """
+ __slots__ = ['hostname', 'port', 'use_ssl', 'connection', 'is_connecting', 'is_reconnecting', 'connection_count',
+ 'channels', 'requests_to_send', 'requests_waiting_responses', 'unknown_tokens']
- # Mark all waiting responses as `re-sent` and move them back to responses_to_send.
- for waiting_context in self.connection.requests_waiting_responses.values(): # type: RequestFutureContext
- waiting_context.request.re_sent = True
- self.connection.requests_to_send.update(self.connection.requests_waiting_responses)
- self.connection.requests_waiting_responses.clear()
+ def __init__(self, hostname, port, use_ssl=False):
+ """ Constructor
- # Remove all previous synchronization requests.
- requests_to_send_updated = {}
- for context in self.connection.requests_to_send.values(): # type: RequestFutureContext
- if isinstance(context.request, requests.Synchronize):
- context.future.set_exception(exceptions.DiplomacyException(
- 'Sync request invalidated for game ID %s.' % context.request.game_id))
- else:
- requests_to_send_updated[context.request.request_id] = context
- self.connection.requests_to_send = requests_to_send_updated
+ The connection class should not be initiated directly, but through the connect method
- # Count games to synchronize.
- for channel in self.connection.channels.values():
- for game_instance_set in channel.game_id_to_instances.values():
- for game in game_instance_set.get_games():
- self.games_phases.setdefault(game.game_id, {})[game.role] = None
- self.n_expected_games += 1
+ .. code-block:: python
- if self.n_expected_games:
- # Synchronize games.
- for channel in self.connection.channels.values():
- for game_instance_set in channel.game_id_to_instances.values():
- for game in game_instance_set.get_games():
- game.synchronize().add_done_callback(self.generate_sync_callback(game))
- else:
- # No game to sync, finish sync now.
- self.sync_done()
+ >>> from diplomacy.client.connection import connect
+ >>> connection = await connect(hostname, port)
- def generate_sync_callback(self, game):
- """ Generate callback to call when response to sync request is received for given game.
- :param game: game
- :return: a callback.
- :type game: diplomacy.client.network_game.NetworkGame
+ :param hostname: hostname to connect (e.g. 'localhost')
+ :param port: port to connect (e.g. 8888)
+ :param use_ssl: telling if connection should be securized (True) or not (False).
+ :type hostname: str
+ :type port: int
+ :type use_ssl: bool
"""
-
- def on_sync(future):
- """ Callback. If exception occurs, print it as logging error. Else, register server response,
- and move forward to final reconnection work if all games received sync responses.
- """
- exception = future.exception()
- if exception is not None:
- LOGGER.error(str(exception))
- else:
- self.games_phases[game.game_id][game.role] = future.result()
- self.n_synchronized_games += 1
- if self.n_synchronized_games == self.n_expected_games:
- self.sync_done()
-
- return on_sync
-
- def sync_done(self):
- """ Final reconnection work. Remove obsolete game requests and send remaining requests. """
-
- # All sync requests sent have finished.
- # Remove all obsolete game requests from connection.
- # A game request is obsolete if it's phase-dependent and if its phase does not match current game phase.
-
- request_to_send_updated = {}
- for context in self.connection.requests_to_send.values(): # type: RequestFutureContext
- keep = True
- if context.request.level == strings.GAME and context.request.phase_dependent:
- request_phase = context.request.phase
- server_phase = self.games_phases[context.request.game_id][context.request.game_role].phase
- if request_phase != server_phase:
- # Request is obsolete.
- context.future.set_exception(exceptions.DiplomacyException(
- 'Game %s: request %s: request phase %s does not match current server game phase %s.'
- % (context.request.game_id, context.request.name, request_phase, server_phase)))
- keep = False
- if keep:
- request_to_send_updated[context.request.request_id] = context
-
- LOGGER.debug('Keep %d/%d old requests to send.',
- len(request_to_send_updated), len(self.connection.requests_to_send))
-
- # All requests to send are stored in request_to_send_updated.
- # Then we can empty connection.requests_to_send.
- # If we fail to send a request, it will be re-added again.
- self.connection.requests_to_send.clear()
-
- # Send requests.
- for request_to_send in request_to_send_updated.values(): # type: RequestFutureContext
- self.connection.write_request(request_to_send).add_done_callback(
- MessageWrittenCallback(request_to_send).callback)
-
- # We are reconnected.
- self.connection.is_reconnecting.set()
-
- LOGGER.info('Done reconnection work.')
-
-class Connection():
- """ Connection class. Properties:
- - hostname: hostname to connect (e.g. 'localhost')
- - port: port to connect (e.g. 8888)
- - use_ssl: boolean telling if connection should be securized (True) or not (False).
- - url (auto): websocket url to connect (generated with hostname and port)
- - connection: a tornado websocket connection object
- - connection_count: number of successful connections from this Connection object.
- Used to check if message callbacks is already launched (if count > 0).
- - connection_lock: a tornado lock used to access tornado websocket connection object
- - is_connecting: a tornado Event used to keep connection status.
- No request can be sent while is_connecting.
- If connected, Synchronize requests can be sent immediately even if is_reconnecting.
- Other requests must wait full reconnection.
- - is_reconnecting: a tornado Event used to keep re-connection status.
- Non-synchronize request cannot be sent while is_reconnecting.
- If reconnected, all requests can be sent.
- - channels: a WeakValueDictionary mapping channel token to Channel object.
- - requests_to_send: a dictionary mapping a request ID to the context of a request
- **not sent**. If we are disconnected when trying to send a request, then request
- context is added to this dictionary to be send later once reconnected.
- - requests_waiting_responses: a dictionary mapping a request ID to the context of a
- request **sent**. Contains requests that are waiting for a server response.
- - unknown_tokens: a set of unknown tokens. We can safely ignore them, as the server has been notified.
- """
- __slots__ = ['hostname', 'port', 'use_ssl', 'connection', 'is_connecting', 'is_reconnecting', 'connection_count',
- 'channels', 'requests_to_send', 'requests_waiting_responses', 'unknown_tokens']
-
- def __init__(self, hostname, port, use_ssl=False):
self.hostname = hostname
self.port = port
self.use_ssl = bool(use_ssl)
self.connection = None
+ self.connection_count = 0
self.is_connecting = Event()
self.is_reconnecting = Event()
- self.connection_count = 0
-
self.channels = weakref.WeakValueDictionary() # {token => Channel}
- self.requests_to_send = {} # type: dict{str, RequestFutureContext}
- self.requests_waiting_responses = {} # type: dict{str, RequestFutureContext}
+ self.requests_to_send = {} # type: Dict[str, RequestFutureContext]
+ self.requests_waiting_responses = {} # type: Dict[str, RequestFutureContext]
self.unknown_tokens = set()
# When connection is created, we are not yet connected, but reconnection does not matter
@@ -281,12 +125,53 @@ class Connection():
url = property(lambda self: '%s://%s:%d' % ('wss' if self.use_ssl else 'ws', self.hostname, self.port))
+ # ===================
+ # Public Methods.
+ # ===================
+
+ @gen.coroutine
+ def authenticate(self, username, password, create_user=False):
+ """ Send a :class:`.SignIn` request.
+
+ :param username: username
+ :param password: password
+ :param create_user: boolean indicating if you want to create a user or login to an existing user.
+ :return: a :class:`.Channel` object representing the authentication.
+ :type username: str
+ :type password: str
+ :type create_user: bool
+ :rtype: diplomacy.client.channel.Channel
+ """
+ request = requests.SignIn(username=username, password=password, create_user=create_user)
+ return (yield self.send(request))
+
+ @gen.coroutine
+ def get_daide_port(self, game_id):
+ """ Send a :class:`.GetDaidePort` request.
+
+ :param game_id: game id for which to retrieve the DAIDE port.
+ :return: the game DAIDE port
+ :type game_id: str
+ :rtype: int
+ """
+ request = requests.GetDaidePort(game_id=game_id)
+ return (yield self.send(request))
+
+ # ===================
+ # Private Methods
+ # ===================
+
@gen.coroutine
- def _connect(self):
+ def _connect(self, message=None):
""" Create (force) a tornado websocket connection. Try NB_CONNECTION_ATTEMPTS attempts,
waiting for ATTEMPT_DELAY_SECONDS seconds between 2 attempts.
Raise an exception if it cannot connect.
+
+ :param message: if provided, print this message as a logger info before starting to connect.
+ :type message: str, optional
"""
+ if message:
+ LOGGER.info(message)
# We are connecting.
self.is_connecting.clear()
@@ -319,71 +204,11 @@ class Connection():
@gen.coroutine
def _reconnect(self):
""" Reconnect. """
- LOGGER.info('Trying to reconnect.')
# We are reconnecting.
self.is_reconnecting.clear()
- yield self._connect()
+ yield self._connect('Trying to reconnect.')
# We will be reconnected when method Reconnection.sync_done() will finish.
- Reconnection(self).reconnect()
-
- def _register_to_send(self, request_context):
- """ Register given request context as a request to send as soon as possible.
- :param request_context: context of request to send.
- :type request_context: RequestFutureContext
- """
- self.requests_to_send[request_context.request_id] = request_context
-
- def write_request(self, request_context):
- """ Write a request into internal connection object.
- :param request_context: context of request to send.
- :type request_context: RequestFutureContext
- """
- future = Future()
- request = request_context.request
-
- def on_message_written(write_future):
- """ 3) Writing returned, set future as done (with writing result) or with writing exception. """
- exception = write_future.exception()
- if exception is not None:
- future.set_exception(exception)
- else:
- future.set_result(write_future.result())
-
- def on_connected(reconnected_future):
- """ 2) Send request. """
- exception = reconnected_future.exception()
- if exception is not None:
- LOGGER.error('Fatal (re)connection error occurred while sending a request.')
- future.set_exception(exception)
- else:
- try:
- if self.connection is None:
- raise WebSocketClosedError()
- write_future = self.connection.write_message(request.json())
- except (WebSocketClosedError, StreamClosedError) as exc:
- # We were disconnected.
- # Save request context as a request to send.
- # We will re-try to send it later once reconnected.
- self._register_to_send(request_context)
- # Transfer exception to returned future.
- future.set_exception(exc)
- else:
- write_future.add_done_callback(on_message_written)
-
- # 1) Synchronize requests just wait for connection.
- # Other requests wait for reconnection (which also implies connection).
- if isinstance(request, requests.Synchronize):
- self.is_connecting.wait().add_done_callback(on_connected)
- else:
- self.is_reconnecting.wait().add_done_callback(on_connected)
-
- return future
-
- @gen.coroutine
- def connect(self):
- """ Effectively connect this object. """
- LOGGER.info('Trying to connect.')
- yield self._connect()
+ _Reconnection(self).reconnect()
@gen.coroutine
def _on_socket_message(self, socket_message):
@@ -461,47 +286,264 @@ class Connection():
except (WebSocketClosedError, StreamClosedError):
pass
- # Public methods.
- @gen.coroutine
- def get_daide_port(self, game_id):
- """ Send a GetDaidePort request.
- :param game_id: game id
- :return: int. the game DAIDE port
- """
- request = requests.GetDaidePort(game_id=game_id)
- return (yield self.send(request))
+ def _register_to_send(self, request_context):
+ """ Register given request context as a request to send as soon as possible.
- @gen.coroutine
- def authenticate(self, username, password, create_user=False):
- """ Send a SignIn request.
- :param username: username
- :param password: password
- :param create_user: boolean indicating if you want to create a user or login to and existing user.
- :return: a Channel object representing the authentication.
+ :param request_context: context of request to send.
+ :type request_context: RequestFutureContext
"""
- request = requests.SignIn(username=username, password=password, create_user=create_user)
- return (yield self.send(request))
+ self.requests_to_send[request_context.request_id] = request_context
def send(self, request, for_game=None):
""" Send a request.
+
:param request: request object.
:param for_game: (optional) NetworkGame object (required for game requests).
:return: a Future that returns the response handler result of this request.
+ :type request: diplomacy.communication.requests._AbstractRequest
+ :type for_game: diplomacy.client.network_game.NetworkGame, optional
+ :rtype: Future
"""
request_future = Future()
request_context = RequestFutureContext(request=request, future=request_future, connection=self, game=for_game)
- self.write_request(request_context).add_done_callback(MessageWrittenCallback(request_context).callback)
+ self.write_request(request_context).add_done_callback(_MessageWrittenCallback(request_context).callback)
return gen.with_timeout(timedelta(seconds=constants.REQUEST_TIMEOUT_SECONDS), request_future)
-@gen.coroutine
-def connect(hostname, port):
- """ Connect to given hostname and port.
- :param hostname: a hostname
- :param port: a port
- :return: a Connection object connected.
- :rtype: Connection
+ def write_request(self, request_context):
+ """ Write a request into internal connection object.
+
+ :param request_context: context of request to send.
+ :type request_context: RequestFutureContext
+ """
+ future = Future()
+ request = request_context.request
+
+ def on_message_written(write_future):
+ """ 3) Writing returned, set future as done (with writing result)
+ or with writing exception.
+ """
+ exception = write_future.exception()
+ if exception is not None:
+ future.set_exception(exception)
+ else:
+ future.set_result(write_future.result())
+
+ def on_connected(reconnected_future):
+ """ 2) Send request. """
+ exception = reconnected_future.exception()
+ if exception is not None:
+ LOGGER.error('Fatal (re)connection error occurred while sending a request.')
+ future.set_exception(exception)
+ else:
+ try:
+ if self.connection is None:
+ raise WebSocketClosedError()
+ write_future = self.connection.write_message(request.json())
+ except (WebSocketClosedError, StreamClosedError) as exc:
+ # We were disconnected.
+ # Save request context as a request to send.
+ # We will re-try to send it later once reconnected.
+ self._register_to_send(request_context)
+ # Transfer exception to returned future.
+ future.set_exception(exc)
+ else:
+ write_future.add_done_callback(on_message_written)
+
+ # 1) Synchronize requests just wait for connection.
+ # Other requests wait for reconnection (which also implies connection).
+ if isinstance(request, requests.Synchronize):
+ self.is_connecting.wait().add_done_callback(on_connected)
+ else:
+ self.is_reconnecting.wait().add_done_callback(on_connected)
+
+ return future
+
+class _Reconnection:
+ """ Class performing reconnection work for a given connection.
+
+ Class properties:
+
+ - connection: Connection object to reconnect.
+ - games_phases: dictionary mapping each game address (game ID + game role) to server game info:
+ {game ID => {game role => responses.DataGamePhase}}.
+ Server game info is a DataGamePhase response sent by server as response to a Synchronize request.
+ It contains 3 fields: game ID, current server game phase and current server game timestamp.
+ We currently use only game phase.
+ - n_expected_games: number of games registered in games_phases.
+ - n_synchronized_games: number of games already synchronized.
+
+ Reconnection procedure:
+
+ - Mark all waiting responses as `re-sent` (may be useful on server-side) and
+ move them back to responses_to_send.
+ - Remove all previous synchronization requests that are not yet sent. We will send new synchronization
+ requests with latest games timestamps. Future associated to removed requests will raise an exception.
+ - Initialize games_phases associating None to each game object currently opened in connection.
+ - Send synchronization request for each game object currently opened in connection. For each game:
+
+ - server will send a response describing current server game phase (current phase and timestamp). This info
+ will be used to check local requests to send. Note that concrete synchronization is done via notifications.
+ Thus, when server responses is received, game synchronization may not be yet terminated, but at least
+ we will now current server game phase.
+ - Server response is saved in games_phases (replacing None associated to game object).
+ - n_synchronized_games is incremented.
+
+ - When sync responses are received for all games registered in games_phases
+ (n_expected_games == n_synchronized_games), we can finalize reconnection:
+
+ - Remove every phase-dependent game request not yet sent for which phase does not match
+ server game phase. Futures associated to removed request will raise an exception.
+ - Finally send all remaining requests.
+
+ These requests may be marked as re-sent.
+ For these requests, server is (currently) responsible for checking if they don't represent
+ a duplicated query.
"""
- connection = Connection(hostname, port)
- yield connection.connect()
- return connection
+
+ __slots__ = ['connection', 'games_phases', 'n_expected_games', 'n_synchronized_games']
+
+ def __init__(self, connection):
+ """ Initialize reconnection data/
+
+ :param connection: connection to reconnect.
+ :type connection: Connection
+ """
+ self.connection = connection
+ self.games_phases = {}
+ self.n_expected_games = 0
+ self.n_synchronized_games = 0
+
+ def reconnect(self):
+ """ Perform concrete reconnection work. """
+
+ # Mark all waiting responses as `re-sent` and move them back to responses_to_send.
+ for waiting_context in self.connection.requests_waiting_responses.values(): # type: RequestFutureContext
+ waiting_context.request.re_sent = True
+ self.connection.requests_to_send.update(self.connection.requests_waiting_responses)
+ self.connection.requests_waiting_responses.clear()
+
+ # Remove all previous synchronization requests.
+ requests_to_send_updated = {}
+ for context in self.connection.requests_to_send.values(): # type: RequestFutureContext
+ if isinstance(context.request, requests.Synchronize):
+ context.future.set_exception(exceptions.DiplomacyException(
+ 'Sync request invalidated for game ID %s.' % context.request.game_id))
+ else:
+ requests_to_send_updated[context.request.request_id] = context
+ self.connection.requests_to_send = requests_to_send_updated
+
+ # Count games to synchronize.
+ for channel in self.connection.channels.values():
+ for game_instance_set in channel.game_id_to_instances.values():
+ for game in game_instance_set.get_games():
+ self.games_phases.setdefault(game.game_id, {})[game.role] = None
+ self.n_expected_games += 1
+
+ if self.n_expected_games:
+ # Synchronize games.
+ for channel in self.connection.channels.values():
+ for game_instance_set in channel.game_id_to_instances.values():
+ for game in game_instance_set.get_games():
+ game.synchronize().add_done_callback(self.generate_sync_callback(game))
+ else:
+ # No game to sync, finish sync now.
+ self.sync_done()
+
+ def generate_sync_callback(self, game):
+ """ Generate callback to call when response to sync request is received for given game.
+
+ :param game: game
+ :return: a callback.
+ :type game: diplomacy.client.network_game.NetworkGame
+ """
+
+ def on_sync(future):
+ """ Callback. If exception occurs, print it as logging error. Else, register server response,
+ and move forward to final reconnection work if all games received sync responses.
+ """
+ exception = future.exception()
+ if exception is not None:
+ LOGGER.error(str(exception))
+ else:
+ self.games_phases[game.game_id][game.role] = future.result()
+ self.n_synchronized_games += 1
+ if self.n_synchronized_games == self.n_expected_games:
+ self.sync_done()
+
+ return on_sync
+
+ def sync_done(self):
+ """ Final reconnection work. Remove obsolete game requests and send remaining requests. """
+
+ # All sync requests sent have finished.
+ # Remove all obsolete game requests from connection.
+ # A game request is obsolete if it's phase-dependent and if its phase does not match current game phase.
+
+ request_to_send_updated = {}
+ for context in self.connection.requests_to_send.values(): # type: RequestFutureContext
+ keep = True
+ if context.request.level == strings.GAME and context.request.phase_dependent:
+ request_phase = context.request.phase
+ server_phase = self.games_phases[context.request.game_id][context.request.game_role].phase
+ if request_phase != server_phase:
+ # Request is obsolete.
+ context.future.set_exception(exceptions.DiplomacyException(
+ 'Game %s: request %s: request phase %s does not match current server game phase %s.'
+ % (context.request.game_id, context.request.name, request_phase, server_phase)))
+ keep = False
+ if keep:
+ request_to_send_updated[context.request.request_id] = context
+
+ LOGGER.debug('Keep %d/%d old requests to send.',
+ len(request_to_send_updated), len(self.connection.requests_to_send))
+
+ # All requests to send are stored in request_to_send_updated.
+ # Then we can empty connection.requests_to_send.
+ # If we fail to send a request, it will be re-added again.
+ self.connection.requests_to_send.clear()
+
+ # Send requests.
+ for request_to_send in request_to_send_updated.values(): # type: RequestFutureContext
+ self.connection.write_request(request_to_send).add_done_callback(
+ _MessageWrittenCallback(request_to_send).callback)
+
+ # We are reconnected.
+ self.connection.is_reconnecting.set()
+
+ LOGGER.info('Done reconnection work.')
+
+class _MessageWrittenCallback:
+ """ Helper class representing callback to call on a connection
+ when a request is written in a websocket.
+ """
+ __slots__ = ['request_context']
+
+ def __init__(self, request_context):
+ """ Initialize the callback object.
+
+ :param request_context: a request context
+ :type request_context: RequestFutureContext
+ """
+ self.request_context = request_context
+
+ def callback(self, msg_future):
+ """ Called when request is effectively written on socket, and move the request
+ from `request to send` to `request assumed sent`.
+ """
+ # Remove request context from `requests to send` in any case.
+ connection = self.request_context.connection # type: Connection
+ request_id = self.request_context.request_id
+ exception = msg_future.exception()
+ if exception is not None:
+ if isinstance(exception, (WebSocketClosedError, StreamClosedError)):
+ # Connection suddenly closed.
+ # Request context was stored in connection.requests_to_send
+ # and will be re-sent when reconnection succeeds.
+ # For more details, see method Connection.write_request().
+ LOGGER.error('Connection was closed when sending a request. Silently waiting for a reconnection.')
+ else:
+ LOGGER.error('Fatal error occurred while writing a request.')
+ self.request_context.future.set_exception(exception)
+ else:
+ connection.requests_waiting_responses[request_id] = self.request_context