我用的是stackless模块
我有两个无限循环
一个线程是无限循环检测一个值的变化
另外一个是接受udp包
设置两个通道
one=stackless.channel()
two=stackless.channel()
def check():
two.recevie()
while 1:
if 检测值 a=1:
dosomething
else:
one.send(’开始收包’)
def udp():
one.receive()
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((loaclhost_ip,port))
while 1:
d,a=s.recvfrom(1024)
two.send(’返回check’)
开启线程
stackless.tasklet(check)()
stackless.tasklet(udp)()
stackless.tasklet(two.send)(’开始检测’)
stackless.run()
如果我收不到数据,我永远无法跳回到check()检测值a了~
我要怎么做才能一边收包一边检测a的值呢??
网上查到一个博客
http://blog.filia.cn/2009/05/03/ ... age-1/#comment-4172
有人已经写了一个
socket 非阻塞库
用这个库实现了我的需求
stacklesssocket.py
用import stacklesssocket as socket
stacklesssocket.py代码
复制内容到剪贴板
代码:
1. #coding:gb2312
2. # socketlibevent.py - MIT License
3. # phoenix@burninglabs.com
4. #
5. # Non-blocking socket I/O for Stackless Python using libevent, via pyevent.
6. #
7. # Usage:
8. # import sys, socketlibevent; sys.modules['socket'] = socketlibevent
9. #
10. # Based on Richard Tew's stacklesssocket module.
11. # Uses Dug Song's pyevent.
12. #
13. # Thanks a Heap !
14.
15.
16. import stackless, sys, time, traceback
17. from weakref import WeakValueDictionary
18. import socket as stdsocket
19. from socket import _fileobject
20.
21. try:
22. import event
23. except:
24. try:
25. import rel; rel.override()
26. import event
27. except:
28. print "please install libevent and pyevent"
29. # http://code.google.com/p/pyevent/
30. print "(or 'stackless ez_setup.py rel' for quick testing)"
31. # http://code.google.com/p/registeredeventlistener/
32. sys.exit()
33.
34. # For SSL support, this module uses the 'ssl' module (built in from 2.6 up):
35. # ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)
36. try:
37. import ssl as ssl_
38. ssl_enabled = True
39. except:
40. ssl_enabled = False
41.
42. # Nice socket globals import ripped from Minor Gordon's Yield.
43. if "__all__" in stdsocket.__dict__:
44. __all__ = stdsocket.__dict__["__all__"]
45. globals().update((key, value) for key, value in\
46. stdsocket.__dict__.iteritems() if key in __all__ or key == "EBADF")
47. else:
48. other_keys = ("error", "timeout", "getaddrinfo")
49. globals().update((key, value) for key, value in\
50. stdsocket.__dict__.iteritems() if key.upper() == key or key in\
51. other_keys)
52. _GLOBAL_DEFAULT_TIMEOUT = 0.1
53.
54. # simple decorator to run a function in a tasklet
55. #任务编排
56. def tasklet(task):
57. def run(*args, **kwargs):
58. stackless.tasklet(task)(*args, **kwargs)
59. return run
60.
61. # Event Loop Management
62. #事件循环管理
63. loop_running = False
64. sockets = WeakValueDictionary()
65.
66. def die():
67. global sockets
68. sockets = {}
69. sys.exit()
70.
71. #修饰符,意思是 tasklet(eventLoop)
72. @tasklet
73. def eventLoop():
74. global loop_running
75. global event_errors
76.
77. while sockets.values():
78. # If there are other tasklets scheduled:
79. # use the nonblocking loop
80. # else: use the blocking loop
81. if stackless.getruncount() > 2: # main tasklet + this one
82. event.loop(True)
83. else:
84. event.loop(False)
85. stackless.schedule()
86. loop_running = False
87.
88. def runEventLoop():
89. global loop_running
90. if not loop_running:
91. event.init()
92. event.signal(2, die)
93. event.signal(3, die)
94. eventLoop()
95. loop_running = True
96.
97.
98. # Replacement Socket Module Functions
99. def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
100. return evsocket(stdsocket.socket(family, type, proto))
101.
102. def create_connection(address, timeout=0.1):
103. s = socket()
104. s.connect(address, timeout)
105. return s
106.
107. def ssl(sock, keyfile=None, certfile=None):
108. if ssl_enabled:
109. return evsocketssl(sock, keyfile, certfile)
110. else:
111. raise RuntimeError(\
112. "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")
113.
114.
115. # Socket Proxy Class
116. class evsocket():
117. # XXX Not all socketobject methods are implemented!
118. # XXX Currently, the sockets are using the default, blocking mode.
119.
120. def __init__(self, sock):
121. self.sock = sock
122. self.accepting = False
123. self.connected = False
124. self.remote_addr = None
125. self.fileobject = None
126. self.read_channel = stackless.channel()
127. self.write_channel = stackless.channel()
128. self.accept_channel = None
129. global sockets
130. sockets[id(self)] = self
131. runEventLoop()
132.
133. def __getattr__(self, attr):
134. return getattr(self.sock, attr)
135.
136. def listen(self, backlog=255):
137. self.accepting = True
138. return self.sock.listen(backlog)
139.
140. def accept(self):
141. if not self.accept_channel:
142. self.accept_channel = stackless.channel()
143. event.event(self.handle_accept, handle=self.sock,
144. evtype=event.EV_READ | event.EV_PERSIST).add()
145. return self.accept_channel.receive()
146.
147. @tasklet
148. def handle_accept(self, ev, sock, event_type, *arg):
149. s, a = self.sock.accept()
150. s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
151. s = evsocket(s)
152. self.accept_channel.send((s,a))
153.
154. def connect(self, address, timeout=0.1):
155. endtime = time.time() + timeout
156. while time.time() < endtime:
157. if self.sock.connect_ex(address) == 0:
158. self.connected = True
159. self.remote_addr = address
160. return
161. if not self.connected:
162. # One last try, just to raise an error
163. return self.sock.connect(address)
164.
165. def send(self, data, *args):
166. event.write(self.sock, self.handle_send, data)
167. return self.write_channel.receive()
168.
169. @tasklet
170. def handle_send(self, data):
171. self.write_channel.send(self.sock.send(data))
172.
173. def sendall(self, data, *args):
174. while data:
175. try:
176. sent = self.send(data)
177. data = data[sent + 1:]
178. except:
179. raise
180. return None
181.
182. def recv(self, bytes, *args):
183. event.read(self.sock, self.handle_recv, bytes)
184. return self.read_channel.receive()
185.
186. @tasklet
187. def handle_recv(self, bytes):
188. self.read_channel.send(self.sock.recv(bytes))
189.
190. def recvfrom(self, bytes, *args):
191. event.read(self.sock, self.handle_recv, bytes)
192. return self.read_channel.receive()
193.
194. @tasklet
195. def handle_recvfrom(self, bytes):
196. self.read_channel.send(self.sock.recvfrom(bytes))
197.
198. def makefile(self, mode='r', bufsize=-1):
199. self.fileobject = stdsocket._fileobject(self, mode, bufsize)
200. return self.fileobject
201.
202. def close(self):
203. # XXX Stupid workaround
204. # Don't close while the fileobject is still using the fakesocket
205. def _close():
206. while self.fileobject._sock == self:
207. stackless.schedule()
208. self._sock.close()
209. del sockets[id(self)]
210. if self.fileobject:
211. stackless.tasklet(_close)()
212.
213.
214. # SSL Proxy Class
215. class evsocketssl(evsocket):
216. def __init__(self, sock, keyfile=None, certfile=None):
217. if certfile:
218. server_side = True
219. else:
220. server_side = False
221.
222. # XXX This currently performs a BLOCKING handshake operation
223. # TODO Implement a non-blocking handshake
224. self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)
225.
226. @tasklet
227. def handle_accept(self, ev, sock, event_type, *arg):
228. s, a = self.sock.accept()
229. s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
230. s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)
231. s = evsocketssl(s)
232. self.accept_channel.send((s,a))
233.
234.
235. if __name__ == "__main__":
236. sys.modules["socket"] = __import__(__name__)
237.
238. # Minimal Client Test
239. # TODO: Add a Minimal Server Test
240.
241. import urllib2
242.
243. @tasklet
244. def test(i):
245. print "url read", i
246. print urllib2.urlopen("http://www.google.com").read(12)
247.
248. for i in range(5):
249. test(i)
250.
251. stackless.run() 不过对于stacklesssocket.py还是看不太懂~