1. 程式人生 > 實用技巧 >第十一章:Python高階程式設計-協程和非同步IO

第十一章:Python高階程式設計-協程和非同步IO

第十一章:Python高階程式設計-協程和非同步IO

Python3高階核心技術97講 筆記

目錄

  • 第十一章:Python高階程式設計-協程和非同步IO
    • 11.1 併發、並行、同步、非同步、阻塞、非阻塞
    • 11.2 C10K問題和IO多路複用(select、poll、epoll)
      • 11.2.1 C10K問題
      • 11.2.2 Unix下五種I/O模型
    • 11.3 select+回撥+事件迴圈
    • 11.4 回撥之痛
    • 11.5 什麼是協程
      • 11.5.1 C10M問題
      • 11.5.2 協程
    • 11.6 生成器進階-send、close和throw方法
    • 11.7生成器進階-yield from
    • 11.8 yield from how
    • 11.9 async和await
    • 11.10 生成器實現協程

11.1 併發、並行、同步、非同步、阻塞、非阻塞

併發

併發是指一個時間段內,有幾個程式在同一個CPU上執行,但是任意時刻只有一個程式在CPU上執行。

並行

並行是指任意時刻點上,有多個程式同時執行在多個CPU上。

同步

同步是指程式碼呼叫IO操作是,必須等待IO操作完成才返回的呼叫方式。

非同步

非同步是指程式碼呼叫IO操作是,不必等IO操作完成就返回的呼叫方式。

阻塞

阻塞是指呼叫函式時候當前執行緒被掛起。

非阻塞

阻塞是指呼叫函式時候當前執行緒不會被掛起,而是立即返回。

11.2 C10K問題和IO多路複用(select、poll、epoll)

11.2.1 C10K問題

如何在一顆1GHz CPU,2G記憶體,1gbps網路環境下,讓單臺伺服器同時為一萬個客戶端提供FTP服務。

11.2.2 Unix下五種I/O模型

阻塞式IO

非阻塞IO

IO複用

資訊驅動式IO

非同步IO(POSIX的aio_系列函式

select、poll、epoll

select、poll、epoll都是IO多路複用的機制。IO多路複用就是通過一種機制,一個程序可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。但是select、poll、epoll本質上都是同步IO,因為他們都需要在讀寫時間就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而非同步IO則無需自己負責進行讀寫,非同步IO的實現會負責把資料從核心拷貝到使用者空間。

select

select函式監視的檔案描述符分為3類,分別是writefds、readfds、和exceptfds。呼叫後select函式會阻塞,直到有描述符就緒(有資料可讀、可寫、或者有except),或者超時(timeout指定等待時間,如果立即返回設為null即可),函式返回。當select函式返回後,可以通過遍歷fdset,來找到就緒的描述符。

select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是他的一個優點。select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,可以通過修改巨集定義甚至重新編譯核心的方式提升這一限制,但是這樣也會造成效率的降低。

poll

不同於select使用三個點陣圖來表示三個fdset的方式,pollshiyongyigepollfd的指標實現。

pollfd結構包含了要監視的event和發生的event,不再使用select“引數-值”傳遞的方式。同時,pollfd並沒有最大數量限制(但是數量過大後效能也是會下降)。和select函式一樣,poll返回後,需要倫輪詢pollfd來獲取就緒的描述符

從上面看,select和poll都需要在返回後,通過遍歷檔案描述符來獲取已經就緒的socket。事實上,同時連線的大量客戶端在一時刻可能只有很少的處於就緒狀態,因此隨著監視的描述符數量的增長,其效率也會線性下降。

epoll

epoll是在2.6核心中提出的,是之前的select和poll的增強版本。相對於select和poll來說,epoll更加靈活,沒有描述符限制。epoll使用一個檔案描述符管理多個描述符,將使用者關係的檔案描述符的事件存放到核心的一個事件表中,這樣在使用者空間和核心空間的copy只需一次。

11.3 select+回撥+事件迴圈

#1. epoll並不代表一定比select好
# 在併發高的情況下,連線活躍度不是很高, epoll比select
# 併發性不高,同時連線很活躍, select比epoll好

#通過非阻塞io實現http請求

import socket
from urllib.parse import urlparse


#使用非阻塞io完成http請求

def get_url(url):
    #通過socket請求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    #建立socket連線
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host, 80)) #阻塞不會消耗cpu
    except BlockingIOError as e:
        pass

    #不停的詢問連線是否建立好, 需要while迴圈不停的去檢查狀態
    #做計算任務或者再次發起其他的連線請求

    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
            break
        except OSError as e:
            pass


    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    get_url("http://www.baidu.com")
