1. 程式人生 > 其它 >小白爬蟲第四彈之爬蟲快跑(多程序 + 多執行緒)

小白爬蟲第四彈之爬蟲快跑(多程序 + 多執行緒)

PS:使用多執行緒時好像在目錄切換的問題上存在問題,可以給執行緒加個鎖試試 Hello 大家好!我又來了。你是不是發現下載圖片速度特別慢、難以忍受啊!對於這種問題 一般解決辦法就是多程序了!一個程序速度慢!我就用十個程序,相當於十個人一起幹。速度就會快很多啦!(為什麼不說多執行緒?懂點 Python 的小夥伴都知道、GIL 的存在 導致 Python 的多執行緒有點坑啊!)今天就教大家來做一個多程序的爬蟲(其實吧、可以用來做一個超簡化版的分散式爬蟲) 其實吧!還有一種加速的方法叫做 “非同步”!不過這玩意兒我沒怎麼整明白就不出來誤人子弟了!(因為爬蟲大部分時間都是在等待 response 中!‘非同步’則能讓程式在等待 response 的時間去做的其他事情。)

學過 Python 基礎的同學都知道、在多程序中,程序之間是不能相互通訊的,這就有一個很坑爹的問題的出現了!多個程序怎麼知道那那些需要爬取、哪些已經被爬取了! 這就涉及到一個東西!這玩意兒叫做佇列!!佇列!!佇列!!其實吧正常來說應該給大家用佇列來完成這個教程的, 比如 Tornado 的 queue 模組。(如果需要更為穩定健壯的佇列,則請考慮使用 Celery 這一類的專用訊息傳遞工具) 不過為了簡化技術種類啊!(才不會告訴你們是我懶,嫌麻煩呢!)這次我們繼續使用 MongoDB。 好了!先來理一下思路: 每個程序需要知道那些 URL 爬取過了、哪些 URL 需要爬取!我們來給每個 URL 設定兩種狀態: outstanding: 等待爬取的 URL complete: 爬取完成的 URL 誒!等等我們好像忘了啥? 失敗的 URL 的怎麼辦啊?我們在增加一種狀態: processing: 正在進行的 URL。 嗯!當一個所有初始的 URL 狀態都為 outstanding;當開始爬取的時候狀態改為:processing;爬取完成狀態改為:complete;失敗的 URL 重置狀態為:outstanding。為了能夠處理 URL 程序被終止的情況、我們設定一個計時引數,當超過這個值時;我們則將狀態重置為 outstanding。 下面開整 Go Go Go! 首先我們需要一個模組:datetime (這個模組比內建 time 模組要好使一點) 不會裝??不是吧! pip install datetime 還有上一篇博文我們已經使用過的 pymongo 下面是佇列的程式碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MogoQueue():

OUTSTANDING = 1 ##初始狀態
PROCESSING = 2 ##正在下載狀態
COMPLETE = 3 ##下載完成狀態

def __init__(self, db, collection, timeout=300):##初始mongodb連線
self.client = MongoClient()
self.Client = self.client[db]
self.db = self.Client[collection]
self.timeout = timeout

def __bool__(self):
"""
這個函式,我的理解是如果下面的表達為真,則整個類為真
至於有什麼用,後面我會註明的(如果我的理解有誤,請指點出來謝謝,我也是Python新手)
$ne的意思是不匹配
"""
record = self.db.find_one(
{'status': {'$ne': self.COMPLETE}}
)
return True if record else False

def push(self, url, title): ##這個函式用來新增新的URL進佇列
try:
self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主題': title})
print(url, '插入佇列成功')
except errors.DuplicateKeyError as e: ##報錯則代表已經存在於佇列之中了
print(url, '已經存在於佇列中了')
pass
def push_imgurl(self, title, url):
try:
self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
print('圖片地址插入成功')
except errors.DuplicateKeyError as e:
print('地址已經存在了')
pass

def pop(self):
"""
這個函式會查詢佇列中的所有狀態為OUTSTANDING的值,
更改狀態,(query後面是查詢)(update後面是更新)
並返回_id(就是我們的URL),MongDB好使吧,^_^
如果沒有OUTSTANDING的值則呼叫repair()函式重置所有超時的狀態為OUTSTANDING,
$set是設定的意思,和MySQL的set語法一個意思
"""
record = self.db.find_and_modify(
query={'status': self.OUTSTANDING},
update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
)
if record:
return record['_id']
else:
self.repair()
raise KeyError

def pop_title(self, url):
record = self.db.find_one({'_id': url})
return record['主題']

def peek(self):
"""這個函式是取出狀態為 OUTSTANDING的文件並返回_id(URL)"""
record = self.db.find_one({'status': self.OUTSTANDING})
if record:
return record['_id']

def complete(self, url):
"""這個函式是更新已完成的URL完成"""
self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

