返回列表 发帖

stackless pyton微线程两个无限循环线程实现互相切换的问题。

我用的是stackless模块

我有两个无限循环

一个线程是无限循环检测一个值的变化
另外一个是接受udp包

设置两个通道
one=stackless.channel()
two=stackless.channel()

def check():
two.recevie()
while 1:
if 检测值 a=1:
dosomething
else:
one.send(’开始收包’)

def udp():

one.receive()
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((loaclhost_ip,port))
while 1:
d,a=s.recvfrom(1024)
two.send(’返回check’)

开启线程

stackless.tasklet(check)()
stackless.tasklet(udp)()
stackless.tasklet(two.send)(’开始检测’)
stackless.run()



如果我收不到数据,我永远无法跳回到check()检测值a了~

我要怎么做才能一边收包一边检测a的值呢??





网上查到一个博客

http://blog.filia.cn/2009/05/03/ ... age-1/#comment-4172



有人已经写了一个

socket 非阻塞库

用这个库实现了我的需求

stacklesssocket.py



用import stacklesssocket  as socket





stacklesssocket.py代码
  1.    1. #coding:gb2312  
  2.    2. #  socketlibevent.py - MIT License  
  3.    3. #  phoenix@burninglabs.com  
  4.    4. #  
  5.    5. #  Non-blocking socket I/O for Stackless Python using libevent, via pyevent.  
  6.    6. #  
  7.    7. #  Usage:  
  8.    8. #      import sys, socketlibevent; sys.modules['socket'] = socketlibevent  
  9.    9. #  
  10.   10. #  Based on Richard Tew's stacklesssocket module.  
  11.   11. #  Uses Dug Song's pyevent.  
  12.   12. #  
  13.   13. #  Thanks a Heap !  
  14.   14.   
  15.   15.   
  16.   16. import stackless, sys, time, traceback  
  17.   17. from weakref import WeakValueDictionary  
  18.   18. import socket as stdsocket  
  19.   19. from socket import _fileobject  
  20.   20.   
  21.   21. try:  
  22.   22.     import event  
  23.   23. except:  
  24.   24.     try:  
  25.   25.         import rel; rel.override()  
  26.   26.         import event  
  27.   27.     except:  
  28.   28.         print "please install libevent and pyevent"  
  29.   29.                 # http://code.google.com/p/pyevent/  
  30.   30.         print "(or 'stackless ez_setup.py rel' for quick testing)"  
  31.   31.                 # http://code.google.com/p/registeredeventlistener/  
  32.   32.         sys.exit()  
  33.   33.   
  34.   34. # For SSL support, this module uses the 'ssl' module (built in from 2.6 up):  
  35.   35. #               ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)  
  36.   36. try:  
  37.   37.     import ssl as ssl_  
  38.   38.     ssl_enabled = True  
  39.   39. except:  
  40.   40.     ssl_enabled = False  
  41.   41.   
  42.   42. #  Nice socket globals import ripped from Minor Gordon's Yield.  
  43.   43. if "__all__" in stdsocket.__dict__:  
  44.   44.     __all__ = stdsocket.__dict__["__all__"]  
  45.   45.     globals().update((key, value) for key, value in\  
  46.   46.              stdsocket.__dict__.iteritems() if key in __all__ or key == "EBADF")  
  47.   47. else:  
  48.   48.     other_keys = ("error", "timeout", "getaddrinfo")  
  49.   49.     globals().update((key, value) for key, value in\  
  50.   50.         stdsocket.__dict__.iteritems() if key.upper() == key or key in\  
  51.   51.                                                                      other_keys)  
  52.   52. _GLOBAL_DEFAULT_TIMEOUT = 0.1  
  53.   53.   
  54.   54. # simple decorator to run a function in a tasklet  
  55.   55. #任务编排  
  56.   56. def tasklet(task):  
  57.   57.     def run(*args, **kwargs):  
  58.   58.         stackless.tasklet(task)(*args, **kwargs)  
  59.   59.     return run  
  60.   60.   
  61.   61. # Event Loop Management  
  62.   62. #事件循环管理  
  63.   63. loop_running = False  
  64.   64. sockets = WeakValueDictionary()  
  65.   65.   
  66.   66. def die():  
  67.   67.     global sockets  
  68.   68.     sockets = {}  
  69.   69.     sys.exit()  
  70.   70.   
  71.   71. #修饰符,意思是 tasklet(eventLoop)  
  72.   72. @tasklet  
  73.   73. def eventLoop():  
  74.   74.     global loop_running  
  75.   75.     global event_errors  
  76.   76.   
  77.   77.     while sockets.values():  
  78.   78.         # If there are other tasklets scheduled:  
  79.   79.         #     use the nonblocking loop  
  80.   80.         # else: use the blocking loop  
  81.   81.         if stackless.getruncount() > 2: # main tasklet + this one  
  82.   82.             event.loop(True)  
  83.   83.         else:  
  84.   84.             event.loop(False)  
  85.   85.         stackless.schedule()  
  86.   86.     loop_running = False  
  87.   87.   
  88.   88. def runEventLoop():  
  89.   89.     global loop_running  
  90.   90.     if not loop_running:  
  91.   91.         event.init()  
  92.   92.         event.signal(2, die)  
  93.   93.         event.signal(3, die)  
  94.   94.         eventLoop()  
  95.   95.         loop_running = True  
  96.   96.   
  97.   97.   
  98.   98. # Replacement Socket Module Functions  
  99.   99. def socket(family=AF_INET, type=SOCK_STREAM, proto=0):  
  100. 100.     return evsocket(stdsocket.socket(family, type, proto))  
  101. 101.   
  102. 102. def create_connection(address, timeout=0.1):  
  103. 103.     s = socket()  
  104. 104.     s.connect(address, timeout)  
  105. 105.     return s  
  106. 106.   
  107. 107. def ssl(sock, keyfile=None, certfile=None):  
  108. 108.     if ssl_enabled:  
  109. 109.         return evsocketssl(sock, keyfile, certfile)  
  110. 110.     else:  
  111. 111.         raise RuntimeError(\  
  112. 112.             "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")  
  113. 113.   
  114. 114.   
  115. 115. # Socket Proxy Class  
  116. 116. class evsocket():  
  117. 117.     # XXX Not all socketobject methods are implemented!  
  118. 118.     # XXX Currently, the sockets are using the default, blocking mode.  
  119. 119.   
  120. 120.     def __init__(self, sock):  
  121. 121.         self.sock = sock  
  122. 122.         self.accepting = False  
  123. 123.         self.connected = False  
  124. 124.         self.remote_addr = None  
  125. 125.         self.fileobject = None  
  126. 126.         self.read_channel = stackless.channel()  
  127. 127.         self.write_channel = stackless.channel()  
  128. 128.         self.accept_channel = None  
  129. 129.         global sockets  
  130. 130.         sockets[id(self)] = self  
  131. 131.         runEventLoop()  
  132. 132.   
  133. 133.     def __getattr__(self, attr):  
  134. 134.         return getattr(self.sock, attr)  
  135. 135.   
  136. 136.     def listen(self, backlog=255):  
  137. 137.         self.accepting = True  
  138. 138.         return self.sock.listen(backlog)  
  139. 139.   
  140. 140.     def accept(self):  
  141. 141.         if not self.accept_channel:  
  142. 142.             self.accept_channel = stackless.channel()  
  143. 143.             event.event(self.handle_accept, handle=self.sock,  
  144. 144.                         evtype=event.EV_READ | event.EV_PERSIST).add()  
  145. 145.         return self.accept_channel.receive()  
  146. 146.  
  147. 147.     @tasklet  
  148. 148.     def handle_accept(self, ev, sock, event_type, *arg):  
  149. 149.         s, a = self.sock.accept()  
  150. 150.         s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)  
  151. 151.         s = evsocket(s)  
  152. 152.         self.accept_channel.send((s,a))  
  153. 153.   
  154. 154.     def connect(self, address, timeout=0.1):  
  155. 155.         endtime = time.time() + timeout  
  156. 156.         while time.time() < endtime:  
  157. 157.             if self.sock.connect_ex(address) == 0:  
  158. 158.                 self.connected = True  
  159. 159.                 self.remote_addr = address  
  160. 160.                 return  
  161. 161.         if not self.connected:  
  162. 162.             # One last try, just to raise an error  
  163. 163.             return self.sock.connect(address)  
  164. 164.   
  165. 165.     def send(self, data, *args):  
  166. 166.         event.write(self.sock, self.handle_send, data)  
  167. 167.         return self.write_channel.receive()  
  168. 168.  
  169. 169.     @tasklet  
  170. 170.     def handle_send(self, data):  
  171. 171.         self.write_channel.send(self.sock.send(data))  
  172. 172.   
  173. 173.     def sendall(self, data, *args):  
  174. 174.         while data:  
  175. 175.             try:  
  176. 176.                 sent = self.send(data)  
  177. 177.                 data = data[sent + 1:]  
  178. 178.             except:  
  179. 179.                 raise  
  180. 180.         return None  
  181. 181.   
  182. 182.     def recv(self, bytes, *args):  
  183. 183.         event.read(self.sock, self.handle_recv, bytes)  
  184. 184.         return self.read_channel.receive()  
  185. 185.  
  186. 186.     @tasklet  
  187. 187.     def handle_recv(self, bytes):  
  188. 188.         self.read_channel.send(self.sock.recv(bytes))  
  189. 189.   
  190. 190.     def recvfrom(self, bytes, *args):  
  191. 191.         event.read(self.sock, self.handle_recv, bytes)  
  192. 192.         return self.read_channel.receive()  
  193. 193.  
  194. 194.     @tasklet  
  195. 195.     def handle_recvfrom(self, bytes):  
  196. 196.         self.read_channel.send(self.sock.recvfrom(bytes))  
  197. 197.   
  198. 198.     def makefile(self, mode='r', bufsize=-1):  
  199. 199.         self.fileobject = stdsocket._fileobject(self, mode, bufsize)  
  200. 200.         return self.fileobject  
  201. 201.   
  202. 202.     def close(self):  
  203. 203.         # XXX Stupid workaround  
  204. 204.         # Don't close while the fileobject is still using the fakesocket  
  205. 205.         def _close():  
  206. 206.             while self.fileobject._sock == self:  
  207. 207.                 stackless.schedule()  
  208. 208.             self._sock.close()  
  209. 209.             del sockets[id(self)]  
  210. 210.         if self.fileobject:  
  211. 211.             stackless.tasklet(_close)()  
  212. 212.   
  213. 213.   
  214. 214. # SSL Proxy Class  
  215. 215. class evsocketssl(evsocket):  
  216. 216.     def __init__(self, sock, keyfile=None, certfile=None):  
  217. 217.         if certfile:  
  218. 218.             server_side = True  
  219. 219.         else:  
  220. 220.             server_side = False  
  221. 221.   
  222. 222.         # XXX This currently performs a BLOCKING handshake operation  
  223. 223.         # TODO Implement a non-blocking handshake  
  224. 224.         self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)  
  225. 225.  
  226. 226.     @tasklet  
  227. 227.     def handle_accept(self, ev, sock, event_type, *arg):  
  228. 228.         s, a = self.sock.accept()  
  229. 229.         s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)  
  230. 230.         s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)  
  231. 231.         s = evsocketssl(s)  
  232. 232.         self.accept_channel.send((s,a))  
  233. 233.   
  234. 234.   
  235. 235. if __name__ == "__main__":  
  236. 236.     sys.modules["socket"] = __import__(__name__)  
  237. 237.   
  238. 238.     # Minimal Client Test  
  239. 239.     # TODO: Add a Minimal Server Test  
  240. 240.   
  241. 241.     import urllib2  
  242. 242.  
  243. 243.     @tasklet  
  244. 244.     def test(i):  
  245. 245.         print "url read", i  
  246. 246.         print urllib2.urlopen("http://www.google.com").read(12)  
  247. 247.   
  248. 248.     for i in range(5):  
  249. 249.         test(i)  
  250. 250.   
  251. 251.     stackless.run()  
复制代码
不过对于stacklesssocket.py还是看不太懂~
让中国Python发展的更快 ,更好.

返回列表