Python資料抓取——多執行緒,非同步
作業系統可以同時執行多個任務。首先,考慮單核CPU是如何執行多工的:作業系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度非常快,給人的感覺就像所有任務都在同時執行一樣。真正的並行執行多工只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,作業系統也會自動把很多工輪流排程到每個核心上執行。
對於作業系統來說,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個記事本就啟動了一個記事本程序,開啟兩個記事本就啟動了兩個記事本程序,開啟一個Word就啟動了一個Word程序。有些程序還不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)
我們前面編寫的所有的Python程式,都是執行單任務的程序,也就是隻有一個執行緒。如果要同時執行多個任務有3種方案:一種是啟動多個程序,每個程序只開一個執行緒,但多個程序可以一塊執行多個任務。還有一種方法是啟動一個程序,在一個程序內啟動多個執行緒,多個執行緒也可以一塊執行多個任務。第三種方法,就是啟動多個程序,每個程序再啟動多個執行緒,這樣同時執行的任務就更多了,這種模型很複雜,實際很少採用
Python既支援多程序,又支援多執行緒。多工可以由多程序完成,也可以由一個程序內的多執行緒完成。程序是由若干執行緒組成的,一個程序至少有一個執行緒。由於執行緒是作業系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援,Python也不例外,並且,Python的執行緒是真正的Posix Thread,而不是模擬出來的執行緒。Python的標準庫提供了兩個模組:thread和threading,thread是低階模組,threading是高階模組,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。啟動一個執行緒就是把一個函式傳入並建立Thread例項,然後呼叫start()開始執行。
import requests
import threading
def get_stock(code):
url = 'http://hq.sinajs.cn/list=' + code
resp = requests.get(url)
print('%s\n' % resp.text)
#多執行緒非同步,加速抓取
#根據有幾個股票程式碼,就建立幾個執行緒
codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
threads = [threading.Thread(target=get_stock, args=(code, )) for code in codes]
#Thread建立執行緒例項
'''
threads=[ ]
for code in codes:
thread=threading.Thread(target=get_stock,args=(code, ))
threads.append(thread)
'''
for t in threads:
t.start() #啟動一個執行緒
for t in threads:
t.join() #等待每個執行緒執行結束
多工用執行緒池自動排程:
import requests
import threadpool #執行緒池
def get_stock(code):
url = 'http://hq.sinajs.cn/list=' + code
resp = requests.get(url)
print('%s\n' % resp.text)
codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
#codes裡任務很多,比如幾百個,讓pool自己去排程
pool = threadpool.ThreadPool(2) #執行緒池設定,最多同時跑兩個執行緒
tasks = threadpool.makeRequests(get_stock, codes)
#makeRequests構造執行緒task請求,第一個引數是執行緒函式,第二個是引數陣列
[pool.putRequest(task) for task in tasks]
#列表推導式,putRequest向執行緒池裡加task,讓pool自己去排程task
pool.wait() #等所有任務結束
非同步
交出當前CPU的控制權,最大化利用當前單個CPU的效率
import aiohttp #表示http請求是非同步方式去請求的
import asyncio #當非同步請求返回時,通知非同步操作完成
#非同步可以參考grequests庫的使用:https://github.com/kennethreitz/grequests
async def get_stock(code):
#關鍵字async表示請求是非同步的
url = 'http://hq.sinajs.cn/list=' + code
resp = await aiohttp.request('GET', url) # yield
#await表示任務等待時,不佔用CPU資源,通知請求返回
body = await resp.read()
#表示從網路上把請求的東西都讀回來
text = body.decode('gb2312') #對讀回來的原始位元組解碼
print(text)
resp.close()
codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
tasks = [get_stock(code) for code in codes]
#由於是非同步請求,這裡get_stock(code)並不會被馬上執行,只是佔用了一個位置
loop = asyncio.get_event_loop() #loop的作用是——做完任務,事件通知
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
#tasks生成一組併發的非同步任務,loop表示非同步作用完成後等待通知