Python实现线程池 作者:xieaotian发表于2010-01-24 09:32:35
前言:
关于线程池(thread pool)的概念,请参考 http://en.wikipedia.org/wiki/Thread_pool_pattern 。在Python中使用线程是有硬伤的:因为Python(这里指C语言实现的Python)的基本调用最终都会转换成对应的C语言函数调用,所以在Python中使用线程的开销太大,不过可以使用Stackless Python(Python的一个修改版)来改善Python中使用线程的表现。
同时,由于Python中GIL的存在,在使用多CPU时Python无法充分利用多个CPU,目前psyco这个模块可以针对多CPU提高Python的效率。
在C语言里要实现个线程池,就要面对一堆的指针,还有pthread这个库中那些看起来很让人头痛的一些函数:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
而如果用Python来实现一个线程池的话就好多了,不仅结构十分清晰,而且代码看起来会很优美:[code] 1.
import threading
2.
from time import sleep
3.
4.
class ThreadPool:

    """Flexible thread pool class.

    Creates a pool of ThreadPoolThread workers, then accepts tasks that
    are handed out, in FIFO order, to whichever worker asks next.
    """

    def __init__(self, numThreads):
        """Initialize the thread pool with numThreads workers."""
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):
        """External method to set the current pool size.

        Acquires the resizing lock, then calls the internal version to
        do the real work.  Returns False (and changes nothing) while the
        pool is shutting down, True otherwise.
        """
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False

        with self.__resizeLock:
            self.__setThreadCountNolock(newNumThreads)
        return True

    def __setThreadCountNolock(self, newNumThreads):
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so.  goAway() only sets a
        # flag: the worker exits the next time it re-checks its run loop.
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):
        """Return the number of threads in the pool."""
        with self.__resizeLock:
            return len(self.__threads)

    def queueTask(self, task, args=None, taskCallback=None):
        """Insert a task into the queue.

        task must be callable; args and taskCallback can be None.
        Returns True if the task was queued, False if the pool is
        shutting down or task is not callable.
        """
        if self.__isJoining:
            return False
        if not callable(task):
            return False

        with self.__taskLock:
            self.__tasks.append((task, args, taskCallback))
            return True

    def getNextTask(self):
        """Retrieve the next task from the task queue.

        For use only by ThreadPoolThread objects contained in the pool.
        Returns a (task, args, taskCallback) tuple, or (None, None, None)
        when the queue is empty.
        """
        with self.__taskLock:
            if not self.__tasks:
                return (None, None, None)
            return self.__tasks.pop(0)

    def joinAll(self, waitForTasks=True, waitForThreads=True):
        """Terminate all pooled threads, optionally allowing the queued
        tasks and the threads to finish first.

        With waitForTasks true, blocks (polling every 0.1s) until the
        task queue is empty.  With waitForThreads true, blocks until
        every worker has exited.  Tasks still queued when waitForTasks
        is false are left in the queue.
        """
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks:
                sleep(.1)

        with self.__resizeLock:
            # Remember the workers before shrinking: shrinking to zero
            # empties self.__threads, so joining that list afterwards
            # would wait for nothing.
            exiting = list(self.__threads)

            # Tell all the threads to quit
            self.__setThreadCountNolock(0)

            # Wait until all threads have exited
            if waitForThreads:
                current = threading.current_thread()
                for t in exiting:
                    # A thread cannot join itself (e.g. when joinAll is
                    # invoked from inside a pooled task).
                    if t is not current:
                        t.join()

            # Reset the pool for potential reuse
            self.__isJoining = False
124.
125.
class ThreadPoolThread(threading.Thread):

    """Pooled thread class.

    Repeatedly asks its pool for the next task and runs it until told
    to go away.
    """

    # Seconds to sleep between polls when the pool has no task to hand out.
    threadSleepTime = 0.1

    def __init__(self, pool):
        """Initialize the thread and remember the pool.

        pool must provide getNextTask(), returning a
        (task, args, callback) tuple or (None, None, None).
        """
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False

    def run(self):
        """Until told to quit, retrieve the next task and execute it,
        calling the callback (if any) with the task's return value.

        The task is always invoked with exactly one positional argument,
        the queued args value (which may be None).
        """
        # Imported here because the file's import block does not provide it.
        import traceback

        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            if cmd is None:
                # If there's nothing to do, just sleep a bit
                sleep(ThreadPoolThread.threadSleepTime)
                continue
            try:
                result = cmd(args)
                if callback is not None:
                    callback(result)
            except Exception:
                # A failing task must not kill the worker: the pool would
                # still count it while nothing drains the queue.  Report
                # the error and carry on with the next task.
                traceback.print_exc()

    def goAway(self):
        """Exit the run loop next time through."""
        self.__isDying = True
[/code]这段100多行的代码实现了一个可动态改变大小的线程池,并且包含了详细的注释,这里是代码的出处。我觉得这段代码比Python官方给出的那个还要好些。它们实现的原理都是一样的,都使用了一个队列(Queue)来存储任务。
关于Python中线程同步的问题,这里有不错的介绍。
