1. 程式人生 > >python爬蟲之多執行緒、多程序+程式碼示例

python爬蟲之多執行緒、多程序+程式碼示例

#python爬蟲之多執行緒、多程序 >使用多程序、多執行緒編寫爬蟲的程式碼能有效的提高爬蟲爬取目標網站的效率。 ## 一、什麼是程序和執行緒 引用[廖雪峰的官方網站](https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376)關於程序和執行緒的講解: 程序:對於作業系統來說,一個任務就是一個程序(Process),比如開啟一個瀏覽器就是啟動一個瀏覽器程序,開啟一個記事本就啟動了一個記事本程序,開啟兩個記事本就啟動了兩個記事本程序,開啟一個Word就啟動了一個Word程序。 執行緒:有些程序還不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)。 每個程序至少要做一件事,所以,一個程序至少有一個執行緒。 ## 二、多程序 ### 實現多程序的四種方式 #### os.fork() python 的 os 模組封裝了常見的系統呼叫,其中,多程序的呼叫就是 fork() 函式。具體示例程式碼如下: ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- """ fork() 1.只有在Unix系統中有效,Windows系統中無效 2.fork函式呼叫一次,返回兩次:在父程序中返回值為子程序id,在子程序中返回值為0 """ import os pid = os.fork() if pid == 0: print("執行子程序,子程序pid={pid},父程序ppid={ppid}".format(pid=os.getpid(), ppid=os.getppid())) else: print("執行父程序,子程序pid={pid},父程序ppid={ppid}".format(pid=pid, ppid=os.getpid())) # 執行父程序,子程序pid=611,父程序ppid=610 # 執行子程序,子程序pid=611,父程序ppid=610 ``` ####Process 類 通過 Multiprocessing 模組中的 Process 類,建立Process物件。 Process類的構造方法: init(self, group=None, targent=None, name=None, args=(), kwargs={}) |引數|說明| |---|---| |group|程序所屬組,基本不用。| |targent|表示呼叫物件,一般為函式。| |args|表示呼叫物件引數元祖。| |name|程序別名。| |kwargs|表示呼叫物件的字典。| 具體示例程式碼如下: ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- from multiprocessing import Process def run_process(name): print(name) if __name__ == "__main__": p = Process(target=run_process, args=("test",)) p.start() p.join() print("子程序結束") # test # 子程序結束 ``` ####繼承 Process 類 通過繼承Process類,重寫 run 方法。使用 .start() 方法,會自動呼叫 run 方法。具體示例程式碼如下: ```python from multiprocessing import Process class NewProcess(Process): def __init__(self, n): super(NewProcess, self).__init__() self.n = n def run(self): print(self.n) if __name__ == "__main__": test = "test" p = NewProcess(test) p.start() p.join() print("子程序結束") # test # 子程序結束 ``` ####程序池 Pool 類 Pool 類可以提供指定數量(**一般為CPU的核數**)的程序供使用者呼叫,當有新的請求提交的 Pool 中時,如果池中還沒有滿,就會建立一個新的程序來執行這些請求。**如果池滿,請求就會告知先等待。直到池中有程序結束,才會建立新的程序來執行這些請求。** 注意:程序池中的程序是不能共享佇列和資料的,而 Process 生成的子程序可以共享佇列。 Pool 類中的常用方法: |函式|函式原型|說明| |---|---|---| |apply()|apply(func[, args=()[, kwds={}]])|該函式用於傳遞不定引數,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不再出現)。| |apply_async()|apply_async(func[, args()[, kwds{}[, callback=None]]])|與apply用法一樣,但它是非阻塞且支援結果返回進行回撥。| |map()|map(func, utterable[, chunksize=None])|Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到返回結果。第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序。| |close()||關閉程序池(Pool),使其不能再新增新的Process。| |terminate()||結束工作程序,不再處理未處理的任務。| |join()||主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用。| 具體程式碼如下: ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import time from multiprocessing import Pool def run(num): time.sleep(1) return num * num if __name__ == "__main__": testList = [1, 2, 3, 4, 5, 6, 7] print('單程序執行') # 順序執行 t1 = time.time() for i in testList: run(i) t2 = time.time() print('順序執行的時間為:', int(t2 - t1)) print('多程序 map 執行') # 並行執行 p = Pool(4) # 建立擁有4個程序數量的程序池 result = p.map(run, testList) p.close() # 關閉程序池,不再接受新的任務 p.join() # 主程序阻塞等待子程序的退出 t3 = time.time() print('執行的時間為:', int(t3 - t2)) print(result) # 單程序執行 # 順序執行的時間為: 7 # 多程序 map 執行 # 執行的時間為: 2 # [1, 4, 9, 16, 25, 36, 49] ``` ###程序通訊 ####Queue() **佇列**:先進先出,按照順序 **通訊原理**:在記憶體中建立佇列資料結構模型。多個程序都可以通過佇列存入內容,取出內容的順序和存入內容的順序儲存一致。 |方法|功能|引數| |---|---|---| |q = Queue(maxsize = 0)|建立佇列訊息,並返回佇列物件。|表示最多儲存多少訊息。預設表示根據記憶體分配儲存。| |q.put(data, [block, timeout])|向佇列儲存訊息。|Data:要存入的資料。block:預設佇列滿時會堵塞,設定False則非堵塞。timeout:超時時間。| |data = q.get([block, timeout])|獲取佇列訊息。|block:預設佇列空時會堵塞,設定False則非堵塞。timeout:超時時間。| |q.full()|判斷佇列是否為滿。|| |q.empty()|判斷佇列是否為空。|| |q.size()|判斷佇列中的訊息數量。|| |q.close()|關閉佇列。|| ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- from multiprocessing import Process, Queue def foo(data): s = data.get() # 管子的另一端放在子程序這裡,子程序接收到了資料 if s not in "": print('子程序已收到資料...') print(s) # 子程序打印出了資料內容... if __name__ == '__main__': # 要加這行... q = Queue() # 建立程序通訊的Queue,你可以理解為我拿了個管子來... p = Process(target=foo, args=(q,)) # 建立子程序 print('主程序準備傳送資料...') q.put("資料接收成功") # 將管子的一端放在主程序這裡,主程序往管子裡丟入資料↑ p.start() # 啟子子程序 p.join() # 主程序準備傳送資料... # 子程序已收到資料... # 資料接收成功 ``` #### Pipe() **通訊原理**:在記憶體中開闢管道空間,生成管道操作物件,多個程序使用“同一個”管道物件進行操作即可實現通訊。 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) # 向管道中寫入內容 conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # 從管道讀取資訊 p.join() # prints "[42, None, 'hello']" ``` #### manager() 程序的 manager 方法可以共享資料,比如共享列表,元祖,字典,鎖,字元。 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import multiprocessing def f(m_list): m_list.append("f") if __name__ == '__main__': manager = multiprocessing.Manager() m_list = manager.list([1, 2, 3]) p = multiprocessing.Process(target=f, args=(m_list, )) p.start() p.join() print(m_list) # [1, 2, 3, 'f'] ``` ## 三、多執行緒 執行緒在程式中是獨立的、並非的執行流。與分隔的程序相比執行緒之間的隔離程度要小,它們共享記憶體,檔案控制代碼和其它程序應有的狀態。**多執行緒之間共享全域性變數**。 ###建立多執行緒多兩種方式 ####threading模組Thread類 具體程式碼如下: ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import threading import time def run(n): print("task", n) time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",)) t1.start() t2.start() t1.join() t2.join() # task t1 # task t2 # 1s # 1s # 0s # 0s ``` #### 自定義執行緒 繼承threading.Thread類自定義執行緒類。其本質是重構Thread類中的run方法。 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- from threading import Thread import time class MyThread(Thread): def __init__(self, n): super(MyThread, self).__init__() self.n = n def run(self): print("task", self.n) time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start() # task t1 # task t2 # 1s # 1s # 0s # 0s ``` ### 守護執行緒 setDaemon(True)把所有的子執行緒都變成了主執行緒的守護執行緒,因此當主程序結束後,子執行緒也會隨之結束。所以當主執行緒結束後,整個程式就退出了。 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import threading import time def run(n): print("task", n) time.sleep(1) # 此時子執行緒停1s print('2') time.sleep(1) print('1') if __name__ == '__main__': t = threading.Thread(target=run, args=("t1",)) t.setDaemon(True) # 把子程序設定為守護執行緒,必須在start()之前設定 t.start() print("end") # task t1 # end ``` 想要守護執行緒執行結束後,主程序再結束,可以使用 join 方法,讓主執行緒等待子執行緒執行完畢。 ### Lock 多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自都有一份拷貝存與每個程序中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import threading value = 0 lock = threading.Lock() def change_it(n): # 先存後取,結果應該為0: global value value = value + n value = value - n # 未加鎖(值不確定) def run_thread(n): for i in range(2000000): change_it(n) # 加鎖 # def run_thread(n): # for i in range(2000000): # lock.acquire() # try: # change_it(n) # finally: # lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(value) # 29 ``` 由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以不會造成修改的衝突。當多個執行緒同時執行 **lock.acquire()** 時,只有一個執行緒能成功獲取鎖,然後繼續執行程式碼,其它執行緒就繼續等待直到獲得鎖為止。 獲得鎖的執行緒用完一定要釋放鎖,否則那些等待鎖的執行緒將會永遠的等待下去,成為死執行緒。所以用 **try...finally** 來確保鎖一定會被釋放。 鎖的好處就是確保某段關鍵程式碼只能由一個執行緒從頭到尾完整的執行,壞處當然也很多,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率大大的下降了。其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。 ### 訊號量(BoundedSemaphore類) Lock同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒去更改資料。 ```python import threading import time def run(n, semaphore): semaphore.acquire() #加鎖 time.sleep(1) print("run the thread:%s\n" % n) semaphore.release() #釋放 if __name__ == '__main__': num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允許5個執行緒同時執行 for i in range(22): t = threading.Thread(target=run, args=("t-%s" % i, semaphore)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('-----all threads done-----') ``` ###GIL鎖 在非 python 環境中,單核情況下,同時只能有一個任務執行。多核可以同時支援多個執行緒同時執行。但是在 python 中,無論有多少核,同只能執行一個執行緒。究其原因,這就是GIL的存在導致的。 GIL全稱Global Interpreter Lock(全域性直譯器鎖),來源是python設計之初的考慮,為了資料安全所做的決定。某個執行緒想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,並且在一個python程序中,GIL只有一個。拿不到通行證的執行緒,就不允許進入CPU執行。GIL只有在cpython中才有,因為cpython呼叫的是c語言的原生執行緒,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個執行緒拿到資料,而在pypy和jpython中是沒有GIL的。 #### python針對不同型別的程式碼執行效率也是不同的。 1、cpu密集型程式碼(各種迴圈處理、計數等),在這種情況下,由於計算機工作多,ticks計數很快就會達到閾值。然後觸發GIL的釋放與再競爭(多個執行緒來回切換是需要消耗資源的),所以python下的多執行緒對cpu密集型代並不友好。 2、IO密集型程式碼(檔案處理,網路爬蟲等涉及檔案讀寫的操作),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的浪費,而開啟多執行緒能線上程A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率)。所以python的多執行緒對IO密集型程式碼比較友好。 #### 使用建議 python下想要充分利用多核CPU,就使用多程序。因為每個程序都有各子獨立的GIL,互不干擾,這樣就可以真正意義上的並行執行,在python中,多程序的執行效率優於多執行緒(僅僅針對多核CPU而言)。 ##四、爬取豆瓣電影TOP250 採取三種方式。爬取前250名電影。 (1)所爬取的網頁連結:https://movie.douban.com/top250?start=0&filter= (2)通過分析網頁,發現第一頁的url start=0,第二頁的url start=25,第三頁的url start=50。 (3)主要爬取電影名跟評分,用來進行比對,所以資料方面就不過多的提取和儲存,只簡單的打印出來。 ###多程序爬取 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- import multiprocessing from multiprocessing import Process, Queue import time from lxml import etree import requests class DouBanSpider(Process): def __init__(self, q, url_list, lock): # 重寫寫父類的__init__方法 super(DouBanSpider, self).__init__() self.url_list = url_list self.q = q self.lock = lock self.headers = { 'Host': 'movie.douban.com', 'Referer': 'https://movie.douban.com/top250?start=225&filter=', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36', } def run(self): self.parse_page() def send_request(self, url): ''' 用來發送請求的方法 :return: 返回網頁原始碼 ''' # 請求出錯時,重複請求3次, i = 0 while i <= 3: try: print(u"[INFO]請求url:" + url) return requests.get(url=url, headers=self.headers).content except Exception as e: print(u'[INFO] %s%s' % (e, url)) i += 1 def parse_page(self): ''' 解析網站原始碼,並採用xpath提取 電影名稱和平分放到佇列中 :return: ''' time.sleep(0.1) while 1: try: url = self.url_list.pop() except IndexError as e: break self.lock.acquire() response = self.send_request(url) html = etree.HTML(response) #  獲取到一頁的電影資料 node_list = html.xpath("//div[@class='info']") for move in node_list: # 電影名稱 title = move.xpath('.//a/span/text()')[0] # 評分 score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0] # 將每一部電影的名稱跟評分加入到佇列 self.q.put(score + "\t" + title) self.lock.release() class AllUrlSpider(Process): def __init__(self, url_lis): super(AllUrlSpider, self).__init__() self.url_list = url_lis def run(self): base_url = 'https://movie.douban.com/top250?start=' # 構造所有url for num in range(225, -1, -25): self.url_list.append(base_url + str(num)) print("獲得URL:{}".format(base_url + str(num))) def main(): # 建立一個佇列用來儲存程序獲取到的資料 q = Queue() lock = multiprocessing.Lock() manager = multiprocessing.Manager() url_list = manager.list() a = AllUrlSpider(url_list) p = DouBanSpider(q, url_list, lock) b = DouBanSpider(q, url_list, lock) c = DouBanSpider(q, url_list, lock) a.start() p.start() b.start() c.start() a.join() p.join() b.join() c.join() while not q.empty(): print(q.get()) if __name__ == "__main__": start = time.time() main() print('[info]耗時:%s' % (time.time() - start)) ``` 多程序爬取耗時7.15秒,部分結果如下圖所示: ![](https://img2020.cnblogs.com/blog/1878490/202008/1878490-20200826114846376-1056604106.png) ### 多執行緒爬取 ```python #!/usr/bin/env python2 # -*- coding=utf-8 -*- from queue import Queue from threading import Thread import threading import time from lxml import etree import requests url_list = [] lock = threading.Lock() class DouBanSpider(Thread): def __init__(self, q) : # 重寫寫父類的__init__方法 super(DouBanSpider, self).__init__() self.q = q self.headers = { 'Host': 'movie.douban.com', 'Referer': 'https://movie.douban.com/top250?start=225&filter=', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36', } def run(self): self.parse_page() def send_request(self, url): ''' 用來發送請求的方法 :return: 返回網頁原始碼 ''' # 請求出錯時,重複請求3次, i = 0 while i <= 3: try: print u"[INFO]請求url:" + url html = requests.get(url=url, headers=self.headers).content except Exception as e: print u'[INFO] %s%s' % (e, url) i += 1 else: return html def parse_page(self): ''' 解析網站原始碼,並採用xpath提取 電影名稱和平分放到佇列中 :return: ''' while 1: try: url = url_list.pop() except IndexError as e: break lock.acquire() response = self.send_request(url) html = etree.HTML(response) #  獲取到一頁的電影資料 node_list = html.xpath("//div[@class='info']") for move in node_list: # 電影名稱 title = move.xpath('.//a/span/text()')[0] # 評分 score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0] # 將每一部電影的名稱跟評分加入到佇列 self.q.put(score + "\t" + title) lock.release() class AllUrlSpider(Thread): def run(self): base_url = 'https://movie.douban.com/top250?start=' # 構造所有url for num in range(225, -1, -25): url_list.append(base_url + str(num)) print("獲得URL:{}".format(base_url + str(num))) def main(): # 建立一個佇列用來儲存程序獲取到的資料 q = Queue() a = AllUrlSpider() a.start() # 儲存執行緒 Thread_list = [] # 建立並啟動執行緒 for i in range(5): p = DouBanSpider(q) p.start() Thread_list.append(p) a.join() # 讓主執行緒等待子執行緒執行完成 for i in Thread_list: i.join() while not q.empty(): print(q.get()) if __name__ == "__main__": start = time.time() main() print('[info]耗時:%s' % (time.time() - start)) ``` 多執行緒爬取耗時5秒,部分結果如下圖所示: ![](https://img2020.cnblogs.com/blog/1878490/202008/1878490-20200826114857325-1361440071.png) 耗時跟網路的好壞也是有一定的關係,每次測出的資料結果也不一樣。但理論上來講,執行緒在I/O密集的操作性是要高於程序的。