實習了一個多月!師傅終於教我案例了!分散式爬蟲!這是我的筆記
要抓微博資料,第一步便是模擬登陸,因為很多資訊(比如使用者資訊,使用者主頁微博資料翻頁等各種翻頁)都需要在登入狀態下才能檢視
這裡我簡單說一下,做爬蟲的同學不要老想著用什麼機器學習的方法去識別複雜驗證碼,真的難度非常大,這應該也不是一個爬蟲工程師的工作重點,當然這只是我的個人建議。工程化的專案,我還是建議大家通過打碼平臺來解決驗證碼的問題。
策略我們都清楚了。就該是分析和編碼了。
我們先來分析如何構造使用者資訊的URL。這裡我以微博名為一起神吐槽的博主為例進行分析。做爬蟲的話,一個很重要的意識就是爬蟲能抓的資料都是人能看到的資料,反過來,人能在瀏覽器上看到的資料,爬蟲幾乎都能抓。這裡用的是幾乎,因為有的資料抓取難度特別。我們首先需要以正常人的流程看看怎麼獲取到使用者的資訊。我們先進入該博主的主頁,如下圖
進群:548377875 即可獲取大量的PDF以及教學視訊哦!希望你能通過Python拿到高薪!
種子博主具體資訊
這裡我們就看到了他的具體資訊了。然後,我們看該頁面的url構造
weibo.com/p/100505175…
我直接copy的位址列的url。這樣做有啥不好的呢?對於老鳥來說,一下就看出來了,這樣做的話,可能會導致資訊不全,因為可能有些資訊是動態載入的。所以,我們需要通過抓包來判斷到底微博會通過該url返回所有資訊,還是需要請求一些ajax 連結才會返回一些關鍵資訊。這裡我就重複一下我的觀點:抓包很重要,抓包很重要,抓包很重要!重要的事情說三遍。
我們抓完包,發現並沒有ajax請求。那麼可以肯定請求前面的url,會返回所有資訊。我們通過點選滑鼠右鍵,檢視網頁原始碼,然後ctrl+a、ctrl+c將所有的頁面原始碼儲存到本地,這裡我命名為personinfo.html。我們用瀏覽器開啟該檔案,發現我們需要的所有資訊都在這段原始碼中,這個工作和抓包判斷資料是否全面有些重複,但是在我看來是必不可少的,因為我們解析頁面資料的時候還可以用到這個html檔案,如果我們每次都通過網路請求去解析內容的話,那麼可能賬號沒一會兒就會被封了(因為頻繁訪問微博資訊),所以我們需要把要解析的檔案儲存到本地。
從上面分析中我們可以得知
weibo.com/p/100505175…
這個url就是獲取使用者資料的url。那麼我們在只知道使用者id的時候怎麼構造它呢?我們可以多拿幾個使用者id來做測試,看構造是否有規律,比如我這裡以使用者名稱為網易雲音樂的使用者做分析,發現它的使用者資訊頁面構造如下
weibo.com/1721030997/…
這個就和上面那個不同了。但是我們仔細觀察,可以發現上面那個是個人使用者,下面是企業微博使用者。我們嘗試一下把它們url格式都統一為第一種或者第二種的格式
weibo.com/1751195602/…
這樣會出現404,那麼統一成上面那種呢?
weibo.com/p/100505172…
這樣子的話,它會被重定向到使用者主頁,而不是使用者詳細資料頁。所以也就不對了。那麼該以什麼依據判斷何時用第一種url格式,何時用第二種url格式呢?我們多翻幾個使用者,會發現除了100505之外,還有100305、100206等字首,那麼我猜想這個應該可以區分不同使用者。這個字首在哪裡可以得到呢?我們開啟我們剛儲存的頁面原始碼,搜尋100505,可以發現
domain
微博應該是根據這個來區分不同使用者型別的。這裡大家可以自己也可以試試,看不同使用者的domain是否不同。為了資料能全面,我也是做了大量測試,發現個人使用者的domain是1005051,作家是100305,其他基本都是認證的企業號。前兩個個人資訊的url構造就是
weibo.com/p/domain+ui…
後者的是
weibo.com/uid/about
弄清楚了個人資訊url的構造方式,但是還有一個問題。我們已知只有uid啊,沒有domain啊。如果是企業號,我們通過domain=100505會被重定向到主頁,如果是作家等(domain=100305或者100306),也會被重定向主頁。我們在主頁把domain提取出來,再請求一次,不就能拿到使用者詳細資訊了嗎?
關於如何構造獲取使用者資訊的url的相關分析就到這裡了。因為我們是在登入的情況下進行資料抓取的,可能在抓取的時候,某個賬號突然就被封了,或者由於網路原因,某次請求失敗了,該如何處理?對於前者,我們需要判斷每次請求返回的內容是否符合預期,也就是看response url是否正常,看response content是否是404或者讓你驗證手機號等,對於後者,我們可以做一個簡單的重試策略,大概程式碼如下
@timeout_decorator
def get_page(url, user_verify=True, need_login=True):
"""
:param url: 待抓取url
:param user_verify: 是否為可能出現驗證碼的頁面(ajax連線不會出現驗證碼,如果是請求微博或者使用者資訊可能出現驗證碼),否為抓取轉發的ajax連線
:param need_login: 抓取頁面是否需要登入,這樣做可以減小一些賬號的壓力
:return: 返回請求的資料,如果出現404或者403,或者是別的異常,都返回空字串
"""
crawler.info('本次抓取的url為{url}'.format(url=url))
count = 0
while count < max_retries: if need_login: # 每次重試的時候都換cookies,並且和上次不同,如果只有一個賬號,那麼就允許相同 name_cookies = Cookies.fetch_cookies() if name_cookies is None: crawler.warning('cookie池中不存在cookie,正在檢查是否有可用賬號') rs = get_login_info() # 選擇狀態正常的賬號進行登入,賬號都不可用就停掉celery worker if len(rs) == 0: crawler.error('賬號均不可用,請檢查賬號健康狀況') # 殺死所有關於celery的程序 if 'win32' in sys.platform: os.popen('taskkill /F /IM "celery*"') else: os.popen('pkill -f "celery"') else: crawler.info('重新獲取cookie中...') login.excute_login_task() time.sleep(10) try: if need_login: resp = requests.get(url, headers=headers, cookies=name_cookies[1], timeout=time_out, verify=False) if "$CONFIG['islogin'] = '0'" in resp.text: crawler.warning('賬號{}出現異常'.format(name_cookies[0])) freeze_account(name_cookies[0], 0) Cookies.delete_cookies(name_cookies[0]) continue else: resp = requests.get(url, headers=headers, timeout=time_out, verify=False) page = resp.text if page: page = page.encode('utf-8', 'ignore').decode('utf-8') else: continue # 每次抓取過後程式sleep的時間,降低封號危險 time.sleep(interal) if user_verify: if 'unfreeze' in resp.url or 'accessdeny' in resp.url or 'userblock' in resp.url or is_403(page): crawler.warning('賬號{}已經被凍結'.format(name_cookies[0])) freeze_account(name_cookies[0], 0) Cookies.delete_cookies(name_cookies[0]) count += 1 continue if 'verifybmobile' in resp.url: crawler.warning('賬號{}功能被鎖定,需要手機解鎖'.format(name_cookies[0])) freeze_account(name_cookies[0], -1) Cookies.delete_cookies(name_cookies[0]) continue if not is_complete(page): count += 1 continue if is_404(page): crawler.warning('url為{url}的連線不存在'.format(url=url)) return '' except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError, AttributeError) as e: crawler.warning('抓取{}出現異常,具體資訊是{}'.format(url, e)) count += 1 time.sleep(excp_interal) else: Urls.store_crawl_url(url, 1) return page crawler.warning('抓取{}已達到最大重試次數,請在redis的失敗佇列中檢視該url並檢查原因'.format(url)) Urls.store_crawl_url(url, 0) return ''
這裡大家把上述程式碼當一段虛擬碼讀就行了,主要看看如何處理抓取時候的異常。因為如果貼整個使用者抓取的程式碼,不是很現實,程式碼量有點大。
下面講頁面解析的分析。有一些做PC端微博資訊抓取的同學,可能曾經遇到過這麼個問題:儲存到本地的html檔案開啟都能看到所有資訊啊,為啥在頁面原始碼中找不到呢?因為PC端微博頁面的關鍵資訊都是像下圖這樣,被FM.view()包裹起來的,裡面的資料可能被json encode過。
標籤
那麼這麼多的FM.view(),我們怎麼知道該提取哪個呢?這裡有一個小技巧,由於只有中文會被編碼,英文還是原來的樣子,所以我們可以看哪段script中包含了渲染後的頁面中的字元,那麼那段應該就可能包含所有頁面資訊。我們這裡以頂部的頭像為例,如圖
我們在頁面原始碼中搜索,只發現一個script中有該字串,那麼就是那段script是頁面相關資訊。我們可以通過正則表示式把該script提取出來,然後把其中的html也提取出來,再儲存到本地,看看資訊是否全面。這裡我就不截圖了。感覺還有很多要寫的,不然篇幅太長了。
另外,對於具體頁面的解析,我也不做太多的介紹了。太細的東西還是建議讀讀原始碼。我只講一下,我覺得的一種處理異常的比較優雅的方式。微博爬蟲的話,主要是頁面樣式太多,如果你打算包含所有不同的使用者的模版,那麼我覺得幾乎不可能,不同使用者模版,用到的解析規則就不一樣。那麼出現解析異常如何處理?尤其是你沒有catch到的異常。很可能因為這個問題,程式就崩掉。其實對於Python這門語言來說,我們可以通過 裝飾器 來捕捉我們沒有考慮到的異常,比如我這個裝飾器
def parse_decorator(return_type):
"""
:param return_type: 用於捕捉頁面解析的異常, 0表示返回數字0, 1表示返回空字串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None
:return: 0,'',[],False,{},None
"""
def page_parse(func): br/>@wraps(func)
def handle_error( keys):
try:
keys)
except Exception as e:
parser.error(e)
if return_type == 5: return None elif return_type == 4: return {} elif return_type == 3: return False elif return_type == 2: return [] elif return_type == 1: return '' else: return 0 return handle_error return page_parse
上面的程式碼就是處理解析頁面發生異常的情況,我們只能在資料的準確性、全面性和程式的健壯性之間做一些取捨。用裝飾器的話,程式中不用寫太多的 try語句,程式碼重複率也會減少很多。
頁面的解析由於篇幅所限,我就講到這裡了。沒有涉及太具體的解析,其中一個還有一個比較難的點,就是資料的全面性,讀者可以去多觀察幾個微博使用者的個人資訊,就會發現有的個人資訊,有的使用者有填寫,有的並沒有。解析的時候要考慮完的話,建議從自己的微博的個人資訊入手,看到底有哪些可以填。這樣可以保證幾乎不會漏掉一些重要的資訊。
最後,我再切合本文的標題,講如何搭建一個分散式的微博爬蟲。開發過程中,我們可以先就做單機單執行緒的爬蟲,然後再改成使用celery的方式。這裡這樣做是為了方便開發和測試,因為你單機搭起來並且跑得通了,那麼分散式的話,就很容易改了,因為celery的API使用本來就很簡潔。
我們抓取的是使用者資訊和他的關注和粉絲uid。使用者資訊的話,我們一個請求大概能抓取一個使用者的資訊,而粉絲和關注我們一個請求可以抓取18個左右(因為這個抓的是列表),顯然可以發現使用者資訊應該多佔一些請求的資源。這時候就該介紹理論篇沒有介紹的關於celery的一個高階特性了,它叫做任務路由。直白點說,它可以規定哪個分散式節點能做哪些任務,不能做哪些任務。它的存在可以讓資源分配更加合理, 分散式微博爬蟲專案初期,就沒有使用任務路由,然後抓了十多萬條關注和分析,結果發現使用者資訊抓幾萬條,這就是資源分配得不合理。那麼如何進行任務路由呢?
coding:utf-8
import os
from datetime import timedelta
from celery import Celery
from kombu import Exchange, Queue
from config.conf import get_broker_or_backend
from celery import platforms
允許celery以root身份啟動
platforms.C_FORCE_ROOT = True
worker_log_path = os.path.join(os.path.dirname(os.path.dirname( file ))+'/logs', 'celery.log')
beat_log_path = os.path.join(os.path.dirname(os.path.dirname( file ))+'/logs', 'beat.log')
tasks = ['tasks.login', 'tasks.user']
include的作用就是註冊服務化函式
app = Celery('weibo_task', include=tasks, broker=get_broker_or_backend(1), backend=get_broker_or_backend(2))
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERYD_LOG_FILE=worker_log_path,
CELERYBEAT_LOG_FILE=beat_log_path,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('login_queue', exchange=Exchange('login', type='direct'), routing_key='for_login'),
Queue('user_crawler', exchange=Exchange('user_info', type='direct'), routing_key='for_user_info'),
Queue('fans_followers', exchange=Exchange('fans_followers', type='direct'), routing_key='for_fans_followers'),
)
上述程式碼我指定了有login_queue、user_crawler、fans_followers三個任務佇列。它們分別的作用是登入、使用者資訊抓取、粉絲和關注抓取。現在假設我有三臺爬蟲伺服器A、B和C。我想讓我所有的賬號登入任務分散到三臺伺服器、讓使用者抓取在A和B上執行,讓粉絲和關注抓取在C上執行,那麼啟動A、B、C三個伺服器的celery worker的命令就分別是
celery -A tasks.workers -Q login_queue,user_crawler worker -l info -c 1 # A伺服器和B伺服器啟動worker的命令,它們只會執行登入和使用者資訊抓取任務
celery -A tasks.workers -Q login_queue,fans_followers worker -l info -c 1 # C伺服器啟動worker的命令,它只會執行登入、粉絲和關注抓取任務
然後我們通過命令列或者程式碼(如下)就能傳送所有任務給各個節點執行了
coding:utf-8
from tasks.workers import app
from page_get import user as user_get
from db.seed_ids import get_seed_ids, get_seed_by_id, insert_seeds, set_seed_other_crawled
@app.task(ignore_result=True)
def crawl_follower_fans(uid):
seed = get_seed_by_id(uid)
if seed.other_crawled == 0:
rs = user_get.get_fans_or_followers_ids(uid, 1)
rs.extend(user_get.get_fans_or_followers_ids(uid, 2))
datas = set(rs)
重複資料跳過插入
if datas: insert_seeds(datas) set_seed_other_crawled(uid)
@app.task(ignore_result=True)
def crawl_person_infos(uid):
"""
根據使用者id來爬取使用者相關資料和使用者的關注數和粉絲數(由於微博服務端限制,預設爬取前五頁,企業號的關注和粉絲也不能檢視)
:param uid: 使用者id
:return:
"""
if not uid:
return
# 由於與別的任務共享資料表,所以需要先判斷資料庫是否有該使用者資訊,再進行抓取 user = user_get.get_profile(uid) # 不抓取企業號 if user.verify_type == 2: set_seed_other_crawled(uid) return app.send_task('tasks.user.crawl_follower_fans', args=(uid,), queue='fans_followers', routing_key='for_fans_followers')
@app.task(ignore_result=True)
def excute_user_task():
seeds = get_seed_ids()
if seeds:
for seed in seeds:
在send_task的時候指定任務佇列
app.send_task('tasks.user.crawl_person_infos', args=(seed.uid,), queue='user_crawler', routing_key='for_user_info')
這裡我們是通過 queue='user_crawler',routing_key='for_user_info'來將任務和worker進行關聯的。
關於celery任務路由的更詳細的資料請閱讀官方文件。
到這裡,基本把微博資訊抓取的過程和分散式進行抓取的過程都講完了,具體實現分散式的方法,可以讀讀基礎篇。由於程式碼量比較大,我並沒有貼上完整的程式碼,只講了要點。分析過程是講的抓取過程的分析和頁面解析的分析,並在最後,結合分散式,講了一下使用任務佇列來讓分散式爬蟲更加靈活和可擴充套件。
如果有同學想跟著做一遍,可能需要參考分散式微博爬蟲的原始碼,自己動手實現一下,或者跑一下,印象可能會更加深刻。