|
  
- UID
- 1
- 帖子
- 4763
- 精华
- 42
- 积分
- 560
- 阅读权限
- 200
- 来自
- 中国PHP联盟
- 在线时间
- 570 小时
- 注册时间
- 2007-12-10
- 最后登录
- 2010-3-22
|
stackless pyton微线程两个无限循环线程实现互相切换的问题。
我用的是stackless模块
我有两个无限循环
一个线程是无限循环检测一个值的变化
另外一个是接受udp包
设置两个通道
one=stackless.channel()
two=stackless.channel()
def check():
two.recevie()
while 1:
if 检测值 a=1:
dosomething
else:
one.send(’开始收包’)
def udp():
one.receive()
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((loaclhost_ip,port))
while 1:
d,a=s.recvfrom(1024)
two.send(’返回check’)
开启线程
stackless.tasklet(check)()
stackless.tasklet(udp)()
stackless.tasklet(two.send)(’开始检测’)
stackless.run()
如果我收不到数据,我永远无法跳回到check()检测值a了~
我要怎么做才能一边收包一边检测a的值呢??
网上查到一个博客
http://blog.filia.cn/2009/05/03/ ... age-1/#comment-4172
有人已经写了一个
socket 非阻塞库
用这个库实现了我的需求
stacklesssocket.py
用import stacklesssocket as socket
stacklesssocket.py代码- 1. #coding:gb2312
- 2. # socketlibevent.py - MIT License
- 3. # phoenix@burninglabs.com
- 4. #
- 5. # Non-blocking socket I/O for Stackless Python using libevent, via pyevent.
- 6. #
- 7. # Usage:
- 8. # import sys, socketlibevent; sys.modules['socket'] = socketlibevent
- 9. #
- 10. # Based on Richard Tew's stacklesssocket module.
- 11. # Uses Dug Song's pyevent.
- 12. #
- 13. # Thanks a Heap !
- 14.
- 15.
- 16. import stackless, sys, time, traceback
- 17. from weakref import WeakValueDictionary
- 18. import socket as stdsocket
- 19. from socket import _fileobject
- 20.
- 21. try:
- 22. import event
- 23. except:
- 24. try:
- 25. import rel; rel.override()
- 26. import event
- 27. except:
- 28. print "please install libevent and pyevent"
- 29. # http://code.google.com/p/pyevent/
- 30. print "(or 'stackless ez_setup.py rel' for quick testing)"
- 31. # http://code.google.com/p/registeredeventlistener/
- 32. sys.exit()
- 33.
- 34. # For SSL support, this module uses the 'ssl' module (built in from 2.6 up):
- 35. # ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)
- 36. try:
- 37. import ssl as ssl_
- 38. ssl_enabled = True
- 39. except:
- 40. ssl_enabled = False
- 41.
- 42. # Nice socket globals import ripped from Minor Gordon's Yield.
- 43. if "__all__" in stdsocket.__dict__:
- 44. __all__ = stdsocket.__dict__["__all__"]
- 45. globals().update((key, value) for key, value in\
- 46. stdsocket.__dict__.iteritems() if key in __all__ or key == "EBADF")
- 47. else:
- 48. other_keys = ("error", "timeout", "getaddrinfo")
- 49. globals().update((key, value) for key, value in\
- 50. stdsocket.__dict__.iteritems() if key.upper() == key or key in\
- 51. other_keys)
- 52. _GLOBAL_DEFAULT_TIMEOUT = 0.1
- 53.
- 54. # simple decorator to run a function in a tasklet
- 55. #任务编排
- 56. def tasklet(task):
- 57. def run(*args, **kwargs):
- 58. stackless.tasklet(task)(*args, **kwargs)
- 59. return run
- 60.
- 61. # Event Loop Management
- 62. #事件循环管理
- 63. loop_running = False
- 64. sockets = WeakValueDictionary()
- 65.
- 66. def die():
- 67. global sockets
- 68. sockets = {}
- 69. sys.exit()
- 70.
- 71. #修饰符,意思是 tasklet(eventLoop)
- 72. @tasklet
- 73. def eventLoop():
- 74. global loop_running
- 75. global event_errors
- 76.
- 77. while sockets.values():
- 78. # If there are other tasklets scheduled:
- 79. # use the nonblocking loop
- 80. # else: use the blocking loop
- 81. if stackless.getruncount() > 2: # main tasklet + this one
- 82. event.loop(True)
- 83. else:
- 84. event.loop(False)
- 85. stackless.schedule()
- 86. loop_running = False
- 87.
- 88. def runEventLoop():
- 89. global loop_running
- 90. if not loop_running:
- 91. event.init()
- 92. event.signal(2, die)
- 93. event.signal(3, die)
- 94. eventLoop()
- 95. loop_running = True
- 96.
- 97.
- 98. # Replacement Socket Module Functions
- 99. def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
- 100. return evsocket(stdsocket.socket(family, type, proto))
- 101.
- 102. def create_connection(address, timeout=0.1):
- 103. s = socket()
- 104. s.connect(address, timeout)
- 105. return s
- 106.
- 107. def ssl(sock, keyfile=None, certfile=None):
- 108. if ssl_enabled:
- 109. return evsocketssl(sock, keyfile, certfile)
- 110. else:
- 111. raise RuntimeError(\
- 112. "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")
- 113.
- 114.
- 115. # Socket Proxy Class
- 116. class evsocket():
- 117. # XXX Not all socketobject methods are implemented!
- 118. # XXX Currently, the sockets are using the default, blocking mode.
- 119.
- 120. def __init__(self, sock):
- 121. self.sock = sock
- 122. self.accepting = False
- 123. self.connected = False
- 124. self.remote_addr = None
- 125. self.fileobject = None
- 126. self.read_channel = stackless.channel()
- 127. self.write_channel = stackless.channel()
- 128. self.accept_channel = None
- 129. global sockets
- 130. sockets[id(self)] = self
- 131. runEventLoop()
- 132.
- 133. def __getattr__(self, attr):
- 134. return getattr(self.sock, attr)
- 135.
- 136. def listen(self, backlog=255):
- 137. self.accepting = True
- 138. return self.sock.listen(backlog)
- 139.
- 140. def accept(self):
- 141. if not self.accept_channel:
- 142. self.accept_channel = stackless.channel()
- 143. event.event(self.handle_accept, handle=self.sock,
- 144. evtype=event.EV_READ | event.EV_PERSIST).add()
- 145. return self.accept_channel.receive()
- 146.
- 147. @tasklet
- 148. def handle_accept(self, ev, sock, event_type, *arg):
- 149. s, a = self.sock.accept()
- 150. s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
- 151. s = evsocket(s)
- 152. self.accept_channel.send((s,a))
- 153.
- 154. def connect(self, address, timeout=0.1):
- 155. endtime = time.time() + timeout
- 156. while time.time() < endtime:
- 157. if self.sock.connect_ex(address) == 0:
- 158. self.connected = True
- 159. self.remote_addr = address
- 160. return
- 161. if not self.connected:
- 162. # One last try, just to raise an error
- 163. return self.sock.connect(address)
- 164.
- 165. def send(self, data, *args):
- 166. event.write(self.sock, self.handle_send, data)
- 167. return self.write_channel.receive()
- 168.
- 169. @tasklet
- 170. def handle_send(self, data):
- 171. self.write_channel.send(self.sock.send(data))
- 172.
- 173. def sendall(self, data, *args):
- 174. while data:
- 175. try:
- 176. sent = self.send(data)
- 177. data = data[sent + 1:]
- 178. except:
- 179. raise
- 180. return None
- 181.
- 182. def recv(self, bytes, *args):
- 183. event.read(self.sock, self.handle_recv, bytes)
- 184. return self.read_channel.receive()
- 185.
- 186. @tasklet
- 187. def handle_recv(self, bytes):
- 188. self.read_channel.send(self.sock.recv(bytes))
- 189.
- 190. def recvfrom(self, bytes, *args):
- 191. event.read(self.sock, self.handle_recv, bytes)
- 192. return self.read_channel.receive()
- 193.
- 194. @tasklet
- 195. def handle_recvfrom(self, bytes):
- 196. self.read_channel.send(self.sock.recvfrom(bytes))
- 197.
- 198. def makefile(self, mode='r', bufsize=-1):
- 199. self.fileobject = stdsocket._fileobject(self, mode, bufsize)
- 200. return self.fileobject
- 201.
- 202. def close(self):
- 203. # XXX Stupid workaround
- 204. # Don't close while the fileobject is still using the fakesocket
- 205. def _close():
- 206. while self.fileobject._sock == self:
- 207. stackless.schedule()
- 208. self._sock.close()
- 209. del sockets[id(self)]
- 210. if self.fileobject:
- 211. stackless.tasklet(_close)()
- 212.
- 213.
- 214. # SSL Proxy Class
- 215. class evsocketssl(evsocket):
- 216. def __init__(self, sock, keyfile=None, certfile=None):
- 217. if certfile:
- 218. server_side = True
- 219. else:
- 220. server_side = False
- 221.
- 222. # XXX This currently performs a BLOCKING handshake operation
- 223. # TODO Implement a non-blocking handshake
- 224. self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)
- 225.
- 226. @tasklet
- 227. def handle_accept(self, ev, sock, event_type, *arg):
- 228. s, a = self.sock.accept()
- 229. s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
- 230. s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)
- 231. s = evsocketssl(s)
- 232. self.accept_channel.send((s,a))
- 233.
- 234.
- 235. if __name__ == "__main__":
- 236. sys.modules["socket"] = __import__(__name__)
- 237.
- 238. # Minimal Client Test
- 239. # TODO: Add a Minimal Server Test
- 240.
- 241. import urllib2
- 242.
- 243. @tasklet
- 244. def test(i):
- 245. print "url read", i
- 246. print urllib2.urlopen("http://www.google.com").read(12)
- 247.
- 248. for i in range(5):
- 249. test(i)
- 250.
- 251. stackless.run()
复制代码 不过对于stacklesssocket.py还是看不太懂~ |
|