Python's Archiver

為方便港臺同胞閱覽,Python中國特別推出簡繁體內容轉換功能

xieaotian 发表于 2008-11-17 11:31

Python 线程 初探(2)

继承threading.Thread,来实现多线程

在原来应用的基础上,服务器端代码改用Threading,来实现进程管理,并且,在控制进程同步上,采用了一些新的方法。

服务器段:

import socket
import sys

import threading

class srvr(threading.Thread):
    v = ''
    vlock = threading.Lock()
    id = 0 #id for the next instance.
    def __init__(self, clntsock):
        #invoke constructor of the parent class
        threading.Thread.__init__(self)
        #self.xx instance members
        self.myid = srvr.id
        srvr.id += 1
        self.myclntsock = clntsock
    def run(self):
        while 1:
            #receive letter from client.
            k = self.myclntsock.recv(1)
            if k == '':
                break
            #update the v string
            srvr.vlock.acquire()
            srvr.v += k
            srvr.vlock.release()
            #send v back to client
            self.myclntsock.send(srvr.v)
        self.myclntsock.close()

lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
port = int(sys.argv[1])

lstn.bind(('',port))
lstn.listen(5)

nclnt = int(sys.argv[2]) #number of the client.

mythreads = [] #list of all the threads

for i in range(nclnt):
    (clnt,ap) = lstn.accept()
    #make new srvr class instance:
    s = srvr(clnt)
    mythreads.append(s)#update the list.
    s.start()

lstn.close()

#wait for all the threads complete.
for s in mythreads:
    s.join()
           
print 'the finial v is :', srvr.v
   

------------code ends here---------------

class srvr(threading.Thread) 表示继承了threading.Thread类,我们只用重载其中一些方法即可实现多线程。on help(threading.Thread):

class Thread(_Verbose)
| Method resolution order:
|      Thread
|      _Verbose
|      __builtin__.object
|
| Methods defined here:
|
| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None)   
         (构造函数)

| run(self) (线程函数)

| start(self) (启动线程)

| join(self, timeout=None) (等待self线程 结束,知道grant图么?不难理解为什么叫Join了吧)

其中:on start():

    def start(self):
        assert self.__initialized, "Thread.__init__() not called" #断言self被初始化
        assert not self.__started, "thread already started"#断言进程尚未启动
        if __debug__:
            self._note("%s.start(): starting thread", self)
        _active_limbo_lock.acquire()
        _limbo[self] = self
        _active_limbo_lock.release()
        _start_new_thread(self.__bootstrap, ())
        self.__started = True
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack) #可以看出,对于不同操作系统,Python的多线程实现方式是不同的,并不是纯自身模拟多线程特性,而也是借助了一些操作系统的特性.

对比之前我们的服务器代码,原来的代码里,为了实现对由主程序启动的线程的控制,我们使用了信号量nclnt, 启动新线程时nclnt+1,线程结束时nclnt-1,当nclnt最终成为0的时候,服务器段程序就可以关闭自己了。在新的代码里,我们取消了这个信号量,改用 :

#wait for all the threads complete.
for s in mythreads:
    s.join()

把代码翻译成自然语言:对于每一个已经开始的线程,让它加入到这个的线程里来。s.join就是说等待s线程的工作完成,然后join()就不再继续阻塞调用它的线程了,继续下面的工作。

Threading是如何实现的呢?

在threading.py里,我发现了大段的: class _Semaphore(_Verbose),还有:

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

可见程序中依旧使用了信号量(Semaphore),但不能十分确定,信号量就是为了join准备的。理论上,内部的信号量不该帮助到启动它的线程清点人数。尤其是清点自己是不是挂了,不太合理。暂时不管。

看join方法:

    def join(self, timeout=None):
        assert self.__initialized, "Thread.__init__() not called"
        assert self.__started, "cannot join thread before it is started"
        assert self is not currentThread(), "cannot join current thread" #自己不能join自己 -_-!!
        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire() #这个不知是要保证什么的
        try:
            if timeout is None: #没有设置超时边界的情况
                while not self.__stopped:#这个简单粗暴处理...
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else: #设置了超时
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else: #好像缩紧有点混乱凑合看..这个blog不支持贴代码,很麻烦
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

join还是很好理解的。

最后,花絮。I found:

class _Semaphore(_Verbose):

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

....
看来整个类也经过了多次不同设计者的修缮 。

页: [1]

Powered by Discuz! Archiver 6.1.0  © 2001-2007 Comsenz Inc.