Copy#1. epoll並不代表一定比select好
# 在併發高的情況下,連線活躍度不是很高, epoll比select
# 併發性不高,同時連線很活躍, select比epoll好

#通過非阻塞io實現http請求
# select + 回撥 + 事件迴圈
#  併發性高
# 使用單執行緒

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


selector = DefaultSelector()
#使用select完成http請求
urls = []
stop = False


class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket連線
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))  # 阻塞不會消耗cpu
        except BlockingIOError as e:
            pass

        #註冊
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    #事件迴圈,不停的請求socket的狀態並呼叫對應的回撥函式
    #1. select本身是不支援register模式
    #2. socket狀態變化以後的回撥是由程式設計師完成的
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
    #回撥+事件迴圈+select(poll\epoll)

if __name__ == "__main__":
    fetcher = Fetcher()
    import time
    start_time = time.time()
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time()-start_time)

# def get_url(url):
#     #通過socket請求html
#     url = urlparse(url)
#     host = url.netloc
#     path = url.path
#     if path == "":
#         path = "/"
# 
#     #建立socket連線
#     client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     client.setblocking(False)
#     try:
#         client.connect((host, 80)) #阻塞不會消耗cpu
#     except BlockingIOError as e:
#         pass
# 
#     #不停的詢問連線是否建立好, 需要while迴圈不停的去檢查狀態
#     #做計算任務或者再次發起其他的連線請求
# 
#     while True:
#         try:
#             client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
#             break
#         except OSError as e:
#             pass
# 
# 
#     data = b""
#     while True:
#         try:
#             d = client.recv(1024)
#         except BlockingIOError as e:
#             continue
#         if d:
#             data += d
#         else:
#             break
# 
#     data = data.decode("utf8")
#     html_data = data.split("\r\n\r\n")[1]
#     print(html_data)
#     client.close()

11.4 回撥之痛

如果回撥函式執行不正常該如何?

如果回撥裡面還要巢狀回撥該怎麼辦?要巢狀很多層怎麼辦?

如果嵌套了多層,其中某個環節出錯了會造成什麼後果?

如果有個資料需要被每個回撥都處理怎麼辦?

....

  1. 可讀性差
  2. 共享狀態管理困難
  3. 異常處理困難

11.5 什麼是協程

11.5.1 C10M問題

如何利用8核心CPU,64G記憶體,在10gbps的網路上保持1000萬併發連線

11.5.2 協程

# def get_url(url):
#     #do someting 1
#     html = get_html(url) #此處暫停,切換到另一個函式去執行
#     # #parse html
#     urls = parse_url(html)
#
# def get_url(url):
#     #do someting 1
#     html = get_html(url) #此處暫停,切換到另一個函式去執行
#     # #parse html
#     urls = parse_url(html)

#傳統函式呼叫 過程 A->B->C
#我們需要一個可以暫停的函式,並且可以在適當的時候恢復該函式的繼續執行
#出現了協程 -> 有多個入口的函式, 可以暫停的函式, 可以暫停的函式(可以向暫停的地方傳入值)

11.6 生成器進階-send、close和throw方法

