diff options
Diffstat (limited to 'diplomacy/client/connection.py')
-rw-r--r-- | diplomacy/client/connection.py | 668 |
1 files changed, 355 insertions, 313 deletions
diff --git a/diplomacy/client/connection.py b/diplomacy/client/connection.py index d0d5902..f81d5b6 100644 --- a/diplomacy/client/connection.py +++ b/diplomacy/client/connection.py @@ -18,7 +18,7 @@ import logging import weakref from datetime import timedelta - +from typing import Dict from tornado import gen, ioloop from tornado.concurrent import Future from tornado.iostream import StreamClosedError @@ -34,245 +34,89 @@ from diplomacy.utils import exceptions, strings, constants LOGGER = logging.getLogger(__name__) -class MessageWrittenCallback(): - """ Helper class representing callback to call on a connection when a request is written in a websocket. """ - __slots__ = ['request_context'] - - def __init__(self, request_context): - """ Initialize the callback object. - :param request_context: a request context - :type request_context: RequestFutureContext - """ - self.request_context = request_context - - def callback(self, msg_future): - """ Called when request is effectively written on socket, and move the request - from `request to send` to `request assumed sent`. - """ - # Remove request context from `requests to send` in any case. - connection = self.request_context.connection # type: Connection - request_id = self.request_context.request_id - exception = msg_future.exception() - if exception is not None: - if isinstance(exception, (WebSocketClosedError, StreamClosedError)): - # Connection suddenly closed. - # Request context was stored in connection.requests_to_send - # and will be re-sent when reconnection succeeds. - # For more details, see method Connection.write_request(). - LOGGER.error('Connection was closed when sending a request. Silently waiting for a reconnection.') - else: - LOGGER.error('Fatal error occurred while writing a request.') - self.request_context.future.set_exception(exception) - else: - connection.requests_waiting_responses[request_id] = self.request_context - -class Reconnection(): - """ Class performing reconnection work for a given connection. - - Class properties: - ================= - - - connection: Connection object to reconnect. - - - games_phases: dictionary mapping each game address (game ID + game role) to server game info: - {game ID => {game role => responses.DataGamePhase}} - Server game info is a DataGamePhase response sent by server as response to a Synchronize request. - It contains 3 fields: game ID, current server game phase and current server game timestamp. - We currently use only game phase. - - - n_expected_games: number of games registered in games_phases. - - - n_synchronized_games: number of games already synchronized. - - Reconnection procedure: - ======================= - - - Mark all waiting responses as `re-sent` (may be useful on server-side) and - move them back to responses_to_send. - - - Remove all previous synchronization requests that are not yet sent. We will send new synchronization - requests with latest games timestamps. Future associated to removed requests will raise an exception. - - - Initialize games_phases associating None to each game object currently opened in connection. - - - Send synchronization request for each game object currently opened in connection. For each game: - - - server will send a response describing current server game phase (current phase and timestamp). This info - will be used to check local requests to send. Note that concrete synchronization is done via notifications. - Thus, when server responses is received, game synchronization may not be yet terminated, but at least - we will now current server game phase. - - - Server response is saved in games_phases (replacing None associated to game object). - - - n_synchronized_games is incremented. - - - When sync responses are received for all games registered in games_phases - (n_expected_games == n_synchronized_games), we can finalize reconnection: - - - Remove every phase-dependent game request not yet sent for which phase does not match - server game phase. Futures associated to removed request will raise an exception. - - - Finally send all remaining requests. - - These requests may be marked as re-sent. - For these requests, server is (currently) responsible for checking if they don't represent - a duplicated query. +@gen.coroutine +def connect(hostname, port): + """ Connect to given hostname and port. + :param hostname: a hostname + :param port: a port + :return: a Connection object connected. + :type hostname: str + :type port: int + :rtype: Connection """ + connection = Connection(hostname, port) + yield connection._connect('Trying to connect.') # pylint: disable=protected-access + return connection - __slots__ = ['connection', 'games_phases', 'n_expected_games', 'n_synchronized_games'] - - def __init__(self, connection): - """ Initialize reconnection data/ - :param connection: connection to reconnect. - :type connection: Connection - """ - self.connection = connection - self.games_phases = {} - self.n_expected_games = 0 - self.n_synchronized_games = 0 - - def reconnect(self): - """ Perform concrete reconnection work. """ +class Connection: + """ Connection class. + + The connection class should not be initiated directly, but through the connect method + + .. code-block:: python + + >>> from diplomacy.client.connection import connect + >>> connection = await connect(hostname, port) + + Properties: + + - **hostname**: :class:`str` hostname to connect (e.g. 'localhost') + - **port**: :class:`int` port to connect (e.g. 8888) + - **use_ssl**: :class:`bool` telling if connection should be securized (True) or not (False). + - **url**: (property) :class:`str` websocket url to connect (generated with hostname and port) + - **connection**: :class:`tornado.websocket.WebSocketClientConnection` a tornado websocket connection object + - **connection_count**: :class:`int` number of successful connections from this Connection object. + Used to check if message callbacks is already launched (if count > 0). + - **is_connecting**: :class:`tornado.locks.Event` a tornado Event used to keep connection status. + No request can be sent while is_connecting. + If connected, Synchronize requests can be sent immediately even if is_reconnecting. + Other requests must wait full reconnection. + - **is_reconnecting**: :class:`tornado.locks.Event` a tornado Event used to keep re-connection status. + Non-synchronize request cannot be sent while is_reconnecting. + If reconnected, all requests can be sent. + - **channels**: a :class:`weakref.WeakValueDictionary` mapping channel token to :class:`.Channel` object. + - **requests_to_send**: a :class:`Dict` mapping a request ID to the context of a request + **not sent**. If we are disconnected when trying to send a request, then request + context is added to this dictionary to be send later once reconnected. + - **requests_waiting_responses**: a :class:`Dict` mapping a request ID to the context of a + request **sent**. Contains requests that are waiting for a server response. + - **unknown_tokens**: :class:`set` a set of unknown tokens. We can safely ignore them, as the server has been + notified. + """ + __slots__ = ['hostname', 'port', 'use_ssl', 'connection', 'is_connecting', 'is_reconnecting', 'connection_count', + 'channels', 'requests_to_send', 'requests_waiting_responses', 'unknown_tokens'] - # Mark all waiting responses as `re-sent` and move them back to responses_to_send. - for waiting_context in self.connection.requests_waiting_responses.values(): # type: RequestFutureContext - waiting_context.request.re_sent = True - self.connection.requests_to_send.update(self.connection.requests_waiting_responses) - self.connection.requests_waiting_responses.clear() + def __init__(self, hostname, port, use_ssl=False): + """ Constructor - # Remove all previous synchronization requests. - requests_to_send_updated = {} - for context in self.connection.requests_to_send.values(): # type: RequestFutureContext - if isinstance(context.request, requests.Synchronize): - context.future.set_exception(exceptions.DiplomacyException( - 'Sync request invalidated for game ID %s.' % context.request.game_id)) - else: - requests_to_send_updated[context.request.request_id] = context - self.connection.requests_to_send = requests_to_send_updated + The connection class should not be initiated directly, but through the connect method - # Count games to synchronize. - for channel in self.connection.channels.values(): - for game_instance_set in channel.game_id_to_instances.values(): - for game in game_instance_set.get_games(): - self.games_phases.setdefault(game.game_id, {})[game.role] = None - self.n_expected_games += 1 + .. code-block:: python - if self.n_expected_games: - # Synchronize games. - for channel in self.connection.channels.values(): - for game_instance_set in channel.game_id_to_instances.values(): - for game in game_instance_set.get_games(): - game.synchronize().add_done_callback(self.generate_sync_callback(game)) - else: - # No game to sync, finish sync now. - self.sync_done() + >>> from diplomacy.client.connection import connect + >>> connection = await connect(hostname, port) - def generate_sync_callback(self, game): - """ Generate callback to call when response to sync request is received for given game. - :param game: game - :return: a callback. - :type game: diplomacy.client.network_game.NetworkGame + :param hostname: hostname to connect (e.g. 'localhost') + :param port: port to connect (e.g. 8888) + :param use_ssl: telling if connection should be securized (True) or not (False). + :type hostname: str + :type port: int + :type use_ssl: bool """ - - def on_sync(future): - """ Callback. If exception occurs, print it as logging error. Else, register server response, - and move forward to final reconnection work if all games received sync responses. - """ - exception = future.exception() - if exception is not None: - LOGGER.error(str(exception)) - else: - self.games_phases[game.game_id][game.role] = future.result() - self.n_synchronized_games += 1 - if self.n_synchronized_games == self.n_expected_games: - self.sync_done() - - return on_sync - - def sync_done(self): - """ Final reconnection work. Remove obsolete game requests and send remaining requests. """ - - # All sync requests sent have finished. - # Remove all obsolete game requests from connection. - # A game request is obsolete if it's phase-dependent and if its phase does not match current game phase. - - request_to_send_updated = {} - for context in self.connection.requests_to_send.values(): # type: RequestFutureContext - keep = True - if context.request.level == strings.GAME and context.request.phase_dependent: - request_phase = context.request.phase - server_phase = self.games_phases[context.request.game_id][context.request.game_role].phase - if request_phase != server_phase: - # Request is obsolete. - context.future.set_exception(exceptions.DiplomacyException( - 'Game %s: request %s: request phase %s does not match current server game phase %s.' - % (context.request.game_id, context.request.name, request_phase, server_phase))) - keep = False - if keep: - request_to_send_updated[context.request.request_id] = context - - LOGGER.debug('Keep %d/%d old requests to send.', - len(request_to_send_updated), len(self.connection.requests_to_send)) - - # All requests to send are stored in request_to_send_updated. - # Then we can empty connection.requests_to_send. - # If we fail to send a request, it will be re-added again. - self.connection.requests_to_send.clear() - - # Send requests. - for request_to_send in request_to_send_updated.values(): # type: RequestFutureContext - self.connection.write_request(request_to_send).add_done_callback( - MessageWrittenCallback(request_to_send).callback) - - # We are reconnected. - self.connection.is_reconnecting.set() - - LOGGER.info('Done reconnection work.') - -class Connection(): - """ Connection class. Properties: - - hostname: hostname to connect (e.g. 'localhost') - - port: port to connect (e.g. 8888) - - use_ssl: boolean telling if connection should be securized (True) or not (False). - - url (auto): websocket url to connect (generated with hostname and port) - - connection: a tornado websocket connection object - - connection_count: number of successful connections from this Connection object. - Used to check if message callbacks is already launched (if count > 0). - - connection_lock: a tornado lock used to access tornado websocket connection object - - is_connecting: a tornado Event used to keep connection status. - No request can be sent while is_connecting. - If connected, Synchronize requests can be sent immediately even if is_reconnecting. - Other requests must wait full reconnection. - - is_reconnecting: a tornado Event used to keep re-connection status. - Non-synchronize request cannot be sent while is_reconnecting. - If reconnected, all requests can be sent. - - channels: a WeakValueDictionary mapping channel token to Channel object. - - requests_to_send: a dictionary mapping a request ID to the context of a request - **not sent**. If we are disconnected when trying to send a request, then request - context is added to this dictionary to be send later once reconnected. - - requests_waiting_responses: a dictionary mapping a request ID to the context of a - request **sent**. Contains requests that are waiting for a server response. - - unknown_tokens: a set of unknown tokens. We can safely ignore them, as the server has been notified. - """ - __slots__ = ['hostname', 'port', 'use_ssl', 'connection', 'is_connecting', 'is_reconnecting', 'connection_count', - 'channels', 'requests_to_send', 'requests_waiting_responses', 'unknown_tokens'] - - def __init__(self, hostname, port, use_ssl=False): self.hostname = hostname self.port = port self.use_ssl = bool(use_ssl) self.connection = None + self.connection_count = 0 self.is_connecting = Event() self.is_reconnecting = Event() - self.connection_count = 0 - self.channels = weakref.WeakValueDictionary() # {token => Channel} - self.requests_to_send = {} # type: dict{str, RequestFutureContext} - self.requests_waiting_responses = {} # type: dict{str, RequestFutureContext} + self.requests_to_send = {} # type: Dict[str, RequestFutureContext] + self.requests_waiting_responses = {} # type: Dict[str, RequestFutureContext] self.unknown_tokens = set() # When connection is created, we are not yet connected, but reconnection does not matter @@ -281,12 +125,53 @@ class Connection(): url = property(lambda self: '%s://%s:%d' % ('wss' if self.use_ssl else 'ws', self.hostname, self.port)) + # =================== + # Public Methods. + # =================== + + @gen.coroutine + def authenticate(self, username, password, create_user=False): + """ Send a :class:`.SignIn` request. + + :param username: username + :param password: password + :param create_user: boolean indicating if you want to create a user or login to an existing user. + :return: a :class:`.Channel` object representing the authentication. + :type username: str + :type password: str + :type create_user: bool + :rtype: diplomacy.client.channel.Channel + """ + request = requests.SignIn(username=username, password=password, create_user=create_user) + return (yield self.send(request)) + + @gen.coroutine + def get_daide_port(self, game_id): + """ Send a :class:`.GetDaidePort` request. + + :param game_id: game id for which to retrieve the DAIDE port. + :return: the game DAIDE port + :type game_id: str + :rtype: int + """ + request = requests.GetDaidePort(game_id=game_id) + return (yield self.send(request)) + + # =================== + # Private Methods + # =================== + @gen.coroutine - def _connect(self): + def _connect(self, message=None): """ Create (force) a tornado websocket connection. Try NB_CONNECTION_ATTEMPTS attempts, waiting for ATTEMPT_DELAY_SECONDS seconds between 2 attempts. Raise an exception if it cannot connect. + + :param message: if provided, print this message as a logger info before starting to connect. + :type message: str, optional """ + if message: + LOGGER.info(message) # We are connecting. self.is_connecting.clear() @@ -319,71 +204,11 @@ class Connection(): @gen.coroutine def _reconnect(self): """ Reconnect. """ - LOGGER.info('Trying to reconnect.') # We are reconnecting. self.is_reconnecting.clear() - yield self._connect() + yield self._connect('Trying to reconnect.') # We will be reconnected when method Reconnection.sync_done() will finish. - Reconnection(self).reconnect() - - def _register_to_send(self, request_context): - """ Register given request context as a request to send as soon as possible. - :param request_context: context of request to send. - :type request_context: RequestFutureContext - """ - self.requests_to_send[request_context.request_id] = request_context - - def write_request(self, request_context): - """ Write a request into internal connection object. - :param request_context: context of request to send. - :type request_context: RequestFutureContext - """ - future = Future() - request = request_context.request - - def on_message_written(write_future): - """ 3) Writing returned, set future as done (with writing result) or with writing exception. """ - exception = write_future.exception() - if exception is not None: - future.set_exception(exception) - else: - future.set_result(write_future.result()) - - def on_connected(reconnected_future): - """ 2) Send request. """ - exception = reconnected_future.exception() - if exception is not None: - LOGGER.error('Fatal (re)connection error occurred while sending a request.') - future.set_exception(exception) - else: - try: - if self.connection is None: - raise WebSocketClosedError() - write_future = self.connection.write_message(request.json()) - except (WebSocketClosedError, StreamClosedError) as exc: - # We were disconnected. - # Save request context as a request to send. - # We will re-try to send it later once reconnected. - self._register_to_send(request_context) - # Transfer exception to returned future. - future.set_exception(exc) - else: - write_future.add_done_callback(on_message_written) - - # 1) Synchronize requests just wait for connection. - # Other requests wait for reconnection (which also implies connection). - if isinstance(request, requests.Synchronize): - self.is_connecting.wait().add_done_callback(on_connected) - else: - self.is_reconnecting.wait().add_done_callback(on_connected) - - return future - - @gen.coroutine - def connect(self): - """ Effectively connect this object. """ - LOGGER.info('Trying to connect.') - yield self._connect() + _Reconnection(self).reconnect() @gen.coroutine def _on_socket_message(self, socket_message): @@ -461,47 +286,264 @@ class Connection(): except (WebSocketClosedError, StreamClosedError): pass - # Public methods. - @gen.coroutine - def get_daide_port(self, game_id): - """ Send a GetDaidePort request. - :param game_id: game id - :return: int. the game DAIDE port - """ - request = requests.GetDaidePort(game_id=game_id) - return (yield self.send(request)) + def _register_to_send(self, request_context): + """ Register given request context as a request to send as soon as possible. - @gen.coroutine - def authenticate(self, username, password, create_user=False): - """ Send a SignIn request. - :param username: username - :param password: password - :param create_user: boolean indicating if you want to create a user or login to and existing user. - :return: a Channel object representing the authentication. + :param request_context: context of request to send. + :type request_context: RequestFutureContext """ - request = requests.SignIn(username=username, password=password, create_user=create_user) - return (yield self.send(request)) + self.requests_to_send[request_context.request_id] = request_context def send(self, request, for_game=None): """ Send a request. + :param request: request object. :param for_game: (optional) NetworkGame object (required for game requests). :return: a Future that returns the response handler result of this request. + :type request: diplomacy.communication.requests._AbstractRequest + :type for_game: diplomacy.client.network_game.NetworkGame, optional + :rtype: Future """ request_future = Future() request_context = RequestFutureContext(request=request, future=request_future, connection=self, game=for_game) - self.write_request(request_context).add_done_callback(MessageWrittenCallback(request_context).callback) + self.write_request(request_context).add_done_callback(_MessageWrittenCallback(request_context).callback) return gen.with_timeout(timedelta(seconds=constants.REQUEST_TIMEOUT_SECONDS), request_future) -@gen.coroutine -def connect(hostname, port): - """ Connect to given hostname and port. - :param hostname: a hostname - :param port: a port - :return: a Connection object connected. - :rtype: Connection + def write_request(self, request_context): + """ Write a request into internal connection object. + + :param request_context: context of request to send. + :type request_context: RequestFutureContext + """ + future = Future() + request = request_context.request + + def on_message_written(write_future): + """ 3) Writing returned, set future as done (with writing result) + or with writing exception. + """ + exception = write_future.exception() + if exception is not None: + future.set_exception(exception) + else: + future.set_result(write_future.result()) + + def on_connected(reconnected_future): + """ 2) Send request. """ + exception = reconnected_future.exception() + if exception is not None: + LOGGER.error('Fatal (re)connection error occurred while sending a request.') + future.set_exception(exception) + else: + try: + if self.connection is None: + raise WebSocketClosedError() + write_future = self.connection.write_message(request.json()) + except (WebSocketClosedError, StreamClosedError) as exc: + # We were disconnected. + # Save request context as a request to send. + # We will re-try to send it later once reconnected. + self._register_to_send(request_context) + # Transfer exception to returned future. + future.set_exception(exc) + else: + write_future.add_done_callback(on_message_written) + + # 1) Synchronize requests just wait for connection. + # Other requests wait for reconnection (which also implies connection). + if isinstance(request, requests.Synchronize): + self.is_connecting.wait().add_done_callback(on_connected) + else: + self.is_reconnecting.wait().add_done_callback(on_connected) + + return future + +class _Reconnection: + """ Class performing reconnection work for a given connection. + + Class properties: + + - connection: Connection object to reconnect. + - games_phases: dictionary mapping each game address (game ID + game role) to server game info: + {game ID => {game role => responses.DataGamePhase}}. + Server game info is a DataGamePhase response sent by server as response to a Synchronize request. + It contains 3 fields: game ID, current server game phase and current server game timestamp. + We currently use only game phase. + - n_expected_games: number of games registered in games_phases. + - n_synchronized_games: number of games already synchronized. + + Reconnection procedure: + + - Mark all waiting responses as `re-sent` (may be useful on server-side) and + move them back to responses_to_send. + - Remove all previous synchronization requests that are not yet sent. We will send new synchronization + requests with latest games timestamps. Future associated to removed requests will raise an exception. + - Initialize games_phases associating None to each game object currently opened in connection. + - Send synchronization request for each game object currently opened in connection. For each game: + + - server will send a response describing current server game phase (current phase and timestamp). This info + will be used to check local requests to send. Note that concrete synchronization is done via notifications. + Thus, when server responses is received, game synchronization may not be yet terminated, but at least + we will now current server game phase. + - Server response is saved in games_phases (replacing None associated to game object). + - n_synchronized_games is incremented. + + - When sync responses are received for all games registered in games_phases + (n_expected_games == n_synchronized_games), we can finalize reconnection: + + - Remove every phase-dependent game request not yet sent for which phase does not match + server game phase. Futures associated to removed request will raise an exception. + - Finally send all remaining requests. + + These requests may be marked as re-sent. + For these requests, server is (currently) responsible for checking if they don't represent + a duplicated query. """ - connection = Connection(hostname, port) - yield connection.connect() - return connection + + __slots__ = ['connection', 'games_phases', 'n_expected_games', 'n_synchronized_games'] + + def __init__(self, connection): + """ Initialize reconnection data/ + + :param connection: connection to reconnect. + :type connection: Connection + """ + self.connection = connection + self.games_phases = {} + self.n_expected_games = 0 + self.n_synchronized_games = 0 + + def reconnect(self): + """ Perform concrete reconnection work. """ + + # Mark all waiting responses as `re-sent` and move them back to responses_to_send. + for waiting_context in self.connection.requests_waiting_responses.values(): # type: RequestFutureContext + waiting_context.request.re_sent = True + self.connection.requests_to_send.update(self.connection.requests_waiting_responses) + self.connection.requests_waiting_responses.clear() + + # Remove all previous synchronization requests. + requests_to_send_updated = {} + for context in self.connection.requests_to_send.values(): # type: RequestFutureContext + if isinstance(context.request, requests.Synchronize): + context.future.set_exception(exceptions.DiplomacyException( + 'Sync request invalidated for game ID %s.' % context.request.game_id)) + else: + requests_to_send_updated[context.request.request_id] = context + self.connection.requests_to_send = requests_to_send_updated + + # Count games to synchronize. + for channel in self.connection.channels.values(): + for game_instance_set in channel.game_id_to_instances.values(): + for game in game_instance_set.get_games(): + self.games_phases.setdefault(game.game_id, {})[game.role] = None + self.n_expected_games += 1 + + if self.n_expected_games: + # Synchronize games. + for channel in self.connection.channels.values(): + for game_instance_set in channel.game_id_to_instances.values(): + for game in game_instance_set.get_games(): + game.synchronize().add_done_callback(self.generate_sync_callback(game)) + else: + # No game to sync, finish sync now. + self.sync_done() + + def generate_sync_callback(self, game): + """ Generate callback to call when response to sync request is received for given game. + + :param game: game + :return: a callback. + :type game: diplomacy.client.network_game.NetworkGame + """ + + def on_sync(future): + """ Callback. If exception occurs, print it as logging error. Else, register server response, + and move forward to final reconnection work if all games received sync responses. + """ + exception = future.exception() + if exception is not None: + LOGGER.error(str(exception)) + else: + self.games_phases[game.game_id][game.role] = future.result() + self.n_synchronized_games += 1 + if self.n_synchronized_games == self.n_expected_games: + self.sync_done() + + return on_sync + + def sync_done(self): + """ Final reconnection work. Remove obsolete game requests and send remaining requests. """ + + # All sync requests sent have finished. + # Remove all obsolete game requests from connection. + # A game request is obsolete if it's phase-dependent and if its phase does not match current game phase. + + request_to_send_updated = {} + for context in self.connection.requests_to_send.values(): # type: RequestFutureContext + keep = True + if context.request.level == strings.GAME and context.request.phase_dependent: + request_phase = context.request.phase + server_phase = self.games_phases[context.request.game_id][context.request.game_role].phase + if request_phase != server_phase: + # Request is obsolete. + context.future.set_exception(exceptions.DiplomacyException( + 'Game %s: request %s: request phase %s does not match current server game phase %s.' + % (context.request.game_id, context.request.name, request_phase, server_phase))) + keep = False + if keep: + request_to_send_updated[context.request.request_id] = context + + LOGGER.debug('Keep %d/%d old requests to send.', + len(request_to_send_updated), len(self.connection.requests_to_send)) + + # All requests to send are stored in request_to_send_updated. + # Then we can empty connection.requests_to_send. + # If we fail to send a request, it will be re-added again. + self.connection.requests_to_send.clear() + + # Send requests. + for request_to_send in request_to_send_updated.values(): # type: RequestFutureContext + self.connection.write_request(request_to_send).add_done_callback( + _MessageWrittenCallback(request_to_send).callback) + + # We are reconnected. + self.connection.is_reconnecting.set() + + LOGGER.info('Done reconnection work.') + +class _MessageWrittenCallback: + """ Helper class representing callback to call on a connection + when a request is written in a websocket. + """ + __slots__ = ['request_context'] + + def __init__(self, request_context): + """ Initialize the callback object. + + :param request_context: a request context + :type request_context: RequestFutureContext + """ + self.request_context = request_context + + def callback(self, msg_future): + """ Called when request is effectively written on socket, and move the request + from `request to send` to `request assumed sent`. + """ + # Remove request context from `requests to send` in any case. + connection = self.request_context.connection # type: Connection + request_id = self.request_context.request_id + exception = msg_future.exception() + if exception is not None: + if isinstance(exception, (WebSocketClosedError, StreamClosedError)): + # Connection suddenly closed. + # Request context was stored in connection.requests_to_send + # and will be re-sent when reconnection succeeds. + # For more details, see method Connection.write_request(). + LOGGER.error('Connection was closed when sending a request. Silently waiting for a reconnection.') + else: + LOGGER.error('Fatal error occurred while writing a request.') + self.request_context.future.set_exception(exception) + else: + connection.requests_waiting_responses[request_id] = self.request_context |