[Python] 協程學習過程
開始
之前一直在做那個rProxy
的專案,後來發現,服務端不用協程或者非同步程式設計這樣的手段是不行的,最主要的問題就是對於每個http請求都對應一個執行緒,這個開銷非常大。對於一個網頁而言,四五十個http請求已經是非常常見的事情了,如果有很多個客戶端,一下子執行緒數可能得有幾百個。而Python的多執行緒眾所周知的虛假。
所以我就會考慮使用非同步套接字來做。
在非同步套接字中會需要一個主迴圈用來管理套接字,手動管理比較麻煩,後來瞭解到了協程。發現這個就是非常適合做這個事情的啊。
而Python的庫就是asyncio
官方文件在這裡
不過後面又發現了一個更神奇的東西,就是gevent
這個庫,專案在這裡
<GitHub - gevent/gevent: Coroutine-based concurrency library for Python>
pip不能安裝的話whl包
在這裡Python Extension Packages for Windows - Christoph Gohlke (uci.edu)
協程
執行緒的話,已經很熟悉了。對於需要大量IO操作的程式,基本上首選多執行緒。但是執行緒也有一些缺點,例如說需要考慮執行緒安全問題,執行緒間切換也會損耗效能。
非常常見的現象就是,使用Python多執行緒寫爬蟲我的電腦大約80執行緒就已經是極限了,再增加執行緒也不會有什麼速度上的加快了。
此時,協程就出現了
一個執行緒可以包含很多的協程,而協程之間的切換不需要經過系統,不需要進行核心態的切換。還有各種複雜的儲存現場。
學習過程
這裡就記錄一下對協程的學習過程,主要是兩種。
一個是以asyncio
為主的協程
另一個是以第三方庫gevent
為主的協程
## asyncio
asyncio
是最先了解到的技術,可以用於實現協程。一個簡單的Demo如下。
import asyncio as ayc i = 0 async def p(c): global i while True: print(f"t{c}: {i}") i+=1 await ayc.sleep(0) async def fun(): t1 = ayc.create_task(p(1)) t2 = ayc.create_task(p(2)) await t1 await t2 def main(): ayc.run(fun()) if __name__ == "__main__": main()
這個例子只是對一個公共變數進行疊加,結果
執行之後能夠看到數字遞增,而且非常有序,t1,t2兩個協程分工交替出現
而多執行緒不加鎖的情況下,很容易出現以下的情況。也就是出現t1 t2時間分配不均勻的情況,並且執行緒不安全 89914
出現在了89916
後面
多執行緒程式碼如下
import threading
i = 0
def fun(c):
global i
while True:
print(f"t{c}: {i}")
i+=1
def main():
threading.Thread(target=fun,args=(1,)).start()
threading.Thread(target=fun,args=(2,)).start()
if __name__ == "__main__":
main()
目前到這裡就沒有問題,但是等我實際寫程式碼的時候,發現一個問題。
也就是,如果一個IO操作阻塞,則整個執行緒卡死,導致協程不能正常切換。例如下面的程式碼
import requests
import asyncio
import time
urls = ["https://www.baidu.com","https://www.cnblogs.com/","https://blog.csdn.net/"]
async def getHtml(url):
stime = time.time()
r = requests.get(url)
print(f"{url}, {r.status_code}, {time.time() - stime}")
async def main():
tasks = []
for url in urls:
tasks.append(asyncio.create_task(getHtml(url)))
stime = time.time()
await asyncio.wait(tasks)
print(f"Done, {time.time() - stime}")
if __name__ == "__main__":
asyncio.run(main())
程式碼作用是分別請求三個域名,記錄響應時間以及總時間
結果如下,注意,此時雖然用了協程,但是總時間卻是三次請求時間之和。
原因就是,直接使用asyncio
實現協程並不會監聽IO阻塞情況,也就是在requests.get()
的時候,協程沒有切換。導致整個執行緒阻塞。
所以實際的執行流依舊是序列執行,那麼協程就毫無意義。
aiohttp
而這個問題存在的原因就在於,requests
這個庫是同步庫
,底層是同步socket
。對應的,另一個名為 aiohttp
的第三方庫是基於asyncio
開發的http庫可以解決這個問題。
aiohttp
官方文件如下
<Welcome to AIOHTTP — aiohttp 3.7.3 documentation>
一個Demo如下
import aiohttp
import asyncio
import time
urls = ["https://www.baidu.com","https://www.cnblogs.com/","https://blog.csdn.net/"]
async def getHtml(url):
print(f"請求: {url}")
stime = time.time()
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(f"status: {response.status}, {time.time() - stime}")
# html = await response.text()
# print(f"Body: {html[:20]}")
async def main():
tasks = []
for url in urls:
tasks.append(asyncio.create_task(getHtml(url)))
await asyncio.wait(tasks)
print("Done")
asyncio.run(main())
最後結果如下
可以看到總花費時間近似於最長請求時間。也就是完成了非同步請求
那麼,到這裡似乎就已經圓滿了。但是,我還了解到了gevent
這個庫
gevent
直接貼程式碼
from gevent import monkey; monkey.patch_all()
import gevent
import requests
import asyncio
import time
urls = ["https://www.baidu.com","https://www.cnblogs.com/","https://blog.csdn.net/"]
def getHtml(url):
stime = time.time()
r = requests.get(url)
print(f"{url}, {r.status_code}, {time.time() - stime}")
def main():
tasks = []
for url in urls:
tasks.append(gevent.spawn(getHtml,url))
stime = time.time()
gevent.joinall(tasks)
print(f"Done, {time.time() - stime}")
if __name__ == "__main__":
main()
結果如下
看到上面的結果和使用aiohttp
庫一樣,並且依舊是用了requests
這個庫,而前面說過,requests是同步庫。。。 是不是很神奇!是的,給我驚訝到了。
然後原理也給瞭解了一下, <關於 python gevent 架框 作為 TCP伺服器 的 程式碼問題 , 每個 socket 的 訊息 接收 是否有使用 事件監聽回撥的方法呢? - 知乎 (zhihu.com)>
總的來說,就是替換了Python自己的socket實現,把socket設定成了非同步。絕了~