aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/server/users.py
blob: d63df3ec438ea29e4d7071c22874fb8f060d27cb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# ==============================================================================
# Copyright (C) 2019 - Philip Paquette, Steven Bocco
#
#  This program is free software: you can redistribute it and/or modify it under
#  the terms of the GNU Affero General Public License as published by the Free
#  Software Foundation, either version 3 of the License, or (at your option) any
#  later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
#  details.
#
#  You should have received a copy of the GNU Affero General Public License along
#  with this program.  If not, see <https://www.gnu.org/licenses/>.
# ==============================================================================
""" Helper class to manage user accounts and connections  on server side.

    A user is associated to 0 or more connected tokens,
    and each connected token is associated to at most 1 connection handler.

    When a connection handler is closed or invalidated,
    related tokens are kept and may be further associated to new connection handlers.

    Tokens are effectively deleted when they expire after TOKEN_LIFETIME_SECONDS seconds since last token usage.
"""
import logging

from diplomacy.server.user import User
from diplomacy.utils import common, parsing, strings
from diplomacy.utils.common import generate_token
from diplomacy.utils.jsonable import Jsonable

LOGGER = logging.getLogger(__name__)

# Token lifetime in seconds: default 24hours.
TOKEN_LIFETIME_SECONDS = 24 * 60 * 60

class Users(Jsonable):
    """ Users class. Properties:
        - users: dictionary mapping usernames to User object.s
        - administrators: set of administrator usernames.
        - token_timestamp: dictionary mapping each token to its creation/last confirmation timestamp.
        - token_to_username: dictionary mapping each token to its username.
        - username_to_tokens: dictionary mapping each username to a set of its tokens.
        - token_to_connection_handler: (memory only) dictionary mapping each token to a connection handler
        - connection_handler_to_tokens (memory only) dictionary mapping a connection handler to a set of its tokens
    """
    __slots__ = ['users', 'administrators', 'token_timestamp', 'token_to_username', 'username_to_tokens',
                 'token_to_connection_handler', 'connection_handler_to_tokens']
    model = {
        strings.USERS: parsing.DefaultValueType(parsing.DictType(str, parsing.JsonableClassType(User)), {}),
        # {username => User}
        strings.ADMINISTRATORS: parsing.DefaultValueType(parsing.SequenceType(str, sequence_builder=set), ()),
        # {usernames}
        strings.TOKEN_TIMESTAMP: parsing.DefaultValueType(parsing.DictType(str, int), {}),
        strings.TOKEN_TO_USERNAME: parsing.DefaultValueType(parsing.DictType(str, str), {}),
        strings.USERNAME_TO_TOKENS: parsing.DefaultValueType(parsing.DictType(str, parsing.SequenceType(str, set)), {}),
    }

    def __init__(self, **kwargs):
        self.users = {}
        self.administrators = set()
        self.token_timestamp = {}
        self.token_to_username = {}
        self.username_to_tokens = {}
        self.token_to_connection_handler = {}
        self.connection_handler_to_tokens = {}
        super(Users, self).__init__(**kwargs)

    def has_username(self, username):
        """ Return True if users have given username. """
        return username in self.users

    def has_user(self, username, password):
        """ Return True if users have given username with given password. """
        return username in self.users and self.users[username].is_valid_password(password)

    def has_admin(self, username):
        """ Return True if given username is an administrator. """
        return username in self.administrators

    def has_token(self, token):
        """ Return True if users have given token. """
        return token in self.token_to_username

    def token_is_alive(self, token):
        """ Return True if given token is known and still alive.
            A token is alive if elapsed time since last token usage does not exceed token lifetime
            (TOKEN_LIFETIME_SECONDS).
        """
        if self.has_token(token):
            current_time = common.timestamp_microseconds()
            elapsed_time_seconds = (current_time - self.token_timestamp[token]) / 1000000
            return elapsed_time_seconds <= TOKEN_LIFETIME_SECONDS
        return False

    def relaunch_token(self, token):
        """ Update timestamp  of given token with current timestamp. """
        if self.has_token(token):
            self.token_timestamp[token] = common.timestamp_microseconds()

    def token_is_admin(self, token):
        """ Return True if given token is associated to an administrator. """
        return self.has_token(token) and self.has_admin(self.get_name(token))

    def count_connections(self):
        """ Return number of registered connection handlers. """
        return len(self.connection_handler_to_tokens)

    def get_tokens(self, username):
        """ Return a sequence of tokens associated to given username. """
        return self.username_to_tokens[username].copy()

    def get_name(self, token):
        """ Return username of given token. """
        return self.token_to_username[token]

    def get_user(self, username):
        """ Returns user linked to username """
        return self.users.get(username, None)

    def get_connection_handler(self, token):
        """ Return connection handler associated to given token, or None if no handler currently associated. """
        return self.token_to_connection_handler.get(token, None)

    def add_admin(self, username):
        """ Set given username as administrator. Related user must exists in this Users object. """
        assert username in self.users
        self.administrators.add(username)

    def remove_admin(self, username):
        """ Remove given username from administrators. """
        if username in self.administrators:
            self.administrators.remove(username)

    def create_token(self):
        """ Return a new token guaranteed to not exist in this Users object. """
        token = generate_token()
        while self.has_token(token):
            token = generate_token()
        return token

    def add_user(self, username, password_hash):
        """ Add a new user with given username and hashed password.
            See diplomacy.utils.common.hash_password() for hashing purposes.
        """
        user = User(username=username, password_hash=password_hash)
        self.users[username] = user
        return user

    def replace_user(self, username, new_user):
        """ Replaces user object with a new user """
        self.users[username] = new_user

    def remove_user(self, username):
        """ Remove user related to given username. """
        user = self.users.pop(username)
        self.remove_admin(username)
        for token in self.username_to_tokens.pop(user.username):
            self.token_timestamp.pop(token)
            self.token_to_username.pop(token)
            connection_handler = self.token_to_connection_handler.pop(token, None)
            if connection_handler:
                self.connection_handler_to_tokens[connection_handler].remove(token)
                if not self.connection_handler_to_tokens[connection_handler]:
                    self.connection_handler_to_tokens.pop(connection_handler)

    def remove_connection(self, connection_handler, remove_tokens=True):
        """ Remove given connection handler.
            Return tokens associated to this connection handler,
            or None if connection handler is unknown.
            :param connection_handler: connection handler to remove.
            :param remove_tokens: if True, tokens related to connection handler are deleted.
            :return: either None or a set of tokens.
        """
        if connection_handler in self.connection_handler_to_tokens:
            tokens = self.connection_handler_to_tokens.pop(connection_handler)
            for token in tokens:
                self.token_to_connection_handler.pop(token)
                if remove_tokens:
                    self.token_timestamp.pop(token)
                    user = self.users[self.token_to_username.pop(token)]
                    self.username_to_tokens[user.username].remove(token)
                    if not self.username_to_tokens[user.username]:
                        self.username_to_tokens.pop(user.username)
            return tokens
        return None

    def connect_user(self, username, connection_handler):
        """ Connect given username to given connection handler with a new generated token, and return
            token generated.
            :param username: username to connect
            :param connection_handler: connection handler to link to user
            :return: a new token generated for connexion
        """
        token = self.create_token()
        user = self.users[username]
        if connection_handler not in self.connection_handler_to_tokens:
            self.connection_handler_to_tokens[connection_handler] = set()
        if user.username not in self.username_to_tokens:
            self.username_to_tokens[user.username] = set()
        self.token_to_username[token] = user.username
        self.token_to_connection_handler[token] = connection_handler
        self.username_to_tokens[user.username].add(token)
        self.connection_handler_to_tokens[connection_handler].add(token)
        self.token_timestamp[token] = common.timestamp_microseconds()
        return token

    def attach_connection_handler(self, token, connection_handler):
        """ Associate given token with given connection handler if token is known.
            If there is a previous connection handler associated to given token, it should be the same
            as given connection handler, otherwise an error is raised (meaning previous connection handler
            was not correctly disconnected from given token. It should be a programming error).
            :param token: token
            :param connection_handler: connection handler
        """
        if self.has_token(token):
            previous_connection = self.get_connection_handler(token)
            if previous_connection:
                assert previous_connection == connection_handler, \
                    "A new connection handler cannot be attached to a token always connected to another handler."
            else:
                LOGGER.warning('Attaching a new connection handler to a token.')
                if connection_handler not in self.connection_handler_to_tokens:
                    self.connection_handler_to_tokens[connection_handler] = set()
                self.token_to_connection_handler[token] = connection_handler
                self.connection_handler_to_tokens[connection_handler].add(token)
                self.token_timestamp[token] = common.timestamp_microseconds()

    def disconnect_token(self, token):
        """ Remove given token. """
        self.token_timestamp.pop(token)
        user = self.users[self.token_to_username.pop(token)]
        self.username_to_tokens[user.username].remove(token)
        if not self.username_to_tokens[user.username]:
            self.username_to_tokens.pop(user.username)
        connection_handler = self.token_to_connection_handler.pop(token, None)
        if connection_handler:
            self.connection_handler_to_tokens[connection_handler].remove(token)
            if not self.connection_handler_to_tokens[connection_handler]:
                self.connection_handler_to_tokens.pop(connection_handler)