1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# ==============================================================================
# Copyright (C) 2019 - Philip Paquette
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
# ==============================================================================
""" Tornado stream wrapper, used internally to abstract a DAIDE stream connection from a WebSocketConnection. """
import logging
from tornado import gen
from tornado.concurrent import Future
from tornado.iostream import StreamClosedError
from diplomacy.daide import notifications, request_managers, responses
from diplomacy.daide.messages import DiplomacyMessage, DaideMessage, ErrorMessage, RepresentationMessage, MessageType
from diplomacy.daide.notification_managers import translate_notification
from diplomacy.daide.requests import RequestBuilder
from diplomacy.daide.utils import bytes_to_str
from diplomacy.utils import exceptions
# Constants
LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
""" ConnectionHandler class.
Properties:
- **server**: server object representing running server.
"""
_NAME_VARIANT_PREFIX = 'DAIDE'
_NAME_VARIANTS_POOL = []
_USED_NAME_VARIANTS = []
def __init__(self):
self.stream = None
self.server = None
self.game_id = None
self.token = None
self._name_variant = None
self._socket_no = None
self._local_addr = ('::1', 0, 0, 0)
self._remote_addr = ('::1', 0, 0, 0)
self.message_mapping = {MessageType.INITIAL: self._on_initial_message,
MessageType.DIPLOMACY: self._on_diplomacy_message,
MessageType.FINAL: self._on_final_message,
MessageType.ERROR: self._on_error_message}
def initialize(self, stream, server, game_id):
""" Initialize the connection handler.
:param server: a Server object.
:type server: diplomacy.Server
"""
self.stream = stream
self.server = server
self.game_id = game_id
stream.set_close_callback(self.on_connection_close)
self._socket_no = self.stream.socket.fileno()
self._local_addr = stream.socket.getsockname()
self._remote_addr = stream.socket.getpeername()
@property
def local_addr(self):
""" Return the address of the local endpoint """
return self._local_addr
@property
def remote_addr(self):
""" Return the address of the remote endpoint """
return self._remote_addr
def get_name_variant(self):
""" Return the address of the remote endpoint """
if self._name_variant is None:
self._name_variant = self._NAME_VARIANTS_POOL.pop(0) if self._NAME_VARIANTS_POOL \
else len(self._USED_NAME_VARIANTS)
self._USED_NAME_VARIANTS.append(self._name_variant)
return self._NAME_VARIANT_PREFIX + str(self._name_variant)
def release_name_variant(self):
""" Return the next available user name variant """
self._USED_NAME_VARIANTS.remove(self._name_variant)
self._NAME_VARIANTS_POOL.append(self._name_variant)
self._name_variant = None
@gen.coroutine
def close_connection(self):
""" Close the connection with the client """
try:
message = DiplomacyMessage()
message.content = bytes(responses.TurnOffResponse())
yield self.write_message(message)
self.stream.close()
except StreamClosedError:
LOGGER.error('Stream is closed.')
def on_connection_close(self):
""" Invoked when the socket is closed (see parent method).
Detach this connection handler from server users.
"""
self.release_name_variant()
self.server.users.remove_connection(self, remove_tokens=False)
LOGGER.info('Removed connection. Remaining %d connection(s).', self.server.users.count_connections())
@gen.coroutine
def read_stream(self):
""" Read the next message from the stream """
messages = []
in_message = yield DaideMessage.from_stream(self.stream)
if in_message and in_message.is_valid:
message_handler = self.message_mapping.get(in_message.message_type, None)
if not message_handler:
raise RuntimeError('Unrecognized DAIDE message type [{}]'.format(in_message.message_type))
if gen.is_coroutine_function(message_handler):
messages = yield message_handler(in_message)
else:
messages = message_handler(in_message)
elif in_message:
err_message = ErrorMessage()
err_message.error_code = in_message.error_code
messages = [err_message]
for message in messages:
yield self.write_message(message)
# Added for compatibility with WebSocketHandler interface
def write_message(self, message, binary=True):
""" Write a message into the stream """
if binary and isinstance(message, bytes):
future = self.stream.write(message)
else:
if isinstance(message, notifications.DaideNotification):
LOGGER.info('[%d] notification:[%s]', self._socket_no, bytes_to_str(bytes(message)))
notification = message
message = DiplomacyMessage()
message.content = bytes(notification)
if isinstance(message, DaideMessage):
future = self.stream.write(bytes(message))
else:
future = Future()
future.set_result(None)
return future
def translate_notification(self, notification):
""" Translate a notification to a DAIDE notification.
:param notification: a notification object to pass to handler function.
See diplomacy.communication.notifications for possible notifications.
:return: either None or an array of daide notifications.
See module diplomacy.daide.notifications for possible daide notifications.
"""
return translate_notification(self.server, notification, self)
def _on_initial_message(self, _):
""" Handle an initial message """
LOGGER.info('[%d] initial message', self._socket_no)
return [RepresentationMessage()]
@gen.coroutine
def _on_diplomacy_message(self, in_message):
""" Handle a diplomacy message """
messages = []
request = RequestBuilder.from_bytes(in_message.content)
try:
LOGGER.info('[%d] request:[%s]', self._socket_no, bytes_to_str(in_message.content))
request.game_id = self.game_id
message_responses = yield request_managers.handle_request(self.server, request, self)
except exceptions.ResponseException:
message_responses = [responses.REJ(bytes(request))]
if message_responses:
for response in message_responses:
response_bytes = bytes(response)
LOGGER.info('[%d] response:[%s]', self._socket_no, bytes_to_str(response_bytes) \
if response_bytes else None)
message = DiplomacyMessage()
message.content = response_bytes
messages.append(message)
return messages
def _on_final_message(self, _):
""" Handle a final message """
LOGGER.info('[%d] final message', self._socket_no)
self.stream.close()
return []
def _on_error_message(self, in_message):
""" Handle an error message """
LOGGER.error('[%d] error [%d]', self._socket_no, in_message.error_code)
return []
|