def repair(self):
"""這個函式是重置狀態$lt是比較"""
record = self.db.find_and_modify(
query={
'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
'status': {'$ne': self.COMPLETE}
},
update={'$set': {'status': self.OUTSTANDING}}
)
if record:
print('重置URL狀態', record['_id'])

def clear(self):
"""這個函式只有第一次才呼叫、後續不要呼叫、因為這是刪庫啊!"""
self.db.drop()

好了,佇列我們做好了,下面是獲取所有頁面的程式碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from Download import request
from mongodb_queue import MogoQueue
from bs4 import BeautifulSoup


spider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')
def start(url):
response = request.get(url, 3)
Soup = BeautifulSoup(response.text, 'lxml')
all_a = Soup.find('div', class_='all').find_all('a')
for a in all_a:
title = a.get_text()
url = a['href']
spider_queue.push(url, title)
"""上面這個呼叫就是把URL寫入MongoDB的隊列了"""

if __name__ == "__main__":
start('http://www.mzitu.com/all')

"""這一段兒就不解釋了哦!超級簡單的"""

下面就是多程序 + 多執行緒的下載程式碼了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import os
import time
import threading
import multiprocessing
from mongodb_queue import MogoQueue
from Download import request
from bs4 import BeautifulSoup

SLEEP_TIME = 1

def mzitu_crawler(max_threads=10):
crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##這個是我們獲取URL的佇列
##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
def pageurl_crawler():
while True:
try:
url = crawl_queue.pop()
print(url)
except KeyError:
print('佇列沒有資料')
break
else:
img_urls = []
req = request.get(url, 3).text
title = crawl_queue.pop_title(url)
mkdir(title)
os.chdir('D:\mzitu\\' + title)
max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
for page in range(1, int(max_span) + 1):
page_url = url + '/' + str(page)
img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
img_urls.append(img_url)
save(img_url)
crawl_queue.complete(url) ##設定為完成狀態
##img_queue.push_imgurl(title, img_urls)
##print('插入資料庫成功')

def save(img_url):
name = img_url[-9:-4]
print(u'開始儲存:', img_url)
img = request.get(img_url, 3)
f = open(name + '.jpg', 'ab')
f.write(img.content)
f.close()

def mkdir(path):
path = path.strip()
isExists = os.path.exists(os.path.join("D:\mzitu", path))
if not isExists:
print(u'建了一個名字叫做', path, u'的資料夾!')
os.makedirs(os.path.join("D:\mzitu", path))
return True
else:
print(u'名字叫做', path, u'的資料夾已經存在了!')
return False

threads = []
while threads or crawl_queue:
"""
這兒crawl_queue用上了,就是我們__bool__函式的作用,為真則代表我們MongoDB佇列裡面還有資料
threads 或者 crawl_queue為真都代表我們還沒下載完成,程式就會繼續執行
"""
for thread in threads:
if not thread.is_alive(): ##is_alive是判斷是否為空,不是空則在佇列中刪掉
threads.remove(thread)
while len(threads) < max_threads or crawl_queue.peek(): ##執行緒池中的執行緒少於max_threads 或者 crawl_qeue時
thread = threading.Thread(target=pageurl_crawler) ##建立執行緒
thread.setDaemon(True) ##設定守護執行緒
thread.start() ##啟動執行緒
threads.append(thread) ##新增進執行緒佇列
time.sleep(SLEEP_TIME)

def process_crawler():
process = []
num_cpus = multiprocessing.cpu_count()
print('將會啟動程序數為:', num_cpus)
for i in range(num_cpus):
p = multiprocessing.Process(target=mzitu_crawler) ##建立程序
p.start() ##啟動程序
process.append(p) ##新增進程序佇列
for p in process:
p.join() ##等待程序佇列裡面的程序結束

if __name__ == "__main__":
process_crawler()

好啦!一個多程序多線的爬蟲就完成了,(其實你可以設定一下 MongoDB,然後調整一下連線配置,在多臺機器上跑哦!!嗯,就是超級簡化版的分散式爬蟲了,雖然很是簡陋。) 本來還想下載圖片那一塊兒加上非同步(畢竟下載圖片是I\O等待最久的時間了,),可惜非同步我也沒怎麼整明白,就不拿出來貽笑大方了。 另外,各位小哥兒可以參考上面程式碼,單獨處理圖片地址試試(就是多個程序直接下載圖片)? 我測試了一下八分鐘下載 100 套圖PS:請務必使用 第二篇博文中的下載模組,或者自己寫一個自動更換代理的下載模組!!!不然寸步難行,分分鐘被伺服器 BAN 掉!小白教程就到此結束了,後面我教大家玩玩 Scrapy;目標 頂點小說網, 爬完全站的小說。 再後面帶大家玩玩 抓新浪 湯不熱、模擬登入 之類的。或許維護一個公共代理 IP 池之類的。 這個所有程式碼我放在這個位置了:https://github.com/thsheep/mzitu/