1. 程式人生 > 實用技巧 >爬蟲10-1(協程)

爬蟲10-1(協程)

預先知道:

    1.併發:是指一個時間段中有幾個程式都處於已啟動執行到執行完畢之間,且這幾個程式都是在同一個處理機(CPU)上執行,
       但任一個時刻點上只有一個程式在處理機上執行。
2.並行:是指任何時間點,有多個程式執行在多個CPU上(最多和CPU數量一致) 3.同步:是指程式碼呼叫IO操作時,必須等待IO操作完成才能返回的呼叫方式。 4.非同步:是指程式碼呼叫IO操作時,不必等待IO操作完成就能返回的呼叫方式。 5.阻塞:是指呼叫函式的時候當前執行緒被掛起。 6.非阻塞:是指呼叫函式的時候當前執行緒不會被掛起,而是立即返回。

unix下的5大io型別

  1阻塞I式/O:系統呼叫不會立即返回結果,當前執行緒會阻塞,等到獲得結果或報錯時在返回(問題:如在呼叫send()的同時,執行緒將被阻塞,
在此期間,執行緒將無法執行任何運算或響應任何的網路請求。)   2非阻塞式I
/O:呼叫後立即返回結果(問題:不一定三次握手成功,recv() 會被迴圈呼叫,迴圈呼叫recv()將大幅度推高CPU 佔用率),
做計算任務或者再次發起其他連線就較有優勢   3I
/O複用:它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。
(阻塞式的方法,可以監聽多個socket狀態)(問題:將資料從核心複製到使用者空間的時間不能省)   5非同步I
/O:它就像是使用者程序將整個IO操作交給了他人(kernel)完成,然後他人做完後發訊號通知。在此期間,使用者程序不需要去檢查IO操作的狀態,
也不需要主動的去拷貝資料。

通過非阻塞io實現http請求:

import socket
from urllib.parse import urlparse

def get_url(url):
    #通過socket請求html
    url=urlparse(url)
    host=url.netloc
    path=url.path
    if path=="":
        path="/"
    #建立socket連線
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #設定成非阻塞(拋異常:BlockingIOError: [WinError 10035] 無法立即完成一個非阻止性套接字操作。) client.setblocking(False) try: client.connect((host,80)) except BlockingIOError as e: pass #向伺服器傳送資料(還未連線會拋異常) while True: try: client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8")) break except OSError as e: pass #將資料讀取完 data=b"" while True: try: d=client.recv(1024) except BlockingIOError as e: continue if d: data+=d else: break #會將header資訊作為返回字串 data=data.decode('utf8') print(data.split('\r\n\r\n')[1]) client.close() if __name__=='__main__': get_url('http://www.baidu.com')
View Code

通過select完成http請求(利用迴圈回撥):

優點:併發性高(驅動整個程式主要是回撥迴圈loop()函式實現,不會等待。沒有執行緒的切換,只有一個執行緒,黨一個URL連線建立完成後就會註冊,然後進入執行)

#自動根據環境選擇poll和epoll
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
selector=DefaultSelector()
urls=[]
#全域性變數
stop=False
class Fetcher:
    def connected(self, key):
        #取消註冊
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(),EVENT_READ,self.readable)

    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            # 會將header資訊作為返回字串
            data = self.data.decode('utf8')
            print(data.split('\r\n\r\n')[1])
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop=True

    def get_url(self,url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"
        # 建立socket連線
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError as e:
            pass

        #註冊寫事件,及回撥函式
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)

def loop():
    #回撥+事件迴圈+select(poll/epoll)
    #事件迴圈,不停的呼叫socket的狀態並呼叫對應的回撥函式
    #判斷哪個可讀可寫,select本身不支援register模式
    #socket狀態變化後的回撥使用程式設計師完成的
    if not stop:
        while True:
            ready=selector.select()
            for key,mask in ready:
                call_back=key.data
                call_back(key)


if __name__=='__main__':
    fetcher=Fetcher()
    fetcher.get_url('http://www.baidu.com')
    loop()
View Code

協程:不帶返回值的函式呼叫,是一個可以暫停的函式。

解決方案:採用同步的方式編寫非同步的程式碼;採用單執行緒去解決任務。

協程的排程:時間迴圈+協程模式

#生成器是可以暫停的函式
import inspect
# def gen_func():
#     value=yield from
#     #第一返回值給呼叫方, 第二呼叫方通過send方式返回值給gen
#     return "bobby"
#1. 用同步的方式編寫非同步的程式碼, 在適當的時候暫停函式並在適當的時候啟動函式
import socket
def get_socket_data():
    yield 1

def downloader(url):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))  # 阻塞不會消耗cpu
    except BlockingIOError as e:
        pass

    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    #如果get_socket_data()中出現異常,會直接拋給downloader(向上拋)
    source = yield from get_socket_data()
    data = source.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)

def download_html(html):
    html = yield from downloader()

if __name__ == "__main__":
    #協程的排程依然是 事件迴圈+協程模式 ,協程是單執行緒模式
    pass

參照連結:https://www.cnblogs.com/lyq-biu/p/10486148.html