asyncio系列之Lock實現
阿新 • • 發佈:2020-08-25
1 import types 2 import select 3 import time 4 import socket 5 import functools 6 import collections 7 8 9 class Future: 10 def __init__(self, *, loop=None): 11 self._result = None 12 self._callbacks = [] 13 self._loop = loop 14 15 def set_result(self, result): 16 self._result = result 17 callbacks = self._callbacks[:] 18 self._callbacks = [] 19 for callback in callbacks: 20 self._loop._ready.append(callback) 21 22 def add_callback(self, callback): 23 self._callbacks.append(callback) 24 25 def __iter__(self): 26 print("掛起在yield處") 27 yield self 28 print("恢復執行") 29 return "future" 30 31 __await__ = __iter__ 32 33 34 class Task: 35 def __init__(self, cor, *, loop=None): 36 self.cor = cor 37 self._loop = loop 38 39 def _step(self): 40 cor = self.cor 41 try: 42 result = cor.send(None) 43 except StopIteration as e: 44 self._loop._task_count -= 1 45 if self._loop._task_count == 0: 46 self._loop.close() 47 except Exception as e: 48 pass 49 else: 50 if isinstance(result, Future): 51 result.add_callback(self._wakeup) 52 53 def _wakeup(self): 54 self._step() 55 56 57 class Loop: 58 def __init__(self): 59 self._stop = False 60 self._ready = [] 61 self._scheduled = [] 62 self._time = lambda: time.time() 63 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 64 sock.setblocking(False) 65 self._select = functools.partial(select.select, [sock], [], []) 66 self._task_count = 0 67 68 def create_task(self, cor): 69 task = Task(cor, loop=self) 70 self._ready.append(task._step) 71 self._task_count += 1 72 return task 73 74 def call_later(self, delay, callback, *args): 75 callback._when = delay 76 self._scheduled.append((callback, *args)) 77 78 def run_until_complete(self, task): 79 assert isinstance(task, Task) 80 timeout = None 81 while not self._stop: 82 if self._ready: 83 timeout = 0 84 if self._scheduled: 85 callback, *args = self._scheduled.pop() 86 timeout = callback._when 87 self._ready.append(functools.partial(callback, *args)) 88 89 self._select(timeout) 90 n = len(self._ready) 91 for i in range(n): 92 step = self._ready.pop() 93 step() 94 95 def close(self): 96 self._stop = True 97 98 99 @types.coroutine 100 def _sleep(): 101 yield 102 103 104 async def sleep(s, result=None): 105 if s <= 0: 106 await _sleep() 107 return result 108 else: 109 future = Future(loop=loop) 110 future._loop.call_later(s, unless_cancelled, future) 111 await future 112 return result 113 114 115 def unless_cancelled(future): 116 future.set_result(None) 117 118 119 class Lock: 120 def __init__(self, *, loop=None): 121 self._waiters = collections.deque() 122 self._locked = False 123 self._loop = loop 124 125 def __repr__(self): 126 extra = 'locked' if self._locked else 'unlocked' 127 if self._waiters: 128 extra = '{},waiters:{}'.format(extra, len(self._waiters)) 129 return '<[{}]>'.format(extra) 130 131 def locked(self): 132 """Return True if lock is acquired.""" 133 return self._locked 134 135 @types.coroutine 136 def acquire(self): 137 if not self._locked: 138 self._locked = True 139 return True 140 141 fut = Future(loop=self._loop) 142 self._waiters.append(fut) 143 144 try: 145 yield from fut 146 finally: 147 self._waiters.remove(fut) 148 149 self._locked = True 150 return True 151 152 def release(self): 153 if self._locked: 154 self._locked = False 155 self._wake_up_first() 156 else: 157 raise RuntimeError('Lock is not acquired.') 158 159 def _wake_up_first(self): 160 """Wake up the first waiter if it isn't done.""" 161 try: 162 fut = next(iter(self._waiters)) 163 except StopIteration: 164 return 165 166 fut.set_result(True) 167 168 169 async def foo(look): 170 await look.acquire() 171 print(f'enter foo at {time.strftime("%Y-%m-%d %H:%M:%S")}') 172 await sleep(1) 173 print(f'exit foo at {time.strftime("%Y-%m-%d %H:%M:%S")}') 174 look.release() 175 176 177 async def goo(look): 178 await look.acquire() 179 print(f'enter goo at {time.strftime("%Y-%m-%d %H:%M:%S")}') 180 await sleep(1) 181 print(f'exit goo at {time.strftime("%Y-%m-%d %H:%M:%S")}') 182 look.release() 183 184 185 if __name__ == '__main__': 186 loop = Loop() 187 look = Lock(loop=loop) 188 f = foo(look) 189 g = goo(look) 190 task1 = loop.create_task(f) 191 task2 = loop.create_task(g) 192 loop.run_until_complete(task1)
&n