-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
126 lines (104 loc) · 3.48 KB
/
worker.py
File metadata and controls
126 lines (104 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import threading
import sys
from concurrent import tasklib
__author__ = 'patrick'
class UnavailableError(Exception):
pass
class FutureResult(object):
def __init__(self):
self._evt = threading.Event()
self._cancelled = False
self._callback = None
self._callback_lock = threading.lock()
def cancel(self):
with self._callback_lock:
self._cancelled = True
self._evt.set()
if self._callback:
self._callback(UnavailableError("cancelled"))
def set_callback(self, callback):
with self._callback_lock:
self._callback = callback
if self._evt.is_set():
if hasattr(self, "_value"):
self._callback(self._value)
elif hasattr(self, "_exc"):
self._callback(self.exc[1])
elif self._cancelled:
self._callback(UnavailableError("cancelled"))
def set(self, value):
with self._callback_lock:
self._value = value
self._evt.set()
if self._callback:
self._callback(value)
def get(self):
self._evt.wait()
if hasattr(self, "_exc"):
raise self._exc[1].with_traceback(self._exc[2])
elif hasattr(self, "_value"):
return self._value
elif self._cancelled:
raise UnavailableError("Cancelled")
else:
raise UnavailableError("No result")
class WorkerTask(tasklib.Task):
def apply(self, func, args=(), kwargs={}):
fresult = FutureResult()
self.send((fresult, func, args, kwargs))
return fresult
def run(self):
while True:
fresult, func, args, kwargs = self.recv()
if fresult._cancelled:
continue
try:
fresult.set(func(*args, **kwargs))
except:
fresult.set_error()
class WorkerPool(tasklib.Task):
def __init__(self, nworkers=1):
super(WorkerPool, self).__init__(name="workerpool")
self.nworkers = nworkers
def apply(self, func, args=(), kwargs={}):
fresult = FutureResult()
self.send((fresult, func, args, kwargs))
return fresult
def run(self):
self._running_workers = self.nworkers
self._all_done = threading.Event()
# Launch additional worker threads
for n in range(1, self.nworkers):
thr = threading.Thread(target=self.do_work)
thr.daemon = True
thr.start()
self.do_work()
# wait for all workers to terminate
self._all_done.wait()
# Worker method (runs in multiple threads)
def do_work(self):
try:
while True:
fresult, func, args, kwargs = self.recv()
if fresult._cancelled:
continue
try:
fresult.set(func(*args, **kwargs))
except:
fresult.set_error()
except tasklib.TaskExit:
pass
finally:
self._running_workers -= 1
if self._running_workers:
# Request the next worker to stop
self.stop()
else:
self._all_done.set()
self.log.info("Worker thread stopped")
# For testing/debugging
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.INFO)
pool = WorkerPool(nworkers=8)
pool.start()