1. 程式人生 > >爬蟲多執行緒

爬蟲多執行緒

定義多執行緒類, 爬蟲類 爬取 m.sohu.com的內容中的帶有href屬性的a連結地址

import logging
from enum import unique, Enum
from queue import Queue
from random import random
from threading import current_thread, Thread
from time import sleep
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup

visited_urls = set()


@unique
class SpiderStatus(Enum): """ 定義一個爬蟲的狀態類, unique意思是類裡面定義的值是不能重複的, 繼承Enum類這是一個列舉類 """ IDEL = 0 WORKING = 1 def decode_page(page_bytes, charsets=('utf-8',)): """ 定義一個專門用來解碼的函式, 傳入的charsets預設有多個值 只寫了一個預設值, 元祖裡面傳入引數,多個值不寫完後面加逗號 :param page_bytes: page_bytes是傳入的未解碼的頁面, :param charsers: 編碼種類 :return: 返回解碼後的頁面 """
page_html = None for charset in charsets: try: # 解碼成功 程式才會執行 break page_html = page_bytes.decode(charset) break except UnicodeDecodeError as e: pass logging.error('Decode:', e) return page_html class Retry(object)
:
""" 包裝器 """ def __init__(self, *, retry_times=3, wait_secs=5, error=(Exception, )): """ 給物件定義屬性 :param retry_times: 爬蟲當爬取內容為空或者失敗時 嘗試爬取的次數 :param wait_secs: 爬取 兩次相隔的時間 :param error: 錯誤 傳入的是個元祖意思是可以傳入多個, 不過給了一個預設 """ self.retry_times = retry_times self.wait_secs = wait_secs self.error = error def __call__(self, fn): """ call的作用是把包裝器的物件變成函式 裝飾器都是呼叫的函式 :param fn: 無論什麼函式 都用fn代表 :return: 返回包裝器 """ def wrapper(*args, **kwargs): """ 函式嘗試的次數 當 成功返回函式自己程式就不往下執行了,如果 沒有成功接著迴圈, 有異常時進行異常處理 當for迴圈結束時 還沒有返回fn 就返回None 返回wrapper 是返回給call函式的 包裝器好像就是在操作 fn 函式 傳入一些引數用來限制 fn """ for _ in range(self.retry_times): try: return fn(*args, **kwargs) except self.error as e: logging.error(e) logging.info('[Retry]') sleep((random()+1)* self.wait_secs) return None return wrapper class Spider(object): """ 爬蟲類 """ def __init__(self, task_queue): """ :param task_queue: 給爬蟲傳入任務佇列,爬蟲是要幹活的 """ self.status = SpiderStatus.IDEL # 這是一個裝飾器 裡面是可以傳入引數的 預設是三次 間隔時間5 可以傳入retry_times=5 wait_secs=10 給物件的屬性傳入引數 # 不是直接傳入值就行了嘛, 為什麼還要加屬性的名字呢, 是因為 * 後面的為命名引數 意思是 * 號後面的引數 傳入值時需要加上引數名字 @Retry() def fetch(self, current_url, *, charsets=('utf-8', ), user_agent=None, proxies=None): """ 爬取頁面的方法 :param current_url: url地址 :param charsets: 採用的編解碼方式 :param user_agent: 使用者代理,冒充有名的爬蟲爬取網站, 網站一般會禁止不知名的爬蟲爬取 :param proxies:代理 :return: 返回爬取的頁面 """ # 列印 current_url 利用current_thread函式獲取當前執行緒的名字 logging.info('[Fetch]:' + current_url) thread_name = current_thread().name # 打印出程序的名字 和url f是格式化的意思,代替了 %s %d形式 print(f'[{thread_name}]:{current_url}') # 冒充使用者 headers = {'user-agent': user_agent}\ if user_agent else {} # 獲取頁面 resp = requests.get(current_url, headers=headers, proxies=proxies) # 解碼並返回頁面 呼叫解碼函式 return decode_page(resp.content, charsets) if resp.status_code == 200 else None def parse(self, html_page, *, domain='m.sohu.com'): """ 解析頁面 :param html_page: 傳入要解析的頁面 :param domain: 傳入域名 :return: 返回從頁面中提取的url """ # 解析頁面 soup是一個完整的html頁面 soup = BeautifulSoup(html_page, 'lxml') url_links = [] # 取出頁面中 帶有href屬性的a標籤 for a_tag in soup.body.select('a[href]'): # 使用urlparse函式解析url地址, 該函式是分段解析 # 要解析的原因是url可能不全 parser= urlparse(a_tag.attrs['href']) # 標題的文字協議 可能url中沒有則拿取可能為空 空了把後面的 http 文字協議賦給scheme scheme = parser.scheme or 'http' # 域名 parser.netloc拿取 域名 www.baidu.com netloc = parser.netloc or 'domain' # 拿取的url的文字協議中有javascript的, 這樣的我不要 # 想要爬的是sohu 所以先讓 netloc等於預設域名souhu的 這是第一次拿 if scheme != 'javascript' and netloc == domain: # 拿取url中的絕對路徑部分, 像/user/user/型別 path = parser.path # parser.query 取到的是url的?後面傳入的引數 query = '?' + parser.query if parser.query else '' # 格式化url, 用上面取到的分段內容拼接成一個完整的url full_url = f'{scheme}://{netloc}{path}{query}' # 如果 url沒有訪問過 if full_url not in visited_urls: # 就把它放到列表中 url_links.append(full_url) # 返回解析出來的完整的沒有訪問過的url return url_links def extract(self, html_page): """ 從頁面中摘取內容 :param html_page: 頁面 :return: """ pass def store(self, data_dict): """ 儲存資料 :param data_dict: :return: """ pass class SpiderThread(Thread): """ 定義一個多執行緒類 用來啟動爬蟲 繼承了Thread類,自帶的 該類就是一個執行緒類 單個單個執行緒的寫法: Thread(target=foo, agrs=( , )).start() foo是目標函式,執行緒要啟動的函式, args 是給目標函式傳入的引數 這是一個執行緒類,呼叫該類時自動呼叫run函式, run函式中肯定要使用蜘蛛實現爬取頁面 解析頁面 把解析出來的東西 做下處理, 一個執行緒類裡面定義的就是一個執行緒, 一個蜘蛛 一個蜘蛛類裡面定義的就是一個蜘蛛該乾的事情, 想要啟動多執行緒 多個蜘蛛 就在main函式中 通過for迴圈 實現 , 現在呼叫執行緒類就執行run函式,而run函式中,使用蜘蛛爬去和解析了頁面 蜘蛛是傳進來的,所以定義一個蜘蛛類, 蜘蛛的行為,是在類中定義好的, 該蜘蛛的某個行為幹某件事情 直接利用蜘蛛點一下就可以了, 不得不讚嘆爬蟲太偉大了, 教我們一個人就幹一件事情,一個人就做好 自己眼前的事情 執行緒類的區別是, 你呼叫一次等於啟一個執行緒,大家是同時執行的 只不過共用計算機記憶體, 公用資源而已 而普通類,你呼叫一次一次,他們是排隊等候的,當上一個程式執行完成,才會執行下一個 執行緒太偉大了 多程序 運作方式,一樣, 只不過是不共用資源,啟一個自動複製一個資源,佔用一份記憶體 但是效率更高 一個類定義的就是一個物件,一個方法,一個物件的屬性,一個物件的行為 """ def __init__(self, spider, task_queue): """ 只是引數而已, 傳入時是靈活的 daemon=True 是把執行緒設為守護執行緒, 當主執行緒掛了,守護執行緒也掛 :param spider: 傳入爬蟲函式 :param task_queue: 任務佇列 給爬蟲函式的引數 """ super().__init__(daemon=True) self.spider = spider self.task_queue = task_queue def run(self): """ 這是個回撥函式 前面有 'O+向上的箭頭' 代表實回撥函式 Thread是有這個函式的 是方法的重寫 當利用這個類建立物件時,會自動呼叫該函式 :return: """ # 這是一個死迴圈, 啟動一個執行緒 時 執行緒 不死 一直執行 所以應該可以設定一個執行緒存活的時間 while True: # 從佇列中取出url current_url = self.task_queue.get() # 把url新增到訪問過的url的集合中 把他放進訪問過的url集合的 # 原因是 放置其他程序爬取, 10個程序是公用佇列的, 不過有 # 上面的判斷,爬取過了,就不會再爬了 visited_urls.add(current_url) # 標記爬蟲為工作狀態 self.spider.status = SpiderStatus.WORKING # 爬蟲 爬取頁面 這個spider是上面傳入的spider,這只是個引數, 當真正的爬蟲傳進來時所有的spider引數都會自動換成真正的引數 # 相當與數學中的 引數 X X可以代表任意數 html_page = self.spider.fetch(current_url) # 如果頁面存在 不是沒有 或者 獲取到的頁面是一個空字串 if html_page not in [None, '']: # 呼叫爬蟲下的 解析函式 解析頁面 解析函式解析出來的是url_links url_links = self.spider.parse(html_page) # 把url放到佇列中 for url_link in url_links: self.task_queue.put(url_link) # 爬完一次之後把爬蟲狀態 標記為空閒狀態 self.spider.status = SpiderStatus.IDEL def is_any_alive(spider_threads): """ 判斷程序是否活著 :param spider_threads: 程序 :return:有一個為 True 返回True 全都死掉返回false """ return any([spider_thread.spider.status == SpiderStatus.WORKING\ for spider_thread in spider_threads]) def main(): """ 主程序,當它停止執行時, 守護執行緒就卵了 主程序裡主要就是定義類的引數 :return: """ # Queue佇列的意思, 可以往裡面存取東西 定義任務佇列 task_queue = Queue() # 存東西用put 拿東西用get 先放入sohu的url task_queue.put('http://m.sohu.com/') # 呼叫執行緒物件傳入引數,傳入爬蟲物件 和 任務佇列兩個引數 # spider()這是個爬蟲物件 建立執行緒物件 會自動呼叫回撥函式run spider_threads = [SpiderThread(Spider(task_queue), task_queue) for _ in range(10)] # 取出每一個執行緒 啟動執行緒 因為這是一個執行緒類 要是普通類就不需要這一步了 for spider_thread in spider_threads: spider_thread.start() # 為了不讓程式執行完了之後主執行緒結束 要不主執行緒結束 守護執行緒就掛了 當沒有任務了 為false時 while迴圈停止 任務隨著爬蟲的執行源源不斷的加入佇列之中 # 當 沒有時 意味著爬完了 或者當執行緒全死了 也停止迴圈 其他pass 意思是什麼也不執行過,那就進行第二次迴圈 # 這個迴圈其實就是為了保證 守護執行緒不死 寫的不好, 不過暫時先這麼用 # 什麼時候結束了 程式往下執行輸入 'Over' while not task_queue or is_any_alive(spider_threads): pass print('Over!') if __name__ == '__main__': main()