forked from Tribler/dispersy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskmanager.py
100 lines (82 loc) · 3.32 KB
/
taskmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from threading import Lock
from twisted.internet.base import DelayedCall
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from .util import blocking_call_on_reactor_thread
CLEANUP_FREQUENCY = 100
class TaskManager(object):
"""
Provides a set of tools to mantain a list of twisted "tasks" (Deferred, LoopingCall, DelayedCall) that are to be
executed during the lifetime of an arbitrary object, usually getting killed with it.
"""
def __init__(self):
self._pending_tasks = {}
self._cleanup_counter = CLEANUP_FREQUENCY
self._task_lock = Lock()
def replace_task(self, name, task):
"""
Replace named task with the new one, cancelling the old one in the process.
"""
self.cancel_pending_task(name)
return self.register_task(name, task)
def register_task(self, name, task):
"""
Register a task so it can be canceled at shutdown time or by name.
"""
assert not self.is_pending_task_active(name), name
assert isinstance(task, (Deferred, DelayedCall, LoopingCall)), task
self._maybe_clean_task_list()
with self._task_lock:
self._pending_tasks[name] = task
return task
@blocking_call_on_reactor_thread
def cancel_pending_task(self, name):
"""
Cancels the named task
"""
self._maybe_clean_task_list()
is_active, stopfn = self._get_isactive_stopper(name)
if is_active and stopfn:
stopfn()
self._pending_tasks.pop(name)
def cancel_all_pending_tasks(self):
"""
Cancels all the registered tasks.
This usually should be called when stopping or destroying the object so no tasks are left floating around.
"""
assert all([isinstance(task, (Deferred, DelayedCall, LoopingCall))
for task in self._pending_tasks.itervalues()]), self._pending_tasks
for name in self._pending_tasks.keys():
self.cancel_pending_task(name)
def is_pending_task_active(self, name):
"""
Return a boolean determining if a task is active.
"""
return self._get_isactive_stopper(name)[0]
def _get_isactive_stopper(self, name):
"""
Return a boolean determining if a task is active and its cancel/stop method if the task is registered.
"""
task = self._pending_tasks.get(name, None)
if isinstance(task, Deferred):
# Have in mind that any deferred in the pending tasks list should have been constructed with a
# canceller function.
return not task.called, getattr(task, 'cancel', None)
elif isinstance(task, DelayedCall):
return task.active(), task.cancel
elif isinstance(task, LoopingCall):
return task.running, task.stop
else:
return False, None
def _maybe_clean_task_list(self):
"""
Removes finished tasks from the task list.
"""
if self._cleanup_counter:
self._cleanup_counter -= 1
else:
self._cleaup_counter = CLEANUP_FREQUENCY
for name in self._pending_tasks.keys():
if not self.is_pending_task_active(name):
self._pending_tasks.pop(name)
__all__ = ["TaskManager"]