aboutsummaryrefslogtreecommitdiff
path: root/diplomacy/server/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'diplomacy/server/scheduler.py')
-rw-r--r--diplomacy/server/scheduler.py33
1 files changed, 21 insertions, 12 deletions
diff --git a/diplomacy/server/scheduler.py b/diplomacy/server/scheduler.py
index 28bee74..ce34252 100644
--- a/diplomacy/server/scheduler.py
+++ b/diplomacy/server/scheduler.py
@@ -23,7 +23,7 @@
To set unit as a minute, create Scheduler with unit_in_seconds = 60.
In such case, a task with deadline 2 means 2 minutes to wait to process this task.
- TO set unit as a second, create Scheduler with unit_in_seconds = 1.
+ To set unit as a second, create Scheduler with unit_in_seconds = 1.
In such case, a task with deadline 2 means 2 seconds to wait to process this task.
"""
from tornado import gen
@@ -34,12 +34,13 @@ from diplomacy.utils.scheduler_event import SchedulerEvent
from diplomacy.utils import exceptions
from diplomacy.utils.priority_dict import PriorityDict
-class _Deadline():
+class _Deadline:
""" (internal) Deadline value, defined by a start time and a delay, such that deadline = start time + delay. """
__slots__ = ['start_time', 'delay']
def __init__(self, start_time, delay):
""" Initialize a deadline with start time and delay, so that deadline = start time + delay.
+
:param start_time: (int)
:param delay: (int)
"""
@@ -57,9 +58,10 @@ class _Deadline():
def __lt__(self, other):
return self.deadline < other.deadline
-class _Task():
- """ (internal) Task class used by scheduler to order scheduled data. It allows auto-rescheduling
- of a task after it was processed, until either:
+class _Task:
+ """ (internal) Task class used by scheduler to order scheduled data.
+ It allows auto-rescheduling of a task after it was processed, until either:
+
- task delay is 0.
- task manager return a True boolean value (means "data fully processed").
- scheduler is explicitly required to remove associated data.
@@ -68,6 +70,7 @@ class _Task():
def __init__(self, data, deadline):
""" Initialize a task.
+
:param data: data to process.
:param deadline: Deadline object.
:type deadline: _Deadline
@@ -94,6 +97,7 @@ class _ImmediateTask(_Task):
def __init__(self, data, future_delay, processing_validator):
""" Initialize an immediate task.
+
:param data: data to process.
:param future_delay: delay to use to reschedule that task after first processing.
:param processing_validator: either a Bool or a callable receiving the data and
@@ -120,18 +124,19 @@ class _ImmediateTask(_Task):
self.deadline.start_time = -new_delay
self.deadline.delay = new_delay
-class Scheduler():
+class Scheduler:
""" (public) Scheduler class. """
__slots__ = ['unit', 'current_time', 'callback_process', 'data_in_queue', 'data_in_heap', 'tasks_queue', 'lock']
def __init__(self, unit_in_seconds, callback_process):
""" Initialize a scheduler.
+
:param unit_in_seconds: number of seconds to wait for each step.
:param callback_process: callback to call on every task.
- Signature:
- task_callback(task.data) -> bool
- If callback return True, task is considered done and is removed from scheduler.
- Otherwise, task is rescheduled for another delay.
+
+ - Signature: ``task_callback(task.data) -> bool``
+ - If callback return True, task is considered done and is removed from scheduler.
+ - Otherwise, task is rescheduled for another delay.
"""
assert isinstance(unit_in_seconds, int) and unit_in_seconds > 0
assert callable(callback_process)
@@ -175,6 +180,7 @@ class Scheduler():
@gen.coroutine
def add_data(self, data, nb_units_to_wait):
""" Add data with a non-null deadline. For null deadlines, use no_wait().
+
:param data: data to add
:param nb_units_to_wait: time to wait (in number of units)
"""
@@ -189,6 +195,7 @@ class Scheduler():
@gen.coroutine
def no_wait(self, data, nb_units_to_wait, processing_validator):
""" Add a data to be processed the sooner.
+
:param data: data to add
:param nb_units_to_wait: time to wait (in number of units) for data tasks after first task is executed.
If null (0), data is processed once (first time) and then dropped.
@@ -244,9 +251,11 @@ class Scheduler():
def process_tasks(self):
""" Main task processing method (callback to register in ioloop). Consume and process tasks in queue
and reschedule processed tasks when relevant.
+
A task is processed if associated data was not removed from scheduler.
- A task is rescheduler if processing callback returns False (True meaning `task definitively done`)
- AND if task deadline is not null.
+
+ A task is rescheduled if processing callback returns False
+ (True means `task definitively done`) AND if task deadline is not null.
"""
while True:
task = yield self.tasks_queue.get() # type: _Task