爬蟲多執行緒
阿新 • • 發佈:2019-01-03
定義多執行緒類, 爬蟲類 爬取 m.sohu.com的內容中的帶有href屬性的a連結地址
import logging
from enum import unique, Enum
from queue import Queue
from random import random
from threading import current_thread, Thread
from time import sleep
from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
visited_urls = set()
@unique
class SpiderStatus(Enum):
"""
定義一個爬蟲的狀態類, unique意思是類裡面定義的值是不能重複的, 繼承Enum類這是一個列舉類
"""
IDEL = 0
WORKING = 1
def decode_page(page_bytes, charsets=('utf-8',)):
"""
定義一個專門用來解碼的函式, 傳入的charsets預設有多個值
只寫了一個預設值, 元祖裡面傳入引數,多個值不寫完後面加逗號
:param page_bytes: page_bytes是傳入的未解碼的頁面,
:param charsers: 編碼種類
:return: 返回解碼後的頁面
"""
page_html = None
for charset in charsets:
try:
# 解碼成功 程式才會執行 break
page_html = page_bytes.decode(charset)
break
except UnicodeDecodeError as e:
pass
logging.error('Decode:', e)
return page_html
class Retry(object) :
"""
包裝器
"""
def __init__(self, *, retry_times=3, wait_secs=5, error=(Exception, )):
"""
給物件定義屬性
:param retry_times: 爬蟲當爬取內容為空或者失敗時 嘗試爬取的次數
:param wait_secs: 爬取 兩次相隔的時間
:param error: 錯誤 傳入的是個元祖意思是可以傳入多個, 不過給了一個預設
"""
self.retry_times = retry_times
self.wait_secs = wait_secs
self.error = error
def __call__(self, fn):
"""
call的作用是把包裝器的物件變成函式 裝飾器都是呼叫的函式
:param fn: 無論什麼函式 都用fn代表
:return: 返回包裝器
"""
def wrapper(*args, **kwargs):
"""
函式嘗試的次數 當 成功返回函式自己程式就不往下執行了,如果 沒有成功接著迴圈,
有異常時進行異常處理 當for迴圈結束時 還沒有返回fn 就返回None
返回wrapper 是返回給call函式的
包裝器好像就是在操作 fn 函式 傳入一些引數用來限制 fn
"""
for _ in range(self.retry_times):
try:
return fn(*args, **kwargs)
except self.error as e:
logging.error(e)
logging.info('[Retry]')
sleep((random()+1)* self.wait_secs)
return None
return wrapper
class Spider(object):
"""
爬蟲類
"""
def __init__(self, task_queue):
"""
:param task_queue: 給爬蟲傳入任務佇列,爬蟲是要幹活的
"""
self.status = SpiderStatus.IDEL
# 這是一個裝飾器 裡面是可以傳入引數的 預設是三次 間隔時間5 可以傳入retry_times=5 wait_secs=10 給物件的屬性傳入引數
# 不是直接傳入值就行了嘛, 為什麼還要加屬性的名字呢, 是因為 * 後面的為命名引數 意思是 * 號後面的引數 傳入值時需要加上引數名字
@Retry()
def fetch(self, current_url, *, charsets=('utf-8', ), user_agent=None, proxies=None):
"""
爬取頁面的方法
:param current_url: url地址
:param charsets: 採用的編解碼方式
:param user_agent: 使用者代理,冒充有名的爬蟲爬取網站, 網站一般會禁止不知名的爬蟲爬取
:param proxies:代理
:return: 返回爬取的頁面
"""
# 列印 current_url 利用current_thread函式獲取當前執行緒的名字
logging.info('[Fetch]:' + current_url)
thread_name = current_thread().name
# 打印出程序的名字 和url f是格式化的意思,代替了 %s %d形式
print(f'[{thread_name}]:{current_url}')
# 冒充使用者
headers = {'user-agent': user_agent}\
if user_agent else {}
# 獲取頁面
resp = requests.get(current_url, headers=headers,
proxies=proxies)
# 解碼並返回頁面 呼叫解碼函式
return decode_page(resp.content, charsets) if resp.status_code == 200 else None
def parse(self, html_page, *, domain='m.sohu.com'):
"""
解析頁面
:param html_page: 傳入要解析的頁面
:param domain: 傳入域名
:return: 返回從頁面中提取的url
"""
# 解析頁面 soup是一個完整的html頁面
soup = BeautifulSoup(html_page, 'lxml')
url_links = []
# 取出頁面中 帶有href屬性的a標籤
for a_tag in soup.body.select('a[href]'):
# 使用urlparse函式解析url地址, 該函式是分段解析
# 要解析的原因是url可能不全
parser= urlparse(a_tag.attrs['href'])
# 標題的文字協議 可能url中沒有則拿取可能為空 空了把後面的 http 文字協議賦給scheme
scheme = parser.scheme or 'http'
# 域名 parser.netloc拿取 域名 www.baidu.com
netloc = parser.netloc or 'domain'
# 拿取的url的文字協議中有javascript的, 這樣的我不要
# 想要爬的是sohu 所以先讓 netloc等於預設域名souhu的 這是第一次拿
if scheme != 'javascript' and netloc == domain:
# 拿取url中的絕對路徑部分, 像/user/user/型別
path = parser.path
# parser.query 取到的是url的?後面傳入的引數
query = '?' + parser.query if parser.query else ''
# 格式化url, 用上面取到的分段內容拼接成一個完整的url
full_url = f'{scheme}://{netloc}{path}{query}'
# 如果 url沒有訪問過
if full_url not in visited_urls:
# 就把它放到列表中
url_links.append(full_url)
# 返回解析出來的完整的沒有訪問過的url
return url_links
def extract(self, html_page):
"""
從頁面中摘取內容
:param html_page: 頁面
:return:
"""
pass
def store(self, data_dict):
"""
儲存資料
:param data_dict:
:return:
"""
pass
class SpiderThread(Thread):
"""
定義一個多執行緒類 用來啟動爬蟲 繼承了Thread類,自帶的
該類就是一個執行緒類
單個單個執行緒的寫法:
Thread(target=foo, agrs=( , )).start()
foo是目標函式,執行緒要啟動的函式, args 是給目標函式傳入的引數
這是一個執行緒類,呼叫該類時自動呼叫run函式, run函式中肯定要使用蜘蛛實現爬取頁面
解析頁面 把解析出來的東西 做下處理, 一個執行緒類裡面定義的就是一個執行緒, 一個蜘蛛
一個蜘蛛類裡面定義的就是一個蜘蛛該乾的事情, 想要啟動多執行緒 多個蜘蛛 就在main函式中
通過for迴圈 實現 , 現在呼叫執行緒類就執行run函式,而run函式中,使用蜘蛛爬去和解析了頁面
蜘蛛是傳進來的,所以定義一個蜘蛛類, 蜘蛛的行為,是在類中定義好的, 該蜘蛛的某個行為幹某件事情
直接利用蜘蛛點一下就可以了, 不得不讚嘆爬蟲太偉大了, 教我們一個人就幹一件事情,一個人就做好
自己眼前的事情
執行緒類的區別是, 你呼叫一次等於啟一個執行緒,大家是同時執行的 只不過共用計算機記憶體, 公用資源而已
而普通類,你呼叫一次一次,他們是排隊等候的,當上一個程式執行完成,才會執行下一個
執行緒太偉大了
多程序 運作方式,一樣, 只不過是不共用資源,啟一個自動複製一個資源,佔用一份記憶體
但是效率更高
一個類定義的就是一個物件,一個方法,一個物件的屬性,一個物件的行為
"""
def __init__(self, spider, task_queue):
"""
只是引數而已, 傳入時是靈活的
daemon=True 是把執行緒設為守護執行緒, 當主執行緒掛了,守護執行緒也掛
:param spider: 傳入爬蟲函式
:param task_queue: 任務佇列 給爬蟲函式的引數
"""
super().__init__(daemon=True)
self.spider = spider
self.task_queue = task_queue
def run(self):
"""
這是個回撥函式 前面有 'O+向上的箭頭' 代表實回撥函式 Thread是有這個函式的 是方法的重寫
當利用這個類建立物件時,會自動呼叫該函式
:return:
"""
# 這是一個死迴圈, 啟動一個執行緒 時 執行緒 不死 一直執行 所以應該可以設定一個執行緒存活的時間
while True:
# 從佇列中取出url
current_url = self.task_queue.get()
# 把url新增到訪問過的url的集合中 把他放進訪問過的url集合的
# 原因是 放置其他程序爬取, 10個程序是公用佇列的, 不過有
# 上面的判斷,爬取過了,就不會再爬了
visited_urls.add(current_url)
# 標記爬蟲為工作狀態
self.spider.status = SpiderStatus.WORKING
# 爬蟲 爬取頁面 這個spider是上面傳入的spider,這只是個引數, 當真正的爬蟲傳進來時所有的spider引數都會自動換成真正的引數
# 相當與數學中的 引數 X X可以代表任意數
html_page = self.spider.fetch(current_url)
# 如果頁面存在 不是沒有 或者 獲取到的頁面是一個空字串
if html_page not in [None, '']:
# 呼叫爬蟲下的 解析函式 解析頁面 解析函式解析出來的是url_links
url_links = self.spider.parse(html_page)
# 把url放到佇列中
for url_link in url_links:
self.task_queue.put(url_link)
# 爬完一次之後把爬蟲狀態 標記為空閒狀態
self.spider.status = SpiderStatus.IDEL
def is_any_alive(spider_threads):
"""
判斷程序是否活著
:param spider_threads: 程序
:return:有一個為 True 返回True 全都死掉返回false
"""
return any([spider_thread.spider.status == SpiderStatus.WORKING\
for spider_thread in spider_threads])
def main():
"""
主程序,當它停止執行時, 守護執行緒就卵了
主程序裡主要就是定義類的引數
:return:
"""
# Queue佇列的意思, 可以往裡面存取東西 定義任務佇列
task_queue = Queue()
# 存東西用put 拿東西用get 先放入sohu的url
task_queue.put('http://m.sohu.com/')
# 呼叫執行緒物件傳入引數,傳入爬蟲物件 和 任務佇列兩個引數
# spider()這是個爬蟲物件 建立執行緒物件 會自動呼叫回撥函式run
spider_threads = [SpiderThread(Spider(task_queue), task_queue) for _ in range(10)]
# 取出每一個執行緒 啟動執行緒 因為這是一個執行緒類 要是普通類就不需要這一步了
for spider_thread in spider_threads:
spider_thread.start()
# 為了不讓程式執行完了之後主執行緒結束 要不主執行緒結束 守護執行緒就掛了 當沒有任務了 為false時 while迴圈停止 任務隨著爬蟲的執行源源不斷的加入佇列之中
# 當 沒有時 意味著爬完了 或者當執行緒全死了 也停止迴圈 其他pass 意思是什麼也不執行過,那就進行第二次迴圈
# 這個迴圈其實就是為了保證 守護執行緒不死 寫的不好, 不過暫時先這麼用
# 什麼時候結束了 程式往下執行輸入 'Over'
while not task_queue or is_any_alive(spider_threads):
pass
print('Over!')
if __name__ == '__main__':
main()