aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/server/scheduler.py
diff options
context:
space:
mode:
authorPhilip Paquette <pcpaquette@gmail.com>2018-09-26 07:48:55 -0400
committerPhilip Paquette <pcpaquette@gmail.com>2019-04-18 11:14:24 -0400
commit6187faf20384b0c5a4966343b2d4ca47f8b11e45 (patch)
tree151ccd21aea20180432c13fe4b58240d3d9e98b6 /diplomacy/server/scheduler.py
parent96b7e2c03ed98705754f13ae8efa808b948ee3a8 (diff)
Release v1.0.0 - Diplomacy Game Engine - AGPL v3+ License
Diffstat (limited to 'diplomacy/server/scheduler.py')
-rw-r--r--diplomacy/server/scheduler.py265
1 files changed, 265 insertions, 0 deletions
diff --git a/diplomacy/server/scheduler.py b/diplomacy/server/scheduler.py
new file mode 100644
index 0000000..28bee74
--- /dev/null
+++ b/diplomacy/server/scheduler.py
@@ -0,0 +1,265 @@
+# ==============================================================================
+# Copyright (C) 2019 - Philip Paquette, Steven Bocco
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <https://www.gnu.org/licenses/>.
+# ==============================================================================
+""" Scheduler used by server to run games.
+
+ Scheduler is configured with a task manager (callback function) and a step time (in seconds)
+ which indicates how long it must wait at each step before checking tasks to process.
+ Then, to add a task, user must specify a data to process and a delay (in number of step times).
+ Deadline is computed using given delay + scheduler step when data was added.
+
+ To set unit as a minute, create Scheduler with unit_in_seconds = 60.
+ In such case, a task with deadline 2 means 2 minutes to wait to process this task.
+ TO set unit as a second, create Scheduler with unit_in_seconds = 1.
+ In such case, a task with deadline 2 means 2 seconds to wait to process this task.
+"""
+from tornado import gen
+from tornado.locks import Lock
+from tornado.queues import Queue
+
+from diplomacy.utils.scheduler_event import SchedulerEvent
+from diplomacy.utils import exceptions
+from diplomacy.utils.priority_dict import PriorityDict
+
+class _Deadline():
+ """ (internal) Deadline value, defined by a start time and a delay, such that deadline = start time + delay. """
+ __slots__ = ['start_time', 'delay']
+
+ def __init__(self, start_time, delay):
+ """ Initialize a deadline with start time and delay, so that deadline = start time + delay.
+ :param start_time: (int)
+ :param delay: (int)
+ """
+ self.start_time = start_time
+ self.delay = delay
+
+ @property
+ def deadline(self):
+ """ Compute and return deadline. """
+ return self.start_time + self.delay
+
+ def __str__(self):
+ return 'Deadline(%d + %d = %d)' % (self.start_time, self.delay, self.deadline)
+
+ def __lt__(self, other):
+ return self.deadline < other.deadline
+
+class _Task():
+ """ (internal) Task class used by scheduler to order scheduled data. It allows auto-rescheduling
+ of a task after it was processed, until either:
+ - task delay is 0.
+ - task manager return a True boolean value (means "data fully processed").
+ - scheduler is explicitly required to remove associated data.
+ """
+ __slots__ = ['data', 'deadline', 'valid']
+
+ def __init__(self, data, deadline):
+ """ Initialize a task.
+ :param data: data to process.
+ :param deadline: Deadline object.
+ :type deadline: _Deadline
+ """
+ self.data = data
+ self.deadline = deadline
+ self.valid = True # Used to ease task removal from Tornado queue.
+
+ def __str__(self):
+ return '%s(%s, %s)' % (self.__class__.__name__, type(self.data).__name__, self.deadline)
+
+ def update_delay(self, new_delay):
+ """ Change deadline delay with given new delay. """
+ self.deadline.delay = new_delay
+
+class _ImmediateTask(_Task):
+ """ (internal) Represents a task intended to be processed as soon as possible the first time,
+ and then scheduled as a normal task for next times. As deadline does not matter for first
+ processing, an immediate task needs a processing validator called the first
+ time to check if it must still be processed. Note that, if validation returns False,
+ the task is not processed the first time and not even added to scheduler for next times.
+ """
+ __slots__ = ['validator']
+
+ def __init__(self, data, future_delay, processing_validator):
+ """ Initialize an immediate task.
+ :param data: data to process.
+ :param future_delay: delay to use to reschedule that task after first processing.
+ :param processing_validator: either a Bool or a callable receiving the data and
+ returning a Bool: processing_validator(data) -> Bool.
+ Validator is used only for the first processing. If evaluated to True, task is
+ processed and then rescheduled for next processing with given future delay.
+ If evaluated to False, task is drop (neither processed nor rescheduled).
+ """
+ super(_ImmediateTask, self).__init__(data, _Deadline(-future_delay, future_delay))
+ if isinstance(processing_validator, bool):
+ self.validator = lambda: processing_validator
+ elif callable(processing_validator):
+ self.validator = lambda: processing_validator(data)
+ else:
+ raise RuntimeError('Validator for immediate task must be either a boolean or a callback(data).')
+
+ def can_still_process(self):
+ """ Return True if this immediate task can still be processed for the first time.
+ If False is returned, task is drop and never processed (not even for a first time).
+ """
+ return self.validator()
+
+ def update_delay(self, new_delay):
+ self.deadline.start_time = -new_delay
+ self.deadline.delay = new_delay
+
+class Scheduler():
+ """ (public) Scheduler class. """
+ __slots__ = ['unit', 'current_time', 'callback_process', 'data_in_queue', 'data_in_heap', 'tasks_queue', 'lock']
+
+ def __init__(self, unit_in_seconds, callback_process):
+ """ Initialize a scheduler.
+ :param unit_in_seconds: number of seconds to wait for each step.
+ :param callback_process: callback to call on every task.
+ Signature:
+ task_callback(task.data) -> bool
+ If callback return True, task is considered done and is removed from scheduler.
+ Otherwise, task is rescheduled for another delay.
+ """
+ assert isinstance(unit_in_seconds, int) and unit_in_seconds > 0
+ assert callable(callback_process)
+ self.unit = unit_in_seconds
+ self.current_time = 0
+ self.callback_process = callback_process
+ self.data_in_heap = PriorityDict() # data => Deadline
+ self.data_in_queue = {} # type: dict{object, _Task} # data => associated Task in queue
+ self.tasks_queue = Queue()
+ # Lock to modify this object safely inside one Tornado thread:
+ # http://www.tornadoweb.org/en/stable/locks.html
+ self.lock = Lock()
+
+ def _enqueue(self, task):
+ """ Put a task in queue of tasks to process now. """
+ self.data_in_queue[task.data] = task
+ self.tasks_queue.put_nowait(task)
+
+ @gen.coroutine
+ def has_data(self, data):
+ """ Return True if given data is associated to any task. """
+ with (yield self.lock.acquire()):
+ return data in self.data_in_heap or data in self.data_in_queue
+
+ @gen.coroutine
+ def get_info(self, data):
+ """ Return info about scheduling for given data, or None if data is not found. """
+ with (yield self.lock.acquire()):
+ deadline = None # type: _Deadline
+ if data in self.data_in_heap:
+ deadline = self.data_in_heap[data]
+ if data in self.data_in_queue:
+ deadline = self.data_in_queue[data].deadline
+ if deadline:
+ return SchedulerEvent(time_unit=self.unit,
+ time_added=deadline.start_time,
+ delay=deadline.delay,
+ current_time=self.current_time)
+ return None
+
+ @gen.coroutine
+ def add_data(self, data, nb_units_to_wait):
+ """ Add data with a non-null deadline. For null deadlines, use no_wait().
+ :param data: data to add
+ :param nb_units_to_wait: time to wait (in number of units)
+ """
+ if not isinstance(nb_units_to_wait, int) or nb_units_to_wait <= 0:
+ raise exceptions.NaturalIntegerNotNullException()
+ with (yield self.lock.acquire()):
+ if data in self.data_in_heap or data in self.data_in_queue:
+ raise exceptions.AlreadyScheduledException()
+ # Add task to scheduler.
+ self.data_in_heap[data] = _Deadline(self.current_time, nb_units_to_wait)
+
+ @gen.coroutine
+ def no_wait(self, data, nb_units_to_wait, processing_validator):
+ """ Add a data to be processed the sooner.
+ :param data: data to add
+ :param nb_units_to_wait: time to wait (in number of units) for data tasks after first task is executed.
+ If null (0), data is processed once (first time) and then dropped.
+ :param processing_validator: validator used to check if data can still be processed for the first time.
+ See documentation of class _ImmediateTask for more details.
+ """
+ if not isinstance(nb_units_to_wait, int) or nb_units_to_wait < 0:
+ raise exceptions.NaturalIntegerException()
+ with (yield self.lock.acquire()):
+ if data in self.data_in_heap:
+ # Move data from heap to queue with new delay.
+ del self.data_in_heap[data]
+ self._enqueue(_ImmediateTask(data, nb_units_to_wait, processing_validator))
+ elif data in self.data_in_queue:
+ # Change delay for future scheduling.
+ self.data_in_queue[data].update_delay(nb_units_to_wait)
+ else:
+ # Add data to queue.
+ self._enqueue(_ImmediateTask(data, nb_units_to_wait, processing_validator))
+
+ @gen.coroutine
+ def remove_data(self, data):
+ """ Remove a data (and all associated tasks) from scheduler. """
+ with (yield self.lock.acquire()):
+ if data in self.data_in_heap:
+ del self.data_in_heap[data]
+ elif data in self.data_in_queue:
+ # Remove task from data_in_queue and invalidate it in queue.
+ self.data_in_queue.pop(data).valid = False
+
+ @gen.coroutine
+ def _step(self):
+ """ Compute a step (check and enqueue tasks to run now) in scheduler. """
+ with (yield self.lock.acquire()):
+ self.current_time += 1
+ while self.data_in_heap:
+ deadline, data = self.data_in_heap.smallest()
+ if deadline.deadline > self.current_time:
+ break
+ del self.data_in_heap[data]
+ self._enqueue(_Task(data, deadline))
+
+ @gen.coroutine
+ def schedule(self):
+ """ Main scheduler method (callback to register in ioloop). Wait for unit seconds and
+ run tasks after each wait time.
+ """
+ while True:
+ yield gen.sleep(self.unit)
+ yield self._step()
+
+ @gen.coroutine
+ def process_tasks(self):
+ """ Main task processing method (callback to register in ioloop). Consume and process tasks in queue
+ and reschedule processed tasks when relevant.
+ A task is processed if associated data was not removed from scheduler.
+ A task is rescheduler if processing callback returns False (True meaning `task definitively done`)
+ AND if task deadline is not null.
+ """
+ while True:
+ task = yield self.tasks_queue.get() # type: _Task
+ try:
+ if task.valid and (not isinstance(task, _ImmediateTask) or task.can_still_process()):
+ if gen.is_coroutine_function(self.callback_process):
+ remove_data = yield self.callback_process(task.data)
+ else:
+ remove_data = self.callback_process(task.data)
+ remove_data = remove_data or not task.deadline.delay
+ with (yield self.lock.acquire()):
+ del self.data_in_queue[task.data]
+ if not remove_data:
+ self.data_in_heap[task.data] = _Deadline(self.current_time, task.deadline.delay)
+ finally:
+ self.tasks_queue.task_done()