Python asyncio
asyncio
https://docs.python.org/3.7/library/asyncio.html
An asynchronous programming library.
It supports concurrent execution.
It provides a set of high-level APIs.
asyncio is a library to write concurrent code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
asyncio is often a perfect fit for IO-bound and high-level structured network code.
asyncio provides a set of high-level APIs to:
run Python coroutines concurrently and have full control over their execution;
perform network IO and IPC;
control subprocesses;
distribute tasks via queues;
synchronize concurrent code;
https://pymotw.com/3/asyncio/index.html
Compared with threads and processes, coroutines run in a single process and a single thread, so they incur less system overhead.
The asyncio module provides tools for building concurrent applications using coroutines. While the threading module implements concurrency through application threads and multiprocessing implements concurrency using system processes, asyncio uses a single-threaded, single-process approach in which parts of an application cooperate to switch tasks explicitly at optimal times. Most often this context switching occurs when the program would otherwise block waiting to read or write data, but asyncio also includes support for scheduling code to run at a specific future time, to enable one coroutine to wait for another to complete, for handling system signals, and for recognizing other events that may be reasons for an application to change what it is working on.
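The explicit, cooperative switching described above can be sketched as follows. This is only an illustration: the names `worker` and `log` are hypothetical, and `asyncio.sleep(0)` is used as a stand-in for any point where a coroutine would wait.

```python
import asyncio
import threading


async def worker(name, steps, log):
    """Record one entry per step, yielding to the event loop after each."""
    for step in range(steps):
        log.append((name, step, threading.get_ident()))
        # sleep(0) is an explicit switch point: control goes back to the
        # event loop, which can then resume another ready coroutine.
        await asyncio.sleep(0)


async def main():
    log = []
    # Both workers run on the same thread and take turns at each await.
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log


log = asyncio.run(main())
for name, step, thread_id in log:
    print(name, step, thread_id)
```

The entries alternate between A and B, and every entry carries the same thread id: the switching happens inside one thread, only at the `await`.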
Coroutines -- sequential execution
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Tasks -- concurrent execution
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
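To see the difference between awaiting coroutines one by one and wrapping them in tasks, the elapsed time of each approach can be measured. A minimal sketch, with hypothetical helper names (`pause`, `run_sequentially`, `run_concurrently`) and short delays chosen only to keep the run quick:

```python
import asyncio
import time


async def pause(delay):
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays):
    """Await each coroutine in turn: total time is roughly the sum."""
    start = time.monotonic()
    for delay in delays:
        await pause(delay)
    return time.monotonic() - start


async def run_concurrently(delays):
    """Schedule all coroutines as tasks first: total time is roughly the max."""
    start = time.monotonic()
    tasks = [asyncio.create_task(pause(delay)) for delay in delays]
    for task in tasks:
        await task
    return time.monotonic() - start


async def main():
    delays = [0.1, 0.2]
    sequential = await run_sequentially(delays)
    concurrent = await run_concurrently(delays)
    return sequential, concurrent


sequential, concurrent = asyncio.run(main())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run should take about 0.3 seconds and the concurrent run about 0.2 seconds, since the tasks start running as soon as they are created.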
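API for managing concurrent tasks - gather
When many concurrent tasks are created, managing them with individual await statements becomes tedious.
In that case, the gather interface can be used.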
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
Run awaitable objects in the aws sequence concurrently.
If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.
If all awaitables are completed successfully, the result is an aggregate list of returned values. The order of result values corresponds to the order of awaitables in aws.
This interface accepts not only Tasks but also coroutines.
Awaitables
We say that an object is an awaitable object if it can be used in an await expression. Many asyncio APIs are designed to accept awaitables.
There are three main types of awaitable objects: coroutines, Tasks, and Futures.
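The three kinds of awaitable can be placed side by side in a small sketch. The coroutine `answer` is a hypothetical example, and the Future is created by hand only for illustration; application code rarely needs to create Futures directly.

```python
import asyncio


async def answer():
    return 42


async def main():
    loop = asyncio.get_running_loop()

    # 1. A coroutine object: produced by calling an async function.
    coro_result = await answer()

    # 2. A Task: a coroutine wrapped and scheduled on the event loop.
    task = asyncio.create_task(answer())
    task_result = await task

    # 3. A Future: a low-level placeholder whose result is set later.
    future = loop.create_future()
    loop.call_soon(future.set_result, 42)
    future_result = await future

    return coro_result, task_result, future_result


print(asyncio.run(main()))  # prints (42, 42, 42)
```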
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())
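The example above discards what gather returns. The following sketch shows the aggregate result list and the effect of `return_exceptions=True`; the coroutine `square` and its deliberate failure for the input 3 are hypothetical, made up for illustration.

```python
import asyncio


async def square(n):
    # Larger inputs finish sooner, so completion order differs
    # from the order in which the awaitables are passed to gather.
    await asyncio.sleep(0.01 * (5 - n))
    if n == 3:
        raise ValueError("three is not allowed")
    return n * n


async def main():
    # With return_exceptions=True an exception is returned as a result
    # in its slot instead of being raised out of gather().
    return await asyncio.gather(
        square(1), square(2), square(3), square(4),
        return_exceptions=True,
    )


results = asyncio.run(main())
for result in results:
    print(repr(result))
```

The results come back in argument order (1, 4, the ValueError instance, 16), regardless of which coroutine finished first.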
Streams
Streams are high-level async/await-ready primitives to work with network connections. Streams allow sending and receiving data without using callbacks or low-level protocols and transports.
https://docs.python.org/3.7/library/asyncio-stream.html
client
import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))
server
import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())
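The client and server above need two terminals. For a quick check, both ends can run in one event loop. This is a sketch under a few assumptions: the helper `echo_once` is hypothetical, port 0 is used so the operating system picks a free port, and the message is short enough to arrive in a single read.

```python
import asyncio


async def handle_echo(reader, writer):
    # Server side: read one chunk, send it straight back, then close.
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()


async def echo_once(message):
    # Port 0 asks the operating system for any free port.
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]

    async with server:
        # Client side: connect to the server started above.
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(message.encode())
        await writer.drain()
        data = await reader.read(100)
        writer.close()
        await writer.wait_closed()

    return data.decode()


print(asyncio.run(echo_once("Hello World!")))
```

Because both coroutines share the event loop, the server handler runs while the client is suspended in `reader.read()`.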
Coroutine synchronization
https://docs.python.org/3.7/library/asyncio-sync.html
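Coroutine synchronization resembles thread synchronization, but the primitives themselves are not thread-safe. They can only be used in coroutine code, that is, within a single process and a single thread.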
asyncio synchronization primitives are designed to be similar to those of the threading module with two important caveats:
asyncio primitives are not thread-safe, therefore they should not be used for OS thread synchronization (use threading for that);
methods of these synchronization primitives do not accept the timeout argument; use the asyncio.wait_for() function to perform operations with timeouts.
asyncio has the following basic synchronization primitives: Lock, Event, Condition, Semaphore, and BoundedSemaphore.
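As a sketch of two of these primitives, the code below protects a shared counter with `asyncio.Lock` and applies a timeout to `Event.wait()` through `asyncio.wait_for()`. The helpers `add_one` and `wait_with_timeout` and the dict-based counter are hypothetical, chosen only for illustration.

```python
import asyncio


async def add_one(counter, lock):
    # Holding the lock keeps the read / switch / write sequence from
    # interleaving with the other coroutines.
    async with lock:
        current = counter["value"]
        await asyncio.sleep(0)  # a switch point inside the critical section
        counter["value"] = current + 1


async def wait_with_timeout(event, timeout):
    # Event.wait() takes no timeout argument, so wrap it in wait_for().
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def main():
    counter = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(add_one(counter, lock) for _ in range(10)))

    event = asyncio.Event()  # never set, so the wait below times out
    signalled = await wait_with_timeout(event, 0.05)
    return counter["value"], signalled


print(asyncio.run(main()))  # prints (10, False)
```

Without the lock, every coroutine would read 0 before any of them wrote, and the counter would end at 1.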
Subprocess management
https://docs.python.org/3.7/library/asyncio-subprocess.html
Compared with the subprocess module, it offers better encapsulation.
This section describes high-level async/await asyncio APIs to create and manage subprocesses.
asyncio.subprocess
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

asyncio.run(run('ls /zzz'))
subprocess
import subprocess

try:
    completed = subprocess.run(
        'echo to stdout; echo to stderr 1>&2; exit 1',
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
except subprocess.CalledProcessError as err:
    # Not reached here: run() raises CalledProcessError only when
    # check=True is passed.
    print('ERROR:', err)
else:
    print('returncode:', completed.returncode)
    # Both streams were sent to DEVNULL, so these are None.
    print('stdout is {!r}'.format(completed.stdout))
    print('stderr is {!r}'.format(completed.stderr))
import asyncio
import sys

async def get_date():
    code = 'import datetime; print(datetime.datetime.now())'

    # Create the subprocess; redirect the standard output
    # into a pipe.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', code,
        stdout=asyncio.subprocess.PIPE)

    # Read one line of output.
    data = await proc.stdout.readline()
    line = data.decode('ascii').rstrip()

    # Wait for the subprocess exit.
    await proc.wait()
    return line

if sys.platform == "win32":
    asyncio.set_event_loop_policy(
        asyncio.WindowsProactorEventLoopPolicy())

date = asyncio.run(get_date())
print(f"Current date: {date}")
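Because subprocess creation and communication are awaitable, several child processes can be run concurrently with gather. A minimal sketch: the helper `run_python` and the three code snippets are hypothetical, and `sys.executable` is used so the example does not depend on any particular shell command being installed.

```python
import asyncio
import sys


async def run_python(code):
    """Run a Python snippet in a child process; return (exit code, stdout)."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code,
        stdout=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout.decode().strip()


async def main():
    snippets = [
        "print(1 + 1)",
        "print('hello')",
        "import sys; sys.exit(3)",  # exits with a non-zero status
    ]
    # All three children run at the same time; results keep input order.
    return await asyncio.gather(*(run_python(code) for code in snippets))


for returncode, output in asyncio.run(main()):
    print(returncode, repr(output))
```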
Coroutine communication -- queues
https://docs.python.org/3.7/library/asyncio-queue.html
asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.
Queues can be used to distribute workload between several concurrent tasks:
import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
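The example above fills the queue up front and cancels the workers at the end. Another common arrangement, sketched here with hypothetical `producer` and `consumer` coroutines, is a bounded queue plus a sentinel value: the producer is suspended while the queue is full, and `None` tells the consumer to stop.

```python
import asyncio


async def producer(queue, items):
    for item in items:
        # put() suspends while the queue is full, so the producer
        # cannot run far ahead of the consumer.
        await queue.put(item)
    await queue.put(None)  # sentinel: no more items


async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:
            break
        results.append(item * 2)


async def main():
    queue = asyncio.Queue(maxsize=2)
    results = []
    await asyncio.gather(
        producer(queue, range(5)),
        consumer(queue, results),
    )
    return results


print(asyncio.run(main()))  # prints [0, 2, 4, 6, 8]
```

With a single consumer one sentinel is enough; with several consumers, each one would need its own sentinel.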