def gen_func():
    #1. 可以產出值, 2. 可以接收值(呼叫方傳遞進來的值)
    html = yield "http://projectsedu.com"
    print(html)
    return "bobby"

#1. throw, close



#1. 生成器不只可以產出值,還可以接收值


if __name__ == "__main__":
    gen = gen_func()
    #在呼叫send傳送非none值之前,我們必須啟動一次生成器, 方式有兩種1. gen.send(None), 2. next(gen)
    url = gen.send(None)
    #download url
    html = "bobby"
    print(gen.send(html)) #send方法可以傳遞值進入生成器內部,同時還可以重啟生成器執行到下一個yield位置
    print(gen.send(html))
    #1.啟動生成器方式有兩種, next(), send

    # print(next(gen))
    # print(next(gen))
    # print(next(gen))
    # print(next(gen))
Copydef gen_func():
    #1. 可以產出值, 2. 可以接收值(呼叫方傳遞進來的值)
    try:
        yield "http://projectsedu.com"
    except BaseException:
        pass
    yield 2
    yield 3
    return "bobby"

if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    gen.close()
    print("bobby")

    #GeneratorExit是繼承自BaseException, Exception
Copydef gen_func():
    #1. 可以產出值, 2. 可以接收值(呼叫方傳遞進來的值)
    try:
        yield "http://projectsedu.com"
    except Exception as e:
        pass
    yield 2
    yield 3
    return "bobby"

if __name__ == "__main__":
    gen = gen_func()
    print(next(gen))
    gen.throw(Exception, "download error")
    print(next(gen))
    gen.throw(Exception, "download error")

11.7生成器進階-yield from

#python3.3新加了yield from語法
from itertools import chain

my_list = [1,2,3]
my_dict = {
    "bobby1":"http://projectsedu.com",
    "bobby2":"http://www.imooc.com",
}
#yield from iterable

# def g1(iterable):
#     yield iterable
#
# def g2(iterable):
#     yield from iterable
#
# for value in g1(range(10)):
#     print(value)
# for value in g2(range(10)):
#     print(value)


def my_chain(*args, **kwargs):
    for my_iterable in args:
        yield from my_iterable
        # for value in my_iterable:
        #     yield value

for value in my_chain(my_list, my_dict, range(5,10)):
    print(value)

def g1(gen):
    yield from gen

def main():
    g = g1()
    g.send(None)

#1. main 呼叫方 g1(委託生成器) gen 子生成器
#1. yield from會在呼叫方與子生成器之間建立一個雙向通道
Copyfinal_result = {}

# def middle(key):
#     while True:
#         final_result[key] = yield from sales_sum(key)
#         print(key+"銷量統計完成!!.")
#
# def main():
#     data_sets = {
#         "bobby牌面膜": [1200, 1500, 3000],
#         "bobby牌手機": [28,55,98,108 ],
#         "bobby牌大衣": [280,560,778,70],
#     }
#     for key, data_set in data_sets.items():
#         print("start key:", key)
#         m = middle(key)
#         m.send(None) # 預激middle協程
#         for value in data_set:
#             m.send(value)   # 給協程傳遞每一組的值  # 傳送到字生成器裡
#         m.send(None)
#     print("final_result:", final_result)
#
# if __name__ == '__main__':
#     main()

def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name+"銷量: ", x)
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums

if __name__ == "__main__":
    my_gen = sales_sum("bobby牌手機")
    my_gen.send(None)
    my_gen.send(1200)
    my_gen.send(1500)
    my_gen.send(3000)
    try:
        my_gen.send(None)
    except StopIteration as e:
        result = e.value
        print(result)

11.8 yield from how

#pep380

#1. RESULT = yield from EXPR可以簡化成下面這樣
#一些說明
"""
_i:子生成器,同時也是一個迭代器
_y:子生成器生產的值
_r:yield from 表示式最終的值
_s:呼叫方通過send()傳送的值
_e:異常物件

"""

