发新话题
打印

stackless pyton微线程两个无限循环线程实现互相切换的问题。

stackless pyton微线程两个无限循环线程实现互相切换的问题。

我用的是stackless模块

我有两个无限循环

一个线程是无限循环检测一个值的变化
另外一个是接受udp包

设置两个通道
one=stackless.channel()
two=stackless.channel()

def check():
two.recevie()
while 1:
if 检测值 a=1:
dosomething
else:
one.send(’开始收包’)

def udp():

one.receive()
s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind((loaclhost_ip,port))
while 1:
d,a=s.recvfrom(1024)
two.send(’返回check’)

开启线程

stackless.tasklet(check)()
stackless.tasklet(udp)()
stackless.tasklet(two.send)(’开始检测’)
stackless.run()



如果我收不到数据,我永远无法跳回到check()检测值a了~

我要怎么做才能一边收包一边检测a的值呢??





网上查到一个博客

http://blog.filia.cn/2009/05/03/ ... age-1/#comment-4172



有人已经写了一个

socket 非阻塞库

用这个库实现了我的需求

stacklesssocket.py



用import stacklesssocket  as socket





stacklesssocket.py代码
复制内容到剪贴板
代码:
   1. #coding:gb2312  
   2. #  socketlibevent.py - MIT License  
   3. #  phoenix@burninglabs.com  
   4. #  
   5. #  Non-blocking socket I/O for Stackless Python using libevent, via pyevent.  
   6. #  
   7. #  Usage:  
   8. #      import sys, socketlibevent; sys.modules['socket'] = socketlibevent  
   9. #  
  10. #  Based on Richard Tew's stacklesssocket module.  
  11. #  Uses Dug Song's pyevent.  
  12. #  
  13. #  Thanks a Heap !  
  14.   
  15.   
  16. import stackless, sys, time, traceback  
  17. from weakref import WeakValueDictionary  
  18. import socket as stdsocket  
  19. from socket import _fileobject  
  20.   
  21. try:  
  22.     import event  
  23. except:  
  24.     try:  
  25.         import rel; rel.override()  
  26.         import event  
  27.     except:  
  28.         print "please install libevent and pyevent"  
  29.                 # http://code.google.com/p/pyevent/  
  30.         print "(or 'stackless ez_setup.py rel' for quick testing)"  
  31.                 # http://code.google.com/p/registeredeventlistener/  
  32.         sys.exit()  
  33.   
  34. # For SSL support, this module uses the 'ssl' module (built in from 2.6 up):  
  35. #               ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)  
  36. try:  
  37.     import ssl as ssl_  
  38.     ssl_enabled = True  
  39. except:  
  40.     ssl_enabled = False  
  41.   
  42. #  Nice socket globals import ripped from Minor Gordon's Yield.  
  43. if "__all__" in stdsocket.__dict__:  
  44.     __all__ = stdsocket.__dict__["__all__"]  
  45.     globals().update((key, value) for key, value in\  
  46.              stdsocket.__dict__.iteritems() if key in __all__ or key == "EBADF")  
  47. else:  
  48.     other_keys = ("error", "timeout", "getaddrinfo")  
  49.     globals().update((key, value) for key, value in\  
  50.         stdsocket.__dict__.iteritems() if key.upper() == key or key in\  
  51.                                                                      other_keys)  
  52. _GLOBAL_DEFAULT_TIMEOUT = 0.1  
  53.   
  54. # simple decorator to run a function in a tasklet  
  55. #任务编排  
  56. def tasklet(task):  
  57.     def run(*args, **kwargs):  
  58.         stackless.tasklet(task)(*args, **kwargs)  
  59.     return run  
  60.   
  61. # Event Loop Management  
  62. #事件循环管理  
  63. loop_running = False  
  64. sockets = WeakValueDictionary()  
  65.   
  66. def die():  
  67.     global sockets  
  68.     sockets = {}  
  69.     sys.exit()  
  70.   
  71. #修饰符,意思是 tasklet(eventLoop)  
  72. @tasklet  
  73. def eventLoop():  
  74.     global loop_running  
  75.     global event_errors  
  76.   
  77.     while sockets.values():  
  78.         # If there are other tasklets scheduled:  
  79.         #     use the nonblocking loop  
  80.         # else: use the blocking loop  
  81.         if stackless.getruncount() > 2: # main tasklet + this one  
  82.             event.loop(True)  
  83.         else:  
  84.             event.loop(False)  
  85.         stackless.schedule()  
  86.     loop_running = False  
  87.   
  88. def runEventLoop():  
  89.     global loop_running  
  90.     if not loop_running:  
  91.         event.init()  
  92.         event.signal(2, die)  
  93.         event.signal(3, die)  
  94.         eventLoop()  
  95.         loop_running = True  
  96.   
  97.   
  98. # Replacement Socket Module Functions  
  99. def socket(family=AF_INET, type=SOCK_STREAM, proto=0):  
100.     return evsocket(stdsocket.socket(family, type, proto))  
101.   
102. def create_connection(address, timeout=0.1):  
103.     s = socket()  
104.     s.connect(address, timeout)  
105.     return s  
106.   
107. def ssl(sock, keyfile=None, certfile=None):  
108.     if ssl_enabled:  
109.         return evsocketssl(sock, keyfile, certfile)  
110.     else:  
111.         raise RuntimeError(\  
112.             "SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")  
113.   
114.   
115. # Socket Proxy Class  
116. class evsocket():  
117.     # XXX Not all socketobject methods are implemented!  
118.     # XXX Currently, the sockets are using the default, blocking mode.  
119.   
120.     def __init__(self, sock):  
121.         self.sock = sock  
122.         self.accepting = False  
123.         self.connected = False  
124.         self.remote_addr = None  
125.         self.fileobject = None  
126.         self.read_channel = stackless.channel()  
127.         self.write_channel = stackless.channel()  
128.         self.accept_channel = None  
129.         global sockets  
130.         sockets[id(self)] = self  
131.         runEventLoop()  
132.   
133.     def __getattr__(self, attr):  
134.         return getattr(self.sock, attr)  
135.   
136.     def listen(self, backlog=255):  
137.         self.accepting = True  
138.         return self.sock.listen(backlog)  
139.   
140.     def accept(self):  
141.         if not self.accept_channel:  
142.             self.accept_channel = stackless.channel()  
143.             event.event(self.handle_accept, handle=self.sock,  
144.                         evtype=event.EV_READ | event.EV_PERSIST).add()  
145.         return self.accept_channel.receive()  
146.  
147.     @tasklet  
148.     def handle_accept(self, ev, sock, event_type, *arg):  
149.         s, a = self.sock.accept()  
150.         s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)  
151.         s = evsocket(s)  
152.         self.accept_channel.send((s,a))  
153.   
154.     def connect(self, address, timeout=0.1):  
155.         endtime = time.time() + timeout  
156.         while time.time() < endtime:  
157.             if self.sock.connect_ex(address) == 0:  
158.                 self.connected = True  
159.                 self.remote_addr = address  
160.                 return  
161.         if not self.connected:  
162.             # One last try, just to raise an error  
163.             return self.sock.connect(address)  
164.   
165.     def send(self, data, *args):  
166.         event.write(self.sock, self.handle_send, data)  
167.         return self.write_channel.receive()  
168.  
169.     @tasklet  
170.     def handle_send(self, data):  
171.         self.write_channel.send(self.sock.send(data))  
172.   
173.     def sendall(self, data, *args):  
174.         while data:  
175.             try:  
176.                 sent = self.send(data)  
177.                 data = data[sent + 1:]  
178.             except:  
179.                 raise  
180.         return None  
181.   
182.     def recv(self, bytes, *args):  
183.         event.read(self.sock, self.handle_recv, bytes)  
184.         return self.read_channel.receive()  
185.  
186.     @tasklet  
187.     def handle_recv(self, bytes):  
188.         self.read_channel.send(self.sock.recv(bytes))  
189.   
190.     def recvfrom(self, bytes, *args):  
191.         event.read(self.sock, self.handle_recv, bytes)  
192.         return self.read_channel.receive()  
193.  
194.     @tasklet  
195.     def handle_recvfrom(self, bytes):  
196.         self.read_channel.send(self.sock.recvfrom(bytes))  
197.   
198.     def makefile(self, mode='r', bufsize=-1):  
199.         self.fileobject = stdsocket._fileobject(self, mode, bufsize)  
200.         return self.fileobject  
201.   
202.     def close(self):  
203.         # XXX Stupid workaround  
204.         # Don't close while the fileobject is still using the fakesocket  
205.         def _close():  
206.             while self.fileobject._sock == self:  
207.                 stackless.schedule()  
208.             self._sock.close()  
209.             del sockets[id(self)]  
210.         if self.fileobject:  
211.             stackless.tasklet(_close)()  
212.   
213.   
214. # SSL Proxy Class  
215. class evsocketssl(evsocket):  
216.     def __init__(self, sock, keyfile=None, certfile=None):  
217.         if certfile:  
218.             server_side = True  
219.         else:  
220.             server_side = False  
221.   
222.         # XXX This currently performs a BLOCKING handshake operation  
223.         # TODO Implement a non-blocking handshake  
224.         self.sock = ssl_.wrap_socket(sock, keyfile, certfile, server_side)  
225.  
226.     @tasklet  
227.     def handle_accept(self, ev, sock, event_type, *arg):  
228.         s, a = self.sock.accept()  
229.         s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)  
230.         s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)  
231.         s = evsocketssl(s)  
232.         self.accept_channel.send((s,a))  
233.   
234.   
235. if __name__ == "__main__":  
236.     sys.modules["socket"] = __import__(__name__)  
237.   
238.     # Minimal Client Test  
239.     # TODO: Add a Minimal Server Test  
240.   
241.     import urllib2  
242.  
243.     @tasklet  
244.     def test(i):  
245.         print "url read", i  
246.         print urllib2.urlopen("http://www.google.com").read(12)  
247.   
248.     for i in range(5):  
249.         test(i)  
250.   
251.     stackless.run()  
不过对于stacklesssocket.py还是看不太懂~
让中国Python发展的更快 ,更好.

TOP

发新话题
最近访问的版块