1. 程式人生 > asyncio系列之Lock實現

asyncio系列之Lock實現

  1 import types
  2 import select
  3 import time
  4 import socket
  5 import functools
  6 import collections
  7 
  8 
  9 class Future:
 10     def __init__(self, *, loop=None):
 11         self._result = None
 12         self._callbacks = []
 13         self._loop = loop
 14 
 15     def set_result(self, result):
 16         self._result = result
 17         callbacks = self._callbacks[:]
 18         self._callbacks = []
 19         for callback in callbacks:
 20             self._loop._ready.append(callback)
 21 
 22     def add_callback(self, callback):
 23         self._callbacks.append(callback)
 24 
 25     def __iter__(self):
 26         print("掛起在yield處")
 27         yield self
 28         print("恢復執行")
 29         return "future"
 30 
 31     __await__ = __iter__
 32 
 33 
 34 class Task:
 35     def __init__(self, cor, *, loop=None):
 36         self.cor = cor
 37         self._loop = loop
 38 
 39     def _step(self):
 40         cor = self.cor
 41         try:
 42             result = cor.send(None)
 43         except StopIteration as e:
 44             self._loop._task_count -= 1
 45             if self._loop._task_count == 0:
 46                 self._loop.close()
 47         except Exception as e:
 48             pass
 49         else:
 50             if isinstance(result, Future):
 51                 result.add_callback(self._wakeup)
 52 
 53     def _wakeup(self):
 54         self._step()
 55 
 56 
 57 class Loop:
 58     def __init__(self):
 59         self._stop = False
 60         self._ready = []
 61         self._scheduled = []
 62         self._time = lambda: time.time()
 63         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 64         sock.setblocking(False)
 65         self._select = functools.partial(select.select, [sock], [], [])
 66         self._task_count = 0
 67 
 68     def create_task(self, cor):
 69         task = Task(cor, loop=self)
 70         self._ready.append(task._step)
 71         self._task_count += 1
 72         return task
 73 
 74     def call_later(self, delay, callback, *args):
 75         callback._when = delay
 76         self._scheduled.append((callback, *args))
 77 
 78     def run_until_complete(self, task):
 79         assert isinstance(task, Task)
 80         timeout = None
 81         while not self._stop:
 82             if self._ready:
 83                 timeout = 0
 84             if self._scheduled:
 85                 callback, *args = self._scheduled.pop()
 86                 timeout = callback._when
 87                 self._ready.append(functools.partial(callback, *args))
 88 
 89                 self._select(timeout)
 90             n = len(self._ready)
 91             for i in range(n):
 92                 step = self._ready.pop()
 93                 step()
 94 
 95     def close(self):
 96         self._stop = True
 97 
 98 
 99 @types.coroutine
100 def _sleep():
101     yield
102 
103 
async def sleep(s, result=None):
    """Suspend the caller for ``s`` seconds, then return ``result``.

    A non-positive ``s`` yields control once through ``_sleep()``.
    Otherwise a Future bound to the module-level ``loop`` is created and
    resolved by a ``call_later`` timer; ``loop`` must therefore exist as a
    global before this coroutine runs.
    """
    if s <= 0:
        await _sleep()
        return result

    timer = Future(loop=loop)
    timer._loop.call_later(s, unless_cancelled, timer)
    await timer
    return result
113 
114 
def unless_cancelled(future):
    """Timer callback: resolve ``future`` with ``None``."""
    no_value = None
    future.set_result(no_value)
117 
118 
class Lock:
    """Mutual-exclusion lock for coroutines.

    Coroutines that find the lock taken wait on a Future stored in
    ``_waiters``; ``release()`` resolves the future at the head of that
    queue so waiters are served in arrival order.
    """

    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._locked = False
        self._loop = loop

    def __repr__(self):
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<[{}]>'.format(extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @types.coroutine
    def acquire(self):
        """Acquire the lock, waiting if necessary, and return True."""
        # Take the fast path only when nobody is queued. A waiter that has
        # been woken by release() stays in the queue until it resumes; if a
        # newcomer could grab the lock in that window, both would end up
        # believing they hold it.
        if not self._locked and not self._waiters:
            self._locked = True
            return True

        fut = Future(loop=self._loop)
        self._waiters.append(fut)

        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except BaseException:
            # This waiter is giving up (error thrown in, generator closed).
            # If the lock is free, pass the wake-up on so the next waiter
            # is not left stranded.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release the lock and wake the longest-waiting coroutine, if any.

        Raises:
            RuntimeError: if the lock is not currently held.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Resolve the future of the waiter at the head of the queue."""
        if self._waiters:
            self._waiters[0].set_result(True)
167 
168 
async def foo(look):
    """Hold ``look`` for a one-second sleep, printing entry and exit times.

    ``look`` must provide an awaitable ``acquire()`` and a ``release()``.
    """
    await look.acquire()
    try:
        print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
        await sleep(1)
        print(f'exit foo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    finally:
        # Release even if the body fails so other waiters are not blocked.
        look.release()
175 
176 
async def goo(look):
    """Hold ``look`` for a one-second sleep, printing entry and exit times.

    ``look`` must provide an awaitable ``acquire()`` and a ``release()``.
    """
    await look.acquire()
    try:
        print(f'enter goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
        await sleep(1)
        print(f'exit goo at {time.strftime("%Y-%m-%d %H:%M:%S")}')
    finally:
        # Release even if the body fails so other waiters are not blocked.
        look.release()
183 
184 
if __name__ == '__main__':
    # ``loop`` has to be a module-level name: sleep() looks it up globally.
    loop = Loop()
    look = Lock(loop=loop)
    # Queue foo first, then goo, both contending for the same lock.
    task1, task2 = [loop.create_task(worker(look)) for worker in (foo, goo)]
    loop.run_until_complete(task1)

&n