1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
|
# ==============================================================================
# Copyright (C) 2019 - Philip Paquette, Steven Bocco
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.
# ==============================================================================
""" Server game class. """
from diplomacy.engine.game import Game
from diplomacy.engine.message import GLOBAL, Message, OBSERVER, OMNISCIENT, SYSTEM
from diplomacy.engine.power import Power
from diplomacy.utils import exceptions, parsing, strings
from diplomacy.utils.game_phase_data import GamePhaseData
class ServerGame(Game):
""" ServerGame class.
Properties:
- **server**: (optional) server (Server object) that handles this game.
- **omniscient_usernames** (only for server games):
set of usernames allowed to be omniscient observers for this game.
- **moderator_usernames** (only for server games):
set of usernames allowed to be moderators for this game.
- **observer** (only for server games):
special Power object (diplomacy.Power) used to manage observer tokens.
- **omniscient** (only for server games):
special Power object (diplomacy.Power) used to manage omniscient tokens.
"""
__slots__ = ['server', 'omniscient_usernames', 'moderator_usernames', 'observer', 'omniscient']
model = parsing.update_model(Game.model, {
strings.MODERATOR_USERNAMES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=set), ()),
strings.OBSERVER: parsing.OptionalValueType(parsing.JsonableClassType(Power)),
strings.OMNISCIENT: parsing.OptionalValueType(parsing.JsonableClassType(Power)),
strings.OMNISCIENT_USERNAMES: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=set), ()),
})
def __init__(self, server=None, **kwargs):
# Reference to a Server instance.
self.server = server # type: diplomacy.Server
self.omniscient_usernames = None # type: set
self.moderator_usernames = None # type: set
self.observer = None # type: Power
self.omniscient = None # type: Power
super(ServerGame, self).__init__(**kwargs)
assert self.is_server_game()
# Initialize special powers.
self.observer = self.observer or Power(self, name=strings.OBSERVER_TYPE)
self.omniscient = self.omniscient or Power(self, name=strings.OMNISCIENT_TYPE)
self.observer.set_controlled(strings.OBSERVER_TYPE)
self.omniscient.set_controlled(strings.OBSERVER_TYPE)
# Server-only methods.
def get_related_power_names(self, power_name):
""" Return list of power names controlled by the controlled of given power name. """
related_power_names = []
if self.has_power(power_name):
related_power_names = [power_name]
related_power = self.get_power(power_name)
if related_power.is_controlled():
related_power_names = self.get_controlled_power_names(related_power.get_controller())
return related_power_names
def filter_phase_data(self, phase_data, role, is_current):
""" Return a filtered version of given phase data for given gam role.
:param phase_data: GamePhaseData object to filter.
:param role: game role to filter phase data for.
:param is_current: Boolean. Indicate if given phase data is for a current phase (True), or for a pase phase.
:return: a new GamePhaseData object suitable for given game role.
:type phase_data: GamePhaseData
"""
if role == strings.OMNISCIENT_TYPE:
# Nothing to filter.
return phase_data
if role == strings.OBSERVER_TYPE:
# Filter messages.
return GamePhaseData(name=phase_data.name,
state=phase_data.state,
orders=phase_data.orders,
results=phase_data.results,
messages=self.filter_messages(phase_data.messages, role))
# Filter for power roles.
related_power_names = self.get_related_power_names(role)
# Filter messages.
messages = self.filter_messages(phase_data.messages, related_power_names)
# We filter orders only if phase data is for a current phase.
if is_current:
orders = {power_name: phase_data.orders[power_name]
for power_name in related_power_names
if power_name in phase_data.orders}
else:
orders = phase_data.orders
# results don't need to be filtered: it should be provided empty for current phase,
# and it should be kept for a past phase/
return GamePhaseData(name=phase_data.name,
state=phase_data.state,
orders=orders,
messages=messages,
results=phase_data.results)
def game_can_start(self):
""" Return True if server game can start.
A game can start if all followings conditions are satisfied:
- Game has not yet started.
- Game can start automatically (no rule START_MASTER).
- Game has expected number of controlled powers.
:return: a boolean
:rtype: bool
"""
return self.is_game_forming and not self.start_master and self.has_expected_controls_count()
def get_messages(self, game_role, timestamp_from=None, timestamp_to=None):
""" Return a filtered dict of current messages for given output game role.
See method filter_messages() about parameters.
"""
return self.filter_messages(self.messages, game_role, timestamp_from, timestamp_to)
def get_message_history(self, game_role):
""" Return a filtered dict of whole message history for given game role. """
return {str(short_phase): self.filter_messages(messages, game_role)
for short_phase, messages in self.message_history.items()}
def get_user_power_names(self, username):
""" Return list of power names controlled by given user name. """
return [power.name for power in self.powers.values() if power.is_controlled_by(username)]
def new_system_message(self, recipient, body):
""" Create a system message (immediately dated) to be sent by server and add it to message history.
To be used only by server game.
:param recipient: recipient description (string). Either:
- a power name.
- 'GLOBAL' (all game tokens)
- 'OBSERVER' (all special tokens [observers and omniscient observers])
- 'OMNISCIENT' (all omniscient tokens only)
:param body: message body (string).
:return: a new GameMessage object.
:rtype: Message
"""
assert (recipient in {GLOBAL, OBSERVER, OMNISCIENT}
or self.has_power(recipient))
message = Message(phase=self.current_short_phase,
sender=SYSTEM,
recipient=recipient,
message=body)
# Message timestamp will be generated when adding message.
self.add_message(message)
return message
def as_power_game(self, power_name):
""" Return a player game data object copy of this game for given power name. """
for_username = self.get_power(power_name).get_controller()
game = Game.from_dict(self.to_dict())
game.error = []
game.message_history = self.get_message_history(power_name)
game.messages = self.get_messages(power_name)
game.phase_abbr = game.current_short_phase
related_power_names = self.get_related_power_names(power_name)
for power in game.powers.values(): # type: Power
power.role = power.name
power.tokens.clear()
if power.name not in related_power_names:
power.vote = strings.NEUTRAL
power.orders.clear()
game.role = power_name
game.controlled_powers = self.get_controlled_power_names(for_username)
game.observer_level = self.get_observer_level(for_username)
game.daide_port = self.server.get_daide_port(self.game_id) if self.server else None
return game
def as_omniscient_game(self, for_username):
""" Return an omniscient game data object copy of this game. """
game = Game.from_dict(self.to_dict())
game.message_history = self.get_message_history(strings.OMNISCIENT_TYPE)
game.messages = self.get_messages(strings.OMNISCIENT_TYPE)
game.phase_abbr = game.current_short_phase
for power in game.powers.values(): # type: Power
power.role = strings.OMNISCIENT_TYPE
power.tokens.clear()
game.role = strings.OMNISCIENT_TYPE
game.controlled_powers = self.get_controlled_power_names(for_username)
game.observer_level = self.get_observer_level(for_username)
game.daide_port = self.server.get_daide_port(self.game_id) if self.server else None
return game
def as_observer_game(self, for_username):
""" Return an observer game data object copy of this game. """
game = Game.from_dict(self.to_dict())
game.error = []
game.message_history = self.get_message_history(strings.OBSERVER_TYPE)
game.messages = self.get_messages(strings.OBSERVER_TYPE)
game.phase_abbr = game.current_short_phase
for power in game.powers.values(): # type: Power
power.role = strings.OBSERVER_TYPE
power.tokens.clear()
power.vote = strings.NEUTRAL
game.role = strings.OBSERVER_TYPE
game.controlled_powers = self.get_controlled_power_names(for_username)
game.observer_level = self.get_observer_level(for_username)
game.daide_port = self.server.get_daide_port(self.game_id) if self.server else None
return game
def cast(self, role, for_username):
""" Return a copy of this game for given role
(either observer role, omniscient role or a power role).
"""
assert strings.role_is_special(role) or self.has_power(role)
if role == strings.OBSERVER_TYPE:
return self.as_observer_game(for_username)
if role == strings.OMNISCIENT_TYPE:
return self.as_omniscient_game(for_username)
return self.as_power_game(role)
def is_controlled_by(self, power_name, username):
""" (for server game) Return True if given power name is controlled by given username. """
return self.get_power(power_name).is_controlled_by(username)
def get_observer_level(self, username):
""" Return the highest observation level allowed for given username.
:param username: name of user to get observation right
:return: either 'master_type', 'omniscient_type', 'observer_type' or None.
"""
if (self.server and self.server.users.has_admin(username)) or self.is_moderator(username):
return strings.MASTER_TYPE
if self.is_omniscient(username):
return strings.OMNISCIENT_TYPE
if not self.no_observations:
return strings.OBSERVER_TYPE
return None
def get_reception_addresses(self):
""" Generate addresses (couple [power name, token]) of all users implied in this game. """
for power in self.powers.values(): # type: Power
for token in power.tokens:
yield (power.name, token)
for token in self.observer.tokens:
yield (self.observer.name, token)
for token in self.omniscient.tokens:
yield (self.omniscient.name, token)
def get_special_addresses(self):
""" Generate addresses (couples [power name, token]) of
omniscient observers and simple observers of this game.
"""
for power in (self.omniscient, self.observer):
for token in power.tokens:
yield (power.name, token)
def get_observer_addresses(self):
""" Generate addresses (couples [power name, token]) of observers of this game. """
for token in self.observer.tokens:
yield (self.observer.name, token)
def get_omniscient_addresses(self):
""" Generate addresses (couples [power name, token])
of omniscient observers of this game.
"""
for token in self.omniscient.tokens:
yield (self.omniscient.name, token)
def get_special_token_role(self, token):
""" Return role name (either OBSERVER_TYPE or OMNISCIENT_TYPE) for given special token. """
if self.has_omniscient_token(token):
return strings.OMNISCIENT_TYPE
if self.has_observer_token(token):
return strings.OBSERVER_TYPE
raise exceptions.DiplomacyException('Unknown special token in game %s' % self.game_id)
def get_power_addresses(self, power_name):
""" Generate addresses (couples [power name, token])
of user controlling given power name.
"""
for token in self.get_power(power_name).tokens:
yield (power_name, token)
def has_player(self, username):
""" (for server game) Return True if given username controls any map power. """
return any(power.is_controlled_by(username) for power in self.powers.values())
def has_token(self, token):
""" Return True if game has given token (either observer, omniscient or player). """
return self.omniscient.has_token(token) or self.observer.has_token(token) or any(
power.has_token(token) for power in self.powers.values())
def has_observer_token(self, token):
""" Return True if game has given observer token. """
return self.observer.has_token(token)
def has_omniscient_token(self, token):
""" Return True if game has given omniscient observer token. """
return self.omniscient.has_token(token)
def has_player_token(self, token):
""" Return True if game has given player token. """
return any(power.has_token(token) for power in self.powers.values())
def power_has_token(self, power_name, token):
""" Return True if given power has given player token.
:param power_name: name of power to check.
:param token: token to look for.
:return: a boolean
"""
return self.get_power(power_name).has_token(token)
def add_omniscient_token(self, token):
""" Set given token as omniscient token. """
if self.observer.has_token(token):
raise exceptions.ResponseException('Token already registered as observer.')
if self.has_player_token(token):
raise exceptions.ResponseException('Token already registered as player.')
self.omniscient.add_token(token)
def add_observer_token(self, token):
""" Set given token as observer token. """
if self.omniscient.has_token(token):
raise exceptions.ResponseException('Token already registered as omniscient.')
if self.has_player_token(token):
raise exceptions.ResponseException('Token already registered as player.')
self.observer.add_token(token)
def transfer_special_token(self, token):
""" Move given token from a special case to another
(observer -> omniscient or omniscient -> observer).
"""
if self.has_observer_token(token):
self.remove_observer_token(token)
self.add_omniscient_token(token)
elif self.has_omniscient_token(token):
self.remove_omniscient_token(token)
self.add_observer_token(token)
def control(self, power_name, username, token):
""" Control given power name with given username via given token. """
if self.observer.has_token(token):
raise exceptions.ResponseException('Token already registered as observer.')
if self.omniscient.has_token(token):
raise exceptions.ResponseException('Token already registered as omniscient.')
power = self.get_power(power_name) # type: Power
if power.is_controlled() and not power.is_controlled_by(username):
raise exceptions.ResponseException('Power already controlled by another user.')
power.set_controlled(username)
power.add_token(token)
def remove_observer_token(self, token):
""" Remove given observer token. """
self.observer.remove_tokens([token])
def remove_omniscient_token(self, token):
""" Remove given omniscient token. """
self.omniscient.remove_tokens([token])
def remove_special_token(self, special_name, token):
""" Remove given token from given special power name
(either __OBSERVER__ or __OMNISCIENT__).
"""
if special_name == self.observer.name:
self.remove_observer_token(token)
else:
assert special_name == self.omniscient.name
self.remove_omniscient_token(token)
def remove_all_tokens(self):
""" Remove all connected tokens from this game. """
self.observer.tokens.clear()
self.omniscient.tokens.clear()
for power in self.powers.values():
power.tokens.clear()
def remove_token(self, token):
""" Remove token from this game. """
for power in self.powers.values(): # type: Power
power.remove_tokens([token])
for special_power in (self.observer, self.omniscient):
special_power.remove_tokens([token])
def is_moderator(self, username):
""" Return True if given username is a moderator of this game. """
return username in self.moderator_usernames
def is_omniscient(self, username):
""" Return True if given username is allowed to be an omniscient observer of this game. """
return username in self.omniscient_usernames
def promote_moderator(self, username):
""" Allow given username to be a moderator of this game. """
self.moderator_usernames.add(username)
def promote_omniscient(self, username):
""" Allow given username to be an omniscient observer of this game. """
self.omniscient_usernames.add(username)
def demote_moderator(self, username):
""" Remove given username from allowed moderators. """
if username in self.moderator_usernames:
self.moderator_usernames.remove(username)
def demote_omniscient(self, username):
""" Remove given username from allowed omniscient observers. """
if username in self.omniscient_usernames:
self.omniscient_usernames.remove(username)
def filter_usernames(self, filter_function):
""" Remove each omniscient username, moderator username and player controller
that does not match given filter function (if filter_function(username) is False).
:param filter_function: a callable receiving a username and returning a boolean.
:return: an integer, either:
* 0: nothing changed.
* -1: something changed, but no player controllers removed.
* 1: something changed, and some player controllers were removed.
So, if 1 is returned, there are new dummy powers in the game
(some notifications may need to be sent).
"""
n_kicked_players = 0
n_kicked_omniscients = len(self.omniscient_usernames)
n_kicked_moderators = len(self.moderator_usernames)
self.omniscient_usernames = set(username for username in self.omniscient_usernames if filter_function(username))
self.moderator_usernames = set(username for username in self.moderator_usernames if filter_function(username))
for power in self.powers.values():
if power.is_controlled() and not filter_function(power.get_controller()):
power.set_controlled(None)
n_kicked_players += 1
n_kicked_omniscients -= len(self.omniscient_usernames)
n_kicked_moderators -= len(self.moderator_usernames)
if n_kicked_players:
return 1
if n_kicked_moderators or n_kicked_omniscients:
return -1
return 0
def filter_tokens(self, filter_function):
""" Remove from this game any token not matching given filter function
(if filter_function(token) is False).
"""
self.observer.remove_tokens([token for token in self.observer.tokens if not filter_function(token)])
self.omniscient.remove_tokens([token for token in self.omniscient.tokens if not filter_function(token)])
for power in self.powers.values(): # type: Power
power.remove_tokens([token for token in power.tokens if not filter_function(token)])
def process(self):
""" Process current game phase and move forward to next phase.
:return: a triple containing:
- previous game state (before the processing)
- current game state (after processing and game updates)
- A dictionary mapping kicked power names to tokens previously associated to these powers.
Useful to notify kicked users as they will be not registered in game anymore.
If game was not active, triple is (None, None, None).
If game kicked powers, only kicked powers dict is returned: (None, None, kicked powers).
If game was correctly processed, only states are returned: (prev, curr, None).
"""
if not self.is_game_active:
return None, None, None
# Kick powers if necessary.
all_orderable_locations = self.get_orderable_locations()
kicked_powers = {}
for power in self.powers.values():
if (power.is_controlled()
and not power.order_is_set
and not self.civil_disorder
and all_orderable_locations[power.name]):
# This controlled power has not submitted orders, we have not rule CIVIL_DISORDER,
# and this power WAS allowed to submit orders for this phase.
# We kick such power.
kicked_powers[power.name] = set(power.tokens)
power.set_controlled(None)
if kicked_powers:
# Some powers were kicked from an active game before processing.
# This game must be stopped and cannot be processed. We return info about kicked powers.
self.set_status(strings.FORMING)
return None, None, kicked_powers
# Process game and retrieve previous state.
previous_phase_data = super(ServerGame, self).process()
if self.count_controlled_powers() < self.get_expected_controls_count():
# There is no more enough controlled powers, we should stop game.
self.set_status(strings.FORMING)
# Return process results: previous phase data, current phase data, and None for no kicked powers.
return previous_phase_data, self.get_phase_data(), None
|