1. 程式人生 > >Python 協程,gevent(yield阻塞,greenlet),協程實現多工(有規律的交替協作執行)

Python 協程,gevent(yield阻塞,greenlet),協程實現多工(有規律的交替協作執行)

實現多工:程序消耗的資源最大,執行緒消耗的資源次之,協程消耗的資源最少(單執行緒)。

gevent實現協程,gevent是通過阻塞程式碼(例如網路延遲等)來自動切換要執行的任務,所以在進行IO密集型程式時(例如爬蟲),使用gevent可以提高效率(有效利用網路延遲的時間去執行其他任務)。

 

GIL(全域性直譯器鎖)是C語言版本的Python直譯器中專有的,GIL的存在讓多執行緒的效率變低(哪個執行緒搶到鎖,就執行哪個執行緒)。在IO密集型程式中,多執行緒依然比單執行緒效率高(GIL通過IO阻塞自動切換多執行緒)。

解決GIL(全域性直譯器鎖)的問題的三種方法:1、不要用C語言版本的Python直譯器。2、讓子執行緒執行其他語言程式碼(例如:主執行緒執行Python程式碼,子執行緒執行C語言程式碼(C語言的動態庫))。3、多程序代替多執行緒(多程序可以利用多核CPU)。

 

demo.py(協程底層原理,yield):

import time


# 帶yield的函式並不是函式,而是一個生成器模板,返回一個生成器(可用於遍歷迭代)
def task_1():
    while True:
        time.sleep(0.1)
        yield  # 阻塞,等待next迭代解阻塞


def task_2():
    while True:
        time.sleep(0.1)
        yield


def main():
    t1 = task_1()  # 返回的t1是一個生成器。 此時並未執行task_1中的程式碼
    t2 = task_2()
    # 先讓t1執行一會,當t1中遇到yield的時候,再返回到24行,然後
    # 執行t2,當它遇到yield的時候,再次切換到t1中
    # 這樣t1/t2/t1/t2的交替執行,最終實現了多工....協程
    while True:
        next(t1)  # 執行task_1中的程式碼,遇到yield阻塞task_1。 等待下一次next啟用。
        next(t2)  # 實現task_1與task_2交替執行,實現多工...協程
    


if __name__ == "__main__":
    main()

demo.py(greenlet實現協程,封裝了yield):

from greenlet import greenlet  # 需要安裝greenlet模組  sudo pip3 install greenlet (python2.x使用pip) 
import time

def test1():
    while True:
        print("---A--")
        gr2.switch()   # 切換到gr2中的任務。
        time.sleep(0.5)

def test2():
    while True:
        print("---B--")
        gr1.switch()   # 切換到gr1中的任務。
        time.sleep(0.5)


gr1 = greenlet(test1)  # greenlet 底層封裝了yield。
gr2 = greenlet(test2)

#切換到gr1中執行
gr1.switch()

demo.py(gevent實現協程,封裝了greenlet,遇到阻塞程式碼自動切換協程任務):

import gevent  # 需要安裝gevent模組  sudo pip3 install gevent (python2.x使用pip) 
import time


def f1(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        gevent.sleep(0.5)   # 為了提高協程效率,遇到阻塞類程式碼,會自動切換協程任務。
        # time.sleep(0.5)   # 阻塞類程式碼必須使用gevent自己包裝的程式碼,原生阻塞類程式碼不會切換協程任務。 
                            # 可以使用monkey.patch_all()將所有原生阻塞類程式碼替換成gevent包裝的阻塞類程式碼。 

def f2(n):
    for i in range(n):
        print(gevent.getcurrent(), i)  # <Greenlet "Greenlet-0" at 0x7f4a09b34648: f1(5)> 0
        gevent.sleep(0.5)
        # time.sleep(0.5)

def f3(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        gevent.sleep(0.5)
        # time.sleep(0.5)


g1 = gevent.spawn(f1, 5)  # gevent其實是對greenlet的封裝。
g2 = gevent.spawn(f2, 5)  # 第一個引數f2表示協程執行的具體任務(函式),第二個引數5表示要傳給f2的引數
g3 = gevent.spawn(f3, 5)
g1.join()   # 遇到阻塞類程式碼,自動切換協程任務。
g2.join()
g3.join()

demo.py(gevent打補丁,monkey自動替換原生阻塞類程式碼。重要,常用):

import gevent  # 需要安裝gevent模組  sudo pip3 install gevent (python2.x使用pip)
import time
from gevent import monkey

# gevent打補丁
monkey.patch_all()  # 將所有原生阻塞類程式碼自動替換成gevent包裝的阻塞類程式碼。 


def f1(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)  # 會自動替換成 gevent.sleep(0.5)

def f2(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)

def f3(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)


# g1 = gevent.spawn(f1, 5)
# g2 = gevent.spawn(f2, 5)
# g3 = gevent.spawn(f3, 5)
# g1.join()
# g2.join()
# g3.join()

# 一種簡便寫法
gevent.joinall([
        gevent.spawn(f1, 5),
        gevent.spawn(f2, 5),
        gevent.spawn(f3, 5)
])

demo.py(gevent底層原理):

import socket
import time

tcp_server_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server_tcp.bind(("", 7899))
tcp_server_tcp.listen(128)
tcp_server_tcp.setblocking(False)  # 設定套接字為非堵塞的方式。 (接收資料時如果沒有接到資料(阻塞)那麼就拋異常,否則正常接收資料。)

client_socket_list = list()  # 用於儲存與客戶端連線的套接字。

while True:

    # time.sleep(0.5)

    try:
        new_socket, new_addr = tcp_server_tcp.accept()  # 用拋異常的方式代替阻塞。
    except Exception as ret:
        print("---沒有新的客戶端到來---")
    else:
        print("---只要沒有產生異常,那麼也就意味著 來了一個新的客戶端----")
        new_socket.setblocking(False)  # 設定套接字為非堵塞的方式。 (如果需要阻塞就直接拋異常代替阻塞)
        client_socket_list.append(new_socket)
        
    for client_socket in client_socket_list:
        try:
            recv_data = client_socket.recv(1024)    # 用拋異常的方式代替阻塞。
        except Exception as ret:
            print("----這個客戶端還沒有傳送過來資料----")
        else:
            if recv_data:
                # 對方傳送過來資料
                print("----客戶端傳送過來了資料-----")
            else:
                # 對方呼叫close 導致了 recv返回
                client_socket.close()
                client_socket_list.remove(client_socket)
                print("---客戶端已經關閉----")