_i = iter(EXPR)      # EXPR是一個可迭代物件,_i其實是子生成器;
try:
    _y = next(_i)   # 預激子生成器,把產出的第一個值存在_y中;
except StopIteration as _e:
    _r = _e.value   # 如果丟擲了`StopIteration`異常,那麼就將異常物件的`value`屬性儲存到_r,這是最簡單的情況的返回值;
else:
    while 1:    # 嘗試執行這個迴圈,委託生成器會阻塞;
        _s = yield _y   # 生產子生成器的值,等待呼叫方`send()`值,傳送過來的值將儲存在_s中;
        try:
            _y = _i.send(_s)    # 轉發_s,並且嘗試向下執行;
        except StopIteration as _e:
            _r = _e.value       # 如果子生成器丟擲異常,那麼就獲取異常物件的`value`屬性存到_r,退出迴圈,恢復委託生成器的執行;
            break
RESULT = _r     # _r就是整個yield from表示式返回的值。

"""
1. 子生成器可能只是一個迭代器,並不是一個作為協程的生成器,所以它不支援.throw()和.close()方法;
2. 如果子生成器支援.throw()和.close()方法,但是在子生成器內部,這兩個方法都會丟擲異常;
3. 呼叫方讓子生成器自己丟擲異常
4. 當呼叫方使用next()或者.send(None)時,都要在子生成器上呼叫next()函式,當呼叫方使用.send()傳送非 None 值時,才呼叫子生成器的.send()方法;
"""
_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r

"""
看完程式碼,我們總結一下關鍵點:

1. 子生成器生產的值,都是直接傳給呼叫方的;呼叫方通過.send()傳送的值都是直接傳遞給子生成器的;如果傳送的是 None,會呼叫子生成器的__next__()方法,如果不是 None,會呼叫子生成器的.send()方法;
2. 子生成器退出的時候,最後的return EXPR,會觸發一個StopIteration(EXPR)異常;
3. yield from表示式的值,是子生成器終止時,傳遞給StopIteration異常的第一個引數;
4. 如果呼叫的時候出現StopIteration異常,委託生成器會恢復執行,同時其他的異常會向上 "冒泡";
5. 傳入委託生成器的異常裡,除了GeneratorExit之外,其他的所有異常全部傳遞給子生成器的.throw()方法;如果呼叫.throw()的時候出現了StopIteration異常,那麼就恢復委託生成器的執行,其他的異常全部向上 "冒泡";
6. 如果在委託生成器上呼叫.close()或傳入GeneratorExit異常,會呼叫子生成器的.close()方法,沒有的話就不呼叫。如果在呼叫.close()的時候丟擲了異常,那麼就向上 "冒泡",否則的話委託生成器會丟擲GeneratorExit異常。

"""

11.9 async和await

#python為了將語義變得更加明確,就引入了async和await關鍵詞用於定義原生的協程
# async def downloader(url):
#     return "bobby"
import types

@types.coroutine
def downloader(url):
    yield "bobby"

async def download_url(url):
    #dosomethings
    html = await downloader(url)
    return html

if __name__ == "__main__":
    coro = download_url("http://www.imooc.com")
    # next(None)
    coro.send(None)

11.10 生成器實現協程

#生成器是可以暫停的函式
import inspect
# def gen_func():
#     value=yield from
#     #第一返回值給呼叫方, 第二呼叫方通過send方式返回值給gen
#     return "bobby"
#1. 用同步的方式編寫非同步的程式碼, 在適當的時候暫停函式並在適當的時候啟動函式
import socket
def get_socket_data():
    yield "bobby"

def downloader(url):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))  # 阻塞不會消耗cpu
    except BlockingIOError as e:
        pass

    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    source = yield from get_socket_data()
    data = source.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)

def download_html(html):
    html = yield from downloader()

if __name__ == "__main__":
    #協程的排程依然是 事件迴圈+協程模式 ,協程是單執行緒模式
    pass