1. 程式人生 > >Python知識點-協程

Python知識點-協程

完成 線程並發 猴子補丁 無限 從服務器 開始 nal def accep

協程就是一個線程,只是說再一個線程上來回切換。

協程切換任務是靠代碼,遇到IO 操作就切換,而線程和進程是靠操作系統自動切換

1.greenlet

from greenlet import greenlet

def Producer():
    while True:
        print(我是生產者我會生產大肉包)
        time.sleep(1)
        c.switch()  # 切換到消費者
        print(生產結束)
def Consumer():
    while True:
        print(我是消費者我就會吃)
        time.sleep(
1) p.switch() # 切換到生產者 print(消費結束) # 將普通函數編程協程 c = greenlet(Consumer) p = greenlet(Producer) c.switch() #consumer先執行

2.gevent 只有協程遇到能識別的IO操作才切換(from gevent import monkey;monkey.patch_all())

# 將python標準庫中的一些阻塞操作變為非阻塞
from gevent import monkey;monkey.patch_all()
# 使用猴子補丁要寫在第一行
import
gevent def test1(): print("test1") gevent.sleep(1) # 模擬耗時操作 print("test11") def test2(): print("test2") gevent.sleep(1) # 模擬耗時操作 print("test22") g1 = gevent.spawn(test1) # 將函數封裝成協程,並啟動 g2 = gevent.spawn(test2) gevent.joinall([g1, g2])

greenlet 和gevent 區別在於一個是手動切換,一個是自動切換,gevent是在greenlet的基礎上實現的。

# 基於gevent的並發服務器實現
import gevent
# 將python內置的socket換成封裝了IO多路復用的socket
from  gevent import monkey;monkey.patch_all()
import socket

# 實例化socket
server = socket.socket()
# 綁定ip和端口
server.bind((0.0.0.0, 8000))
# 綁定監聽數量
server.listen(1000)

def worker(conn):

    while True:
        recv_data = conn.recv(1024)  # 等待接收數據
        if recv_data:
            print(recv_data)
            conn.send(recv_data))  # 將接收的數據原路返回
        else:
            conn.close()  # 發送完畢斷開
            break

while True:
    conn, addr = server.accept()  # 等待客戶端連接,遇到阻塞切換
    gevent.spawn(worker, conn)  # 生成協程,並將conn作為參數傳入

3.asyncio py3.4開始加入的內置標準庫 使用yield from 切換協程,遇到阻塞就切換,等阻塞結束後拿到返回值

import threading
import asyncio

@asyncio.coroutine
def hello():
    print(Hello world! (%s) % threading.currentThread())
    yield from asyncio.sleep(1) #模擬創建一個1秒後完成的協程
    print(Hello again! (%s) % threading.currentThread())

#獲取event_loop
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
#執行協程
loop.run_until_complete(asyncio.wait(tasks))
loop.close() #關閉事件循環

結果:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

由打印的當前線程名稱可以看出,兩個coroutine是由同一個線程並發執行的。

如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個線程並發執行。

我們用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁:

import asyncio

@asyncio.coroutine
def wget(host):
    print(wget %s... % host)
    connect = asyncio.open_connection(host, 80) #連接服務器,創建一個socket,返回兩個對應的流控制對象StreamReader、StreamWriter。
    reader, writer = yield from connect 
    header = GET / HTTP/1.0\r\nHost: %s\r\n\r\n % host
    #調用寫對象write函數來發送數據給服務器,但是這裏並沒有直正把數據發送出去,只是寫到內部緩沖區,
    writer.write(header.encode(utf-8))
    #調用writer.drain()函數就是等著socket把數據發送出去
    yield from writer.drain()
    while True:
        #接收數據的無限循環,從服務器接受數據,無數據接收到,就結束循環。
        line = yield from reader.readline()
        if line == b\r\n:
            break
        print(%s header > %s % (host, line.decode(utf-8).rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop() #創建 event_loop
tasks = [wget(host) for host in [www.sina.com.cn, www.sohu.com, www.163.com]]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

4.async/await 是新寫法,await替換yield from 和 async 替換@asyncio.coroutine,其他不變

import threading
import asyncio


async def hello():
    print(Hello world! (%s) % threading.currentThread())
    await asyncio.sleep(1) #模擬創建一個1秒後完成的協程
    print(Hello again! (%s) % threading.currentThread())

#獲取event_loop
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
#執行協程
loop.run_until_complete(asyncio.wait(tasks))
loop.close() #關閉事件循環

Python知識點-協程