1. 程式人生 > 實用技巧 >協程巢狀協程

協程巢狀協程

import asyncio
import functools

# 第三層協程
async def test1():
    print('我是test1')
    await asyncio.sleep(1)
    print('test1已經睡了1秒')
    await asyncio.sleep(3)
    print('test1又睡了3秒')
    return 'xiong'

# 第三層協程
async def test2():
    print('我是test2')
    await asyncio.sleep(2)
    print('test2已經睡了2秒
') await asyncio.sleep(2) print('test2又睡了2秒') return 'fan' # 第三層協程 async def test3(): print('我是test3') await asyncio.sleep(3) print('test3已經睡了3秒') await asyncio.sleep(1) print('test3又睡了1秒') return 'yong' # 第三層協程 async def test4(): print('我是test4') await asyncio.sleep(
4) print('test4已經睡了4秒') await asyncio.sleep(1) print('test4又睡了1秒') return 'fan' # 協程回撥函式 def callback_test1(name, future): print(future.result()[0]+name) # 第二層協程 async def main(): tasks = [ asyncio.ensure_future(test1()), asyncio.ensure_future(test2()), asyncio.ensure_future(test3()), ] res
= await asyncio.gather(*tasks) return res # 第二層協程 async def main1(): res = await test4() return res # 最外層協程(主入口) async def z_main(): tasks = [ asyncio.ensure_future(main()), asyncio.ensure_future(main1()) ] # 用了偏函式傳其他引數 tasks[0].add_done_callback(functools.partial(callback_test1, 'fanyong')) res1 = await asyncio.gather(*tasks) return res1 if __name__ == '__main__': # tasks = [ # asyncio.ensure_future(test1()), # asyncio.ensure_future(test2()), # asyncio.ensure_future(test3()), # ] # tasks[0].add_done_callback(functools.partial(callback_test1, 'fanyong')) loop = asyncio.get_event_loop() res2 = loop.run_until_complete(z_main()) print(res2)