1. 程式人生 > >Python分散式爬蟲打造搜尋引擎完整版-基於Scrapy、Redis、elasticsearch和django打造一個完整的搜尋引擎網站

Python分散式爬蟲打造搜尋引擎完整版-基於Scrapy、Redis、elasticsearch和django打造一個完整的搜尋引擎網站

Python分散式爬蟲打造搜尋引擎

基於Scrapy、Redis、elasticsearch和django打造一個完整的搜尋引擎網站https://github.com/mtianyan/ArticleSpider 未來是什麼時代?是資料時代!資料分析服務、網際網路金融,資料建模、自然語言處理、醫療病例分析……越來越多的工作會基於資料來做,而爬蟲正是快速獲取資料最重要的方式,相比其它語言,Python爬蟲更簡單、高效

一、基礎知識學習:

1. 爬取策略的深度優先和廣度優先

目錄:

  1. 網站的樹結構
  2. 深度優先演算法和實現
  3. 廣度優先演算法和實現

網站url樹結構分層設計:

  • bogbole.com
    • blog.bogbole.com
    • python.bogbole.com
      • python.bogbole.com/123

環路連結問題:

從首頁到下面節點。 但是下面的連結節點又會有連結指向首頁

所以:我們需要對於連結進行去重

1. 深度優先2. 廣度優先

跳過已爬取的連結 對於二叉樹的遍歷問題

深度優先(遞迴實現): 順著一條路,走到最深處。然後回頭

廣度優先(佇列實現): 分層遍歷:遍歷完兒子輩。然後遍歷孫子輩

Python實現深度優先過程code:

def depth_tree(tree_node):
    if tree_node is not None:
        print (tree_node._data)
        if tree_node._left is not None:
            return depth_tree(tree_node.left)
        if tree_node._right is not None:
            return depth_tree(tree_node,_right)

Python實現廣度優先過程code:

def level_queue(root):
    #利用佇列實現樹的廣度優先遍歷
    if root is None:
        return
    my_queue = []
    node = root
    my_queue.append(node)
    while my_queue:
        node = my_queue.pop(0)
        print (node.elem)
        if node.lchild is not None:
            my_queue.append(node.lchild)
        if node.rchild is not None:
            my_queue.append(node.rchild)

2. 爬蟲網址去重策略

  1. 將訪問過的url儲存到資料庫中
  2. 將url儲存到set中。只需要O(1)的代價就可以查詢到url

    100000000*2byte*50個字元/1024/1024/1024 = 9G

  3. url經過md5等方法雜湊後儲存到set中,將url壓縮到固定長度而且不重複
  4. 用bitmap方法,將訪問過的url通過hash函式對映到某一位
  5. bloomfilter方法對bitmap進行改進,多重hash函式降低衝突

scrapy去重使用的是第三種方法:後面分散式scrapy-redis會講解bloomfilter方法。

3. Python字串編碼問題解決:

  1. 計算機只能處理數字,文字轉換為數字才能處理,計算機中8個bit作為一個位元組, 所以一個位元組能表示的最大數字就是255
  2. 計算機是美國人發明的,所以一個位元組就可以標識所有單個字元 ,所以ASCII(一個位元組)編碼就成為美國人的標準編碼
  3. 但是ASCII處理中文明顯不夠,中文不止255個漢字,所以中國製定了GB2312編碼 ,用兩個位元組表示一個漢字。GB2312將ASCII也包含進去了。同理,日文,韓文,越來越多的國家為了解決這個問題就都發展了一套編碼,標準越來越多,如果出現多種語言混合顯示就一定會出現亂碼
  4. 於是unicode出現了,它將所有語言包含進去了。
  5. 看一下ASCII和unicode編碼:
    1. 字母A用ASCII編碼十進位制是65,二進位制 0100 0001
    2. 漢字”中” 已近超出ASCII編碼的範圍,用unicode編碼是20013二進位制是01001110 00101101
    3. A用unicode編碼只需要前面補0二進位制是 00000000 0100 0001
  6. 亂碼問題解決的,但是如果內容全是英文,unicode編碼比ASCII編碼需要多一倍的儲存空間,傳輸也會變慢。
  7. 所以此時出現了可變長的編碼”utf-8” ,把英文:1位元組,漢字3位元組,特別生僻的變成4-6位元組,如果傳輸大量的英文,utf8作用就很明顯。

**讀取檔案,進行操作時轉換為unicode編碼進行處理** **儲存檔案時,轉換為utf-8編碼。以便於傳輸** 讀檔案的庫會將轉換為unicode *python2 預設編碼格式為`ASCII`,Python3 預設編碼為 `utf-8`*

#python3
import sys
sys.getdefaultencoding()
s.encoding('utf-8')
#python2
import sys
sys.getdefaultencoding()
s = "我和你"
su = u"我和你"
~~s.encode("utf-8")#會報錯~~
s.decode("gb2312").encode("utf-8")
su.encode("utf-8")

二、伯樂線上爬取所有文章

1. 初始化檔案目錄

基礎環境

  1. python 3.5.1
  2. JetBrains PyCharm 2016.3.2
  3. mysql+navicat

為了便於日後的部署:我們開發使用了虛擬環境。

pip install virtualenv
pip install virtualenvwrapper-win
安裝虛擬環境管理
mkvirtualenv articlespider3
建立虛擬環境
workon articlespider3
直接進入虛擬環境
deactivate
退出啟用狀態
workon
知道有哪些虛擬環境

scrapy專案初始化介紹

自行官網下載py35對應得whl檔案進行pip離線安裝 Scrapy 1.3.3

**命令列建立scrapy專案**

cd desktop

scrapy startproject ArticleSpider

**scrapy目錄結構** scrapy借鑑了django的專案思想

  • scrapy.cfg:配置檔案。
  • setings.py:設定
SPIDER_MODULES = ['ArticleSpider.spiders'] #存放spider的路徑
NEWSPIDER_MODULE = 'ArticleSpider.spiders'

pipelines.py:

做跟資料儲存相關的東西

middilewares.py:

自己定義的middlewares 定義方法,處理響應的IO操作

__init__.py:

專案的初始化檔案。

items.py:

定義我們所要爬取的資訊的相關屬性。Item物件是種類似於表單,用來儲存獲取到的資料

**建立我們的spider**

cd ArticleSpider
scrapy genspider jobbole blog.jobbole.com

可以看到直接為我們建立好的空專案裡已經有了模板程式碼。如下:

# -*- coding: utf-8 -*-
import scrapy


class JobboleSpider(scrapy.Spider):
    name = "jobbole"
    allowed_domains = ["blog.jobbole.com"]
    # start_urls是一個帶爬的列表,
    #spider會為我們把請求下載網頁做到,直接到parse階段
    start_urls = ['http://blog.jobbole.com/']
    def parse(self, response):
        pass

scray在命令列啟動某一個Spyder的命令:

scrapy crawl jobbole

**在windows報出錯誤** `ImportError: No module named ‘win32api’`

pip install pypiwin32#解決

**建立我們的除錯工具類*** 在專案根目錄裡建立main.py 作為除錯工具檔案

# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/3/28 12:06'

from scrapy.cmdline import execute

import sys
import os

#將系統當前目錄設定為專案根目錄
#os.path.abspath(__file__)為當前檔案所在絕對路徑
#os.path.dirname為檔案所在目錄
#H:\CodePath\spider\ArticleSpider\main.py
#H:\CodePath\spider\ArticleSpider
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
#執行命令,相當於在控制檯cmd輸入改名了
execute(["scrapy", "crawl" , "jobbole"])

**settings.py的設定不遵守reboots協議** `ROBOTSTXT_OBEY = False` 在jobble.py打上斷點:

def parse(self, response):
    pass

可以看到他返回的htmlresponse物件: 物件內部:

  • body:網頁內容
  • _DEFAULT_ENCODING= ‘ascii’
  • encoding= ‘utf-8’

可以看出scrapy已經為我們做到了將網頁下載下來。而且編碼也進行了轉換.

2. 提取伯樂線上內容

xpath的使用

xpath讓你可以不懂前端html,不看html的詳細結構,只需要會右鍵檢視就能獲取網頁上任何內容。速度遠超beautifulsoup。 目錄:

1. xpath簡介
2. xpath術語與語法
3. xpath抓取誤區:javasrcipt生成html與html原始檔的區別
4. xpath抓取例項

為什麼要使用xpath?

  • xpath使用路徑表示式在xml和html中進行導航
  • xpath包含有一個標準函式庫
  • xpath是一個w3c的標準
  • xpath速度要遠遠超beautifulsoup。

**xpath節點關係**

  1. 父節點*上一層節點*
  2. 子節點
  3. 兄弟節點*同胞節點*
  4. 先輩節點*父節點,爺爺節點*
  5. 後代節點*兒子,孫子* xpath語法:
表示式 說明
article 選取所有article元素的所有子節點
/article 選取根元素article
article/a 選取所有屬於article的子元素的a元素
//div 選取所有div元素(不管出現在文件裡的任何地方)
article//div 選取所有屬於article元素的後代的div元素,不管它出現在article之下的任何位置
//@class 選取所有名為class的屬性

xpath語法-謂語:

表示式 說明
/article/div[1 選取屬於article子元素的第一個div元素
/article/div[last()] 選取屬於article子元素的最後一個div元素
/article/div[last()-1] 選取屬於article子元素的倒數第二個div元素
//div[@color] 選取所有擁有color屬性的div元素
//div[@color=’red’] 選取所有color屬性值為red的div元素

xpath語法:

表示式 說明
/div/* 選取屬於div元素的所有子節點
//* 選取所有元素
//div[@*] 選取所有帶屬性的div 元素
//div/a 丨//div/p 選取所有div元素的a和p元素
//span丨//ul 選取文件中的span和ul元素
article/div/p丨//span 選取所有屬於article元素的div元素的p元素以及文件中所有的 span元素

xpath抓取誤區

取某一個網頁上元素的xpath地址

在標題處右鍵使用firebugs檢視元素。 然後在<h1>2016 騰訊軟體開發面試題(部分)</h1>右鍵檢視xpath

# -*- coding: utf-8 -*-
import scrapy

class JobboleSpider(scrapy.Spider):
    name = "jobbole"
    allowed_domains = ["blog.jobbole.com"]
    start_urls = ['http://blog.jobbole.com/110287/']

    def parse(self, response):
        re_selector = response.xpath("/html/body/div[3]/div[3]/div[1]/div[1]/h1")
        # print(re_selector)
        pass

除錯debug可以看到

re_selector =(selectorlist)[]

可以看到返回的是一個空列表, 列表是為了如果我們當前的xpath路徑下還有層級目錄時可以進行選取 空說明沒取到值:

我們可以來chorme裡觀察一下

chorme取到的值//*[@id="post-110287"]/div[1]/h1

chormexpath程式碼

# -*- coding: utf-8 -*-
import scrapy


class JobboleSpider(scrapy.Spider):
    name = "jobbole"
    allowed_domains = ["blog.jobbole.com"]
    start_urls = ['http://blog.jobbole.com/110287/']

    def parse(self, response):
        re_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1')
        # print(re_selector)
        pass

可以看出此時可以取到值

分析頁面,可以發現頁面內有一部html是通過JavaScript ajax互動來生成的,因此在f12檢查元素時的頁面結構裡有,而xpath不對 xpath是基於html原始碼檔案結構來找的

xpath可以有多種多樣的寫法:

re_selector = response.xpath("/html/body/div[1]/div[3]/div[1]/div[1]/h1/text()")
re2_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1/text()')
re3_selector = response.xpath('//div[@class="entry-header]/h1/text()')

推薦使用id型。因為頁面id唯一。

推薦使用class型,因為後期迴圈爬取可擴充套件通用性強。

通過了解了這些此時我們已經可以抓取到頁面的標題,此時可以使用xpath利器照貓畫虎抓取任何內容。只需要點選右鍵檢視xpath。

開啟控制檯除錯

scrapy shell http://blog.jobbole.com/110287/

完整的xpath提取伯樂線上欄位程式碼

# -*- coding: utf-8 -*-
import scrapy
import re

class JobboleSpider(scrapy.Spider):
    name = "jobbole"
    allowed_domains = ["blog.jobbole.com"]
    start_urls = ['http://blog.jobbole.com/110287/']

    def parse(self, response):
        #提取文章的具體欄位
        title = response.xpath('//div[@class="entry-header"]/h1/text()').extract_first("")
        create_date = response.xpath("//p[@class='entry-meta-hide-on-mobile']/text()").extract()[0].strip().replace("·","").strip()
        praise_nums = response.xpath("//span[contains(@class, 'vote-post-up')]/h10/text()").extract()[0]
        fav_nums = response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]
        match_re = re.match(".*?(\d+).*", fav_nums)
        if match_re:
            fav_nums = match_re.group(1)

        comment_nums = response.xpath("//a[@href='#article-comment']/span/text()").extract()[0]
        match_re = re.match(".*?(\d+).*", comment_nums)
        if match_re:
            comment_nums = match_re.group(1)

        content = response.xpath("//div[@class='entry']").extract()[0]

        tag_list = response.xpath("//p[@class='entry-meta-hide-on-mobile']/a/text()").extract()
        tag_list = [element for element in tag_list if not element.strip().endswith("評論")]
        tags = ",".join(tag_list)
        pass

css選擇器的使用:

# 通過css選擇器提取欄位
        # front_image_url = response.meta.get("front_image_url", "")  #文章封面圖
        title = response.css(".entry-header h1::text").extract_first()
        create_date = response.css("p.entry-meta-hide-on-mobile::text").extract()[0].strip().replace("·","").strip()
        praise_nums = response.css(".vote-post-up h10::text").extract()[0]
        fav_nums = response.css(".bookmark-btn::text").extract()[0]
        match_re = re.match(".*?(\d+).*", fav_nums)
        if match_re:
            fav_nums = int(match_re.group(1))
        else:
            fav_nums = 0

        comment_nums = response.css("a[href='#article-comment'] span::text").extract()[0]
        match_re = re.match(".*?(\d+).*", comment_nums)
        if match_re:
            comment_nums = int(match_re.group(1))
        else:
            comment_nums = 0

        content = response.css("div.entry").extract()[0]

        tag_list = response.css("p.entry-meta-hide-on-mobile a::text").extract()
        tag_list = [element for element in tag_list if not element.strip().endswith("評論")]
        tags = ",".join(tag_list)
        pass

3. 爬取所有文章

yield關鍵字

#使用request下載詳情頁面,下載完成後回撥方法parse_detail()提取文章內容中的欄位
yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)

scrapy.http import Request下載網頁

from scrapy.http import Request
Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)

parse拼接網址應對herf內有可能網址不全

from urllib import parse
url=parse.urljoin(response.url,post_url)
parse.urljoin("http://blog.jobbole.com/all-posts/","http://blog.jobbole.com/111535/")
#結果為http://blog.jobbole.com/111535/

class層級關係

next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
#如果.next .pagenumber 是指兩個class為層級關係。而不加空格為同一個標籤

twist非同步機制

Scrapy使用了Twisted作為框架,Twisted有些特殊的地方是它是事件驅動的,並且比較適合非同步的程式碼。在任何情況下,都不要寫阻塞的程式碼。阻塞的程式碼包括:

  • 訪問檔案、資料庫或者Web
  • 產生新的程序並需要處理新程序的輸出,如執行shell命令
  • 執行系統層次操作的程式碼,如等待系統佇列

實現全部文章欄位下載的程式碼:

    def parse(self, response):
        """
                1. 獲取文章列表頁中的文章url並交給scrapy下載後並進行解析
                2. 獲取下一頁的url並交給scrapy進行下載, 下載完成後交給parse
                """
        # 解析列表頁中的所有文章url並交給scrapy下載後並進行解析
        post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
        for post_url in post_urls:
            #request下載完成之後,回撥parse_detail進行文章詳情頁的解析
            # Request(url=post_url,callback=self.parse_detail)
            print(response.url)
            print(post_url)
            yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
            #遇到href沒有域名的解決方案
            #response.url + post_url
            print(post_url)
        # 提取下一頁並交給scrapy進行下載
        next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
        if next_url:
            yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse)

全部文章的邏輯流程圖

所有文章流程圖

4. scrapy的items整合欄位

資料爬取的任務就是從非結構的資料中提取出結構性的資料。 items 可以讓我們自定義自己的欄位(類似於字典,但比字典的功能更齊全)

在當前頁,需要提取多個url

原始寫法,extract之後則生成list列表,無法進行二次篩選:

post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()

改進寫法:

post_nodes = response.css("#archive .floated-thumb .post-thumb a")
        for post_node in post_nodes:
            #獲取封面圖的url
            image_url = post_node.css("img::attr(src)").extract_first("")
            post_url = post_node.css("::attr(href)").extract_first("")

在下載網頁的時候把獲取到的封面圖的url傳給parse_detail的response 在下載網頁時將這個封面url獲取到,並通過meta將他傳送出去。在callback的回撥函式中接收該值

yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":image_url},callback=self.parse_detail)

front_image_url = response.meta.get("front_image_url", "")

urljoin的好處 如果你沒有域名,我就從response裡取出來,如果你有域名則我對你起不了作用了

**編寫我們自定義的item並在jobboled.py中填充。

class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Field()
    create_date = scrapy.Field()
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    front_image_url = scrapy.Field()
    front_image_path = scrapy.Field()
    praise_nums = scrapy.Field()
    comment_nums = scrapy.Field()
    fav_nums = scrapy.Field()
    content = scrapy.Field()
    tags = scrapy.Field()

import之後例項化,例項化之後填充:

1. from ArticleSpider.items import JobBoleArticleItem
2. article_item = JobBoleArticleItem()
3. article_item["title"] = title
        article_item["url"] = response.url
        article_item["create_date"] = create_date
        article_item["front_image_url"] = [front_image_url]
        article_item["praise_nums"] = praise_nums
        article_item["comment_nums"] = comment_nums
        article_item["fav_nums"] = fav_nums
        article_item["tags"] = tags
        article_item["content"] = content

yield article_item將這個item傳送到pipelines中 pipelines可以接收到傳送過來的item 將setting.py中的pipeline配置取消註釋

# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
}

當我們的item被傳輸到pipeline我們可以將其進行儲存到資料庫等工作

setting設定下載圖片pipeline

ITEM_PIPELINES={
'scrapy.pipelines.images.ImagesPipeline': 1,
}

H:\CodePath\pyEnvs\articlespider3\Lib\site-packages\scrapy\pipelines 裡面有三個scrapy預設提供的pipeline 提供了檔案,圖片,媒體。

ITEM_PIPELINES是一個數據管道的登記表,每一項具體的數字代表它的優先順序,數字越小,越早進入。

setting設定下載圖片的地址

# IMAGES_MIN_HEIGHT = 100
# IMAGES_MIN_WIDTH = 100

設定下載圖片的最小高度,寬度。

新建資料夾images在

IMAGES_URLS_FIELD = "front_image_url"
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')

安裝PILpip install pillow

定製自己的pipeline使其下載圖片後能儲存下它的本地路徑 get_media_requests()接收一個迭代器物件下載圖片 item_completed獲取到圖片的下載地址

自定義圖片pipeline的除錯資訊

繼承並重寫item_completed()

from scrapy.pipelines.images import ImagesPipeline

class ArticleImagePipeline(ImagesPipeline):
    #重寫該方法可從result中獲取到圖片的實際下載地址
    def item_completed(self, results, item, info):
        for ok, value in results:
            image_file_path = value["path"]
        item["front_image_path"] = image_file_path

        return item

setting中設定使用我們自定義的pipeline,而不是系統自帶的

ITEM_PIPELINES = {
   'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
    'ArticleSpider.pipelines.ArticleImagePipeline':1,
}

儲存下來的本地地址

圖片url的md5處理 新建package utils

import hashlib

def get_md5(url):
    m = hashlib.md5()
    m.update(url)
    return m.hexdigest()

if __name__ == "__main__":
    print(get_md5("http://jobbole.com".encode("utf-8")))

不確定使用者傳入的是不是:

def get_md5(url):
    #str就是unicode了
    if isinstance(url, str):
        url = url.encode("utf-8")
    m = hashlib.md5()
    m.update(url)
    return m.hexdigest()

在jobbole.py中將url的md5儲存下來

from ArticleSpider.utils.common import get_md5
article_item["url_object_id"] = get_md5(response.url)

5. 資料儲存到本地檔案以及mysql中

儲存到本地json檔案

import codecs開啟檔案避免一些編碼問題,自定義JsonWithEncodingPipeline實現json本地儲存

class JsonWithEncodingPipeline(object):
    #自定義json檔案的匯出
    def __init__(self):
        self.file = codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        #將item轉換為dict,然後生成json物件,false避免中文出錯
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item
    #當spider關閉的時候
    def spider_closed(self, spider):
        self.file.close()

setting.py註冊pipeline

ITEM_PIPELINES = {
   'ArticleSpider.pipelines.JsonWithEncodingPipeline': 2,
   # 'scrapy.pipelines.images.ImagesPipeline': 1,
    'ArticleSpider.pipelines.ArticleImagePipeline':1,
}

scrapy exporters JsonItemExporter匯出

scrapy自帶的匯出:

       - 'CsvItemExporter', 
       - 'XmlItemExporter',
       - 'JsonItemExporter'

from scrapy.exporters import JsonItemExporter

class JsonExporterPipleline(object):
    #呼叫scrapy提供的json export匯出json檔案
    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()

    def  close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

設定setting.py註冊該pipeline

'ArticleSpider.pipelines.JsonExporterPipleline ': 2
  • 1

儲存到資料庫(mysql)

資料庫設計資料表,表的內容欄位是和item一致的。資料庫與item的關係。類似於django中model與form的關係。日期的轉換,將字串轉換為datetime

import datetime
 try:
            create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
        except Exception as e:
            create_date = datetime.datetime.now().date()

資料庫表設計

jobbole資料表設計

  • 三個num欄位均設定不能為空,然後預設0.
  • content設定為longtext
  • 主鍵設定為url_object_id

資料庫驅動安裝pip install mysqlclient

Linux報錯解決方案: ubuntu:sudo apt-get install libmysqlclient-dev centos:sudo yum install python-devel mysql-devel

儲存到資料庫pipeline(同步)編寫

import MySQLdb
class MysqlPipeline(object):
    #採用同步的機制寫入mysql
    def __init__(self):
        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'password', 'article_spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()

儲存到資料庫的(非同步Twisted)編寫 因為我們的爬取速度可能大於資料庫儲存的速度。非同步操作。 設定可配置引數 seeting.py設定

MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "123456"

程式碼中獲取到設定的可配置引數 twisted非同步:

import MySQLdb.cursors
from twisted.enterprise import adbapi

#連線池ConnectionPool
#    def __init__(self, dbapiName, *connargs, **connkw):
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings["MYSQL_USER"],
            passwd = settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        #**dbparms-->("MySQLdb",host=settings['MYSQL_HOST']
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)

        return cls(dbpool)

    def process_item(self, item, spider):
        #使用twisted將mysql插入變成非同步執行
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider) #處理異常

    def handle_error(self, failure, item, spider):
        #處理非同步插入的異常
        print (failure)

    def do_insert(self, cursor, item):
        #執行具體的插入
        #根據不同的item 構建不同的sql語句並插入到mysql中
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)

可選django.items

可以讓我們儲存的item直接變成django的models.

scrapy的itemloader來維護提取程式碼

itemloadr提供了一個容器,讓我們配置某一個欄位該使用哪種規則。 add_css add_value add_xpath

from scrapy.loader import ItemLoader
# 通過item loader載入item
        front_image_url = response.meta.get("front_image_url", "")  # 文章封面圖
        item_loader = ItemLoader(item=JobBoleArticleItem(), response=response)
        item_loader.add_css("title", ".entry-header h1::text")
        item_loader.add_value("url", response.url)
        item_loader.add_value("url_object_id", get_md5(response.url))
        item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text")
        item_loader.add_value("front_image_url", [front_image_url])
        item_loader.add_css("praise_nums", ".vote-post-up h10::text")
        item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
        item_loader.add_css("fav_nums", ".bookmark-btn::text")
        item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
        item_loader.add_css("content", "div.entry")
        #呼叫這個方法來對規則進行解析生成item物件
        article_item = item_loader.load_item()

直接使用itemloader的問題

  1. 所有值變成了list
  2. 對於這些值做一些處理函式item.py中對於item process處理函式 MapCompose可以傳入函式對於該欄位進行處理,而且可以傳入多個
from scrapy.loader.processors import MapCompose
def add_mtianyan(value):
    return value+"-mtianyan"

 title = scrapy.Field(
        input_processor=MapCompose(lambda x:x+"mtianyan",add_mtianyan),
    )

注意:此處的自定義方法一定要寫在程式碼前面。

    create_date = scrapy.Field(
        input_processor=MapCompose(date_convert),
        output_processor=TakeFirst()
    )

只取list中的第一個值。

自定義itemloader實現預設提取第一個

class ArticleItemLoader(ItemLoader):
    #自定義itemloader實現預設提取第一個
    default_output_processor = TakeFirst()

list儲存原值

def return_value(value):
    return value

front_image_url = scrapy.Field(
        output_processor=MapCompose(return_value)
    )

下載圖片pipeline增加if增強通用性

class ArticleImagePipeline(ImagesPipeline):
    #重寫該方法可從result中獲取到圖片的實際下載地址
    def item_completed(self, results, item, info):
        if "front_image_url" in item:
            for ok, value in results:
                image_file_path = value["path"]
            item["front_image_path"] = image_file_path

        return item

自定義的item帶處理函式的完整程式碼

class JobBoleArticleItem(scrapy.Item):
    title = scrapy.Field()
    create_date = scrapy.Field(
        input_processor=MapCompose(date_convert),
    )
    url = scrapy.Field()
    url_object_id = scrapy.Field()
    front_image_url = scrapy.Field(
        output_processor=MapCompose(return_value)
    )
    front_image_path = scrapy.Field()
    praise_nums = scrapy.Field(
        input_processor=MapCompose(get_nums)
    )
    comment_nums = scrapy.Field(
        input_processor=MapCompose(get_nums)
    )
    fav_nums = scrapy.Field(
        input_processor=MapCompose(get_nums)
    )
    #因為tag本身是list,所以要重寫
    tags = scrapy.Field(
        input_processor=MapCompose(remove_comment_tags),
        output_processor=Join(",")
    )
    content = scrapy.Field()

三、知乎網問題和答案爬取

1. 基礎知識

session和cookie機制

cookie: 瀏覽器支援的儲存方式 key-value

http無狀態請求,兩次請求沒有聯絡

session的工作原理

(1)當一個session第一次被啟用時,一個唯一的標識被儲存於本地的cookie中。

(2)首先使用session_start()函式,從session倉庫中載入已經儲存的session變數。

(3)通過使用session_register()函式註冊session變數。

(4)指令碼執行結束時,未被銷燬的session變數會被自動儲存在本地一定路徑下的session庫中.

request模擬知乎的登入

http狀態碼

獲取crsftoken

def get_xsrf():
    #獲取xsrf code
    response = requests.get("https://www.zhihu.com",headers =header)
    # # print(response.text)
    # text ='<input type="hidden" name="_xsrf" value="ca70366e5de5d133c3ae09fb16d9b0fa"/>'
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text)
    if match_obj:
        return (match_obj.group(1))
    else:
        return ""

python模擬知乎登入程式碼:

# _*_ coding: utf-8 _*_

import requests
try:
    import cookielib
except:
    import http.cookiejar as cookielib
import re

__author__ = 'mtianyan'
__date__ = '2017/5/23 16:42'


import requests
try:
    import cookielib
except:
    import http.cookiejar as cookielib

import re

session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")
try:
    session.cookies.load(ignore_discard=True)
except:
    print ("cookie未能載入")

agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36"
header = {
    "HOST":"www.zhihu.com",
    "Referer": "https://www.zhizhu.com",
    'User-Agent': agent
}

def is_login():
    #通過個人中心頁面返回狀態碼來判斷是否為登入狀態
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    response = session.get(inbox_url, headers=header, allow_redirects=False)
    if response.status_code != 200:
        return False
    else:
        return True

def get_xsrf():
    #獲取xsrf code
    response = session.get("https://www.zhihu.com", headers=header)
    response_text = response.text
    #reDOTAll 匹配全文
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = (match_obj.group(1))
        return xsrf


def get_index():
    response = session.get("https://www.zhihu.com", headers=header)
    with open("index_page.html", "wb") as f:
        f.write(response.text.encode("utf-8"))
    print ("ok")

def get_captcha():
    import time
    t = str(int(time.time()*1000))
    captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
    t = session.get(captcha_url, headers=header)
    with open("captcha.jpg","wb") as f:
        f.write(t.content)
        f.close()

    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        pass

    captcha = input("輸入驗證碼\n>")
    return captcha

def zhihu_login(account, password):
    #知乎登入
    if re.match("^1\d{10}",account):
        print ("手機號碼登入")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha":get_captcha()
        }
    else:
        if "@" in account:
            #判斷使用者名稱是否為郵箱
            print("郵箱方式登入")
            post_url = "https://www.zhihu.com/login/email"
            post_data = {
                "_xsrf": get_xsrf(),
                "email": account,
                "password": password
            }

    response_text = session.post(post_url, data=post_data, headers=header)
    session.cookies.save()

# get_index()
# is_login()
# get_captcha()
zhihu_login("phone", "password")
zhihu_login("shouji", "mima")

2. scrapy建立知乎爬蟲登入

scrapy genspider zhihu www.zhihu.com
  • 1

因為知乎我們需要先進行登入,所以我們重寫它的start_requests

    def start_requests(self):
        return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
  1. 下載首頁然後回撥login函式。

  2. login函式請求驗證碼並回調login_after_captcha函式.此處通過meta將post_data傳送出去,後面的回撥函式來用。

    def login(self, response):
        response_text = response.text
        #獲取xsrf。
        match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
        xsrf = ''
        if match_obj:
            xsrf = (match_obj.group(1))

        if xsrf:
            post_url = "https://www.zhihu.com/login/phone_num"
            post_data = {
                "_xsrf": xsrf,
                "phone_num": "phone",
                "password": "password",
                "captcha": ""
            }

            import time
            t = str(int(time.time() * 1000))
            captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
            #請求驗證碼並回調login_after_captcha.
            yield scrapy.Request(captcha_url, headers=self.headers, 
                meta={"post_data":post_data}, callback=self.login_after_captcha)
  1. login_after_captcha函式將驗證碼圖片儲存到本地,然後使用PIL庫開啟圖片,肉眼識別後在控制檯輸入驗證碼值 然後接受步驟一的meta資料,一併提交至登入介面。回撥check_login檢查是否登入成功。
    def login_after_captcha(self, response):
        with open("captcha.jpg", "wb") as f:
            f.write(response.body)
            f.close()

        from PIL import Image
        try:
            im = Image.open('captcha.jpg')
            im.show()
            im.close()
        except:
            pass

        captcha = input("輸入驗證碼\n>")

        post_data = response.meta.get("post_data", {})
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data["captcha"] = captcha
        return [scrapy.FormRequest(
            url=post_url,
            formdata=post_data,
            headers=self.headers,
            callback=self.check_login
        )]
  1. check_login函式,驗證伺服器的返回資料判斷是否成功 scrapy會對request的URL去重(RFPDupeFilter),加上dont_filter則告訴它這個URL不參與去重.

原始碼中的startrequest:

    def start_requests(self):
        for url in self.start_urls:
            yield self.make_requests_from_url(url)

我們將原本的start_request的程式碼放在了現在重寫的,回撥鏈最後的check_login

 def check_login(self, response):
        #驗證伺服器的返回資料判斷是否成功
        text_json = json.loads(response.text)
        if "msg" in text_json and text_json["msg"] == "登入成功":
            for url in self.start_urls:
                yield scrapy.Request(url, dont_filter=True, headers=self.headers)

登入程式碼流程

3. 知乎資料表設計

知乎答案版本1

上圖為知乎答案版本1

知乎答案版本2

上圖為知乎答案版本2

設定資料表字段

問題欄位 回答欄位
zhihu_id zhihu_id
topics url
url question_id
title author_id
content content
answer_num parise_num
comments_num comments_num
watch_user_num create_time
click_num update_time
crawl_time crawl_time

知乎問題表

知乎答案表

知乎url分析

點具體問題下檢視更多。 可獲得介面:

重點引數:offset=43isend = truenext點選更多介面返回

href=”/question/25460323”

all_urls = [parse.urljoin(response.url, url) for url in all_urls]
  • 1
  1. 從首頁獲取所有a標籤。如果提取的url中格式為 /question/xxx 就下載之後直接進入解析函式parse_question 如果不是question頁面則直接進一步跟蹤。
def parse(self, response):
    """
            提取出html頁面中的所有url 並跟蹤這些url進行一步爬取
            如果提取的url中格式為 /question/xxx 就下載之後直接進入解析函式
            """
    all_urls = response.css("a::attr(href)").extract()
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    #使用lambda函式對於每一個url進行過濾,如果是true放回列表,返回false去除。
    all_urls = filter(lambda x:True if x.startswith("https") else False, all_urls)
    for url in all_urls:
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # 如果提取到question相關的頁面則下載後交由提取函式進行提取
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
        else:
            # 如果不是question頁面則直接進一步跟蹤
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)
  1. 進入parse_question函式處理 **建立我們的item

item要用到的方法ArticleSpider\utils\common.py:

def extract_num(text):
    #從字串中提取出數字
    match_re = re.match(".*?(\d+).*", text)
    if match_re:
        nums = int(match_re.group(1))
    else:
        nums = 0

    return nums

setting.py中設定SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S" SQL_DATE_FORMAT = "%Y-%m-%d" 使用:

from ArticleSpider.settings import SQL_DATETIME_FORMAT
  • 1

知乎的問題 item

class ZhihuQuestionItem(scrapy.Item):
    #知乎的問題 item
    zhihu_id = scrapy.Field()
    topics = scrapy.Field()
    url = scrapy.Field()
    title = scrapy.Field()
    content = scrapy.Field()
    answer_num = scrapy.Field()
    comments_num = scrapy.Field()
    watch_user_num = scrapy.Field()
    click_num = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        #插入知乎question表的sql語句
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)

        return insert_sql, params

知乎問題回答item

class ZhihuAnswerItem(scrapy.Item):
    #知乎的問題回答item
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()

    def get_insert_sql(self):
        #插入知乎question表的sql語句
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
              create_time, update_time, crawl_time
              ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
              ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
              update_time=VALUES(update_time)
        """

        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (
            self["zhihu_id"], self["url"], self["question_id"],
            self["author_id"], self["content"], self["parise_num"],
            self["comments_num"], create_time, update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )

        return insert_sql, params

有了兩個item之後,我們繼續完善我們的邏輯

    def parse_question(self, response):
        #處理question頁面, 從頁面中提取出具體的question item
        if "QuestionHeader-title" in response.text:
            #處理新版本
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            item_loader.add_css("title", "h1.QuestionHeader-title::text")
            item_loader.add_css("content", ".QuestionHeader-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", ".List-headerText span::text")
            item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
            item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
            item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")

            question_item = item_loader.load_item()
        else:
            #處理老版本頁面的item提取
            match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
            if match_obj:
                question_id = int(match_obj.group(2))

            item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
            # item_loader.add_css("title", ".zh-question-title h2 a::text")
            item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
            item_loader.add_css("content", "#zh-question-detail")
            item_loader.add_value("url", response.url)
            item_loader.add_value("zhihu_id", question_id)
            item_loader.add_css("answer_num", "#zh-question-answer-num::text")
            item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
            # item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
            item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
            item_loader.add_css("topics", ".zm-tag-editor-labels a::text")

            question_item = item_loader.load_item()

        yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
        yield question_item

處理問題回答提取出需要的欄位

    def parse_answer(self, reponse):
        #處理question的answer
        ans_json = json.loads(reponse.text)
        is_end = ans_json["paging"]["is_end"]
        next_url = ans_json["paging"]["next"]

        #提取answer的具體欄位
        for answer in ans_json["data"]:
            answer_item = ZhihuAnswerItem()
            answer_item["zhihu_id"] = answer["id"]
            answer_item["url"] = answer["url"]
            answer_item["question_id"] = answer["question"]["id"]
            answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
            answer_item["content"] = answer["content"] if "content" in answer else None
            answer_item["parise_num"] = answer["voteup_count"]
            answer_item["comments_num"] = answer["comment_count"]
            answer_item["create_time"] = answer["created_time"]
            answer_item["update_time"] = answer["updated_time"]
            answer_item["crawl_time"] = datetime.datetime.now()

            yield answer_item

        if not is_end:
            yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)

知乎提取欄位流程圖:

知乎問題及答案提取流程圖

深度優先: 1. 提取出頁面所有的url,並過濾掉不需要的url 2. 如果是questionurl就進入question的解析 3. 把該問題的爬取完了然後就返回初始解析

將item寫入資料庫

pipelines.py錯誤處理 插入時錯誤可通過該方法監控

    def handle_error(self, failure, item, spider):
        #處理非同步插入的異常
        print (failure)

改造pipeline使其變得更通用 原本具體硬編碼的pipeline

  def do_insert(self, cursor, item):
        #執行具體的插入
        insert_sql = """
                    insert into jobbole_article(title, url, create_date, fav_nums)
                    VALUES (%s, %s, %s, %s)
                """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))

改寫後的:

    def do_insert(self, cursor, item):
        #根據不同的item 構建不同的sql語句並插入到mysql中
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)

可選方法一:

    if item.__class__.__name__ == "JobBoleArticleItem":
        #執行具體的插入
        insert_sql = """
                    insert into jobbole_article(title, url, create_date, fav_nums)
                    VALUES (%s, %s, %s, %s)
                """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))

推薦方法: 把sql語句等放到item裡面: jobboleitem類內部方法

    def get_insert_sql(self):
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE content=VALUES(fav_nums)
        """
        params = (self["title"], self["url"], self["create_date"], self["fav_nums"])

        return insert_sql, params

知乎問題:

    def get_insert_sql(self):
        #插入知乎question表的sql語句
        insert_sql = """
            insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
              watch_user_num, click_num, crawl_time
              )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
        """
        zhihu_id = self["zhihu_id"][0]
        topics = ",".join(self["topics"])
        url = self["url"][0]
        title = "".join(self["title"])
        content = "".join(self["content"])
        answer_num = extract_num("".join(self["answer_num"]))
        comments_num = extract_num("".join(self["comments_num"]))

        if len(self["watch_user_num"]) == 2:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = int(self["watch_user_num"][1])
        else:
            watch_user_num = int(self["watch_user_num"][0])
            click_num = 0

        crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)

        params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
                  watch_user_num, click_num, crawl_time)

        return insert_sql, params

知乎回答:

    def get_insert_sql(self):
        #插入知乎回答表的sql語句
        insert_sql = """
            insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
              create_time, update_time, crawl_time
              ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
              ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
              update_time=VALUES(update_time)
        """

        create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
        update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
        params = (
            self["zhihu_id"], self["url"], self["question_id"],
            self["author_id"], self["content"], self["parise_num"],
            self["comments_num"], create_time, update_time,
            self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
        )

        return insert_sql, params

第二次爬取到相同資料,更新資料

ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
              watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)

除錯技巧

            if match_obj:
                #如果提取到question相關的頁面則下載後交由提取函式進行提取
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
                #方便除錯
                break
            else:
                #方便除錯
                pass
                #如果不是question頁面則直接進一步跟蹤
                #方便除錯
                # yield scrapy.Request(url, headers=self.headers, callback=self.parse)
    #方便除錯
        # yield question_item

錯誤排查 [key error] title pipeline中debug定位到哪一個item的錯誤。

四、通過CrawlSpider對招聘網站拉鉤網進行整站爬取

推薦工具cmderhttp://cmder.net/ 下載full版本,使我們在windows環境下也可以使用linux部分命令。 配置path環境變數

1. 設計拉勾網的資料表結構

拉勾網資料庫表設計

2. 初始化拉鉤網專案並解讀crawl原始碼

scrapy genspider --list 檢視可使用的初始化模板 ailable templates: - basic - crawl - csvfeed - xmlfeed

scrapy genspider -t crawl lagou www.lagou.com

cmd與pycharm不同,mark root setting.py 設定目錄

crawl模板

class LagouSpider(CrawlSpider):
    name = 'lagou'
    allowed_domains = ['www.lagou.com']
    start_urls = ['http://www.lagou.com/']

    rules = (
        Rule(LinkExtractor(allow=r'Items/'), callback='parse_item', follow=True),
    )

    def parse_item(self, response):
        i = {}
        #i['domain_id'] = response.xpath('//input[@id="sid"]/@value').extract()
        #i['name'] = response.xpath('//div[@id="name"]').extract()
        #i['description'] = response.xpath('//div[@id="description"]').extract()
        return i

提供了一些可以讓我們進行簡單的follow的規則,link,迭代爬取

rules:

規則,crawel spider讀取並執行

parse_start_url(response):

example:

rules是一個可迭代物件,裡面有Rule例項->LinkExtractor的分析allow=('category\.php', ), callback='parse_item', allow允許的url模式。callback,要回調的函式名。 因為rules裡面沒有self,無法獲取到方法。

import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class MySpider(CrawlSpider):
    name = 'example.com'
    allowed_domains = ['example.com']
    start_urls = ['http://www.example.com']

    rules = (
        # Extract links matching 'category.php' (but not matching 'subsection.php')
        # and follow links from them (since no callback means follow=True by default).
        Rule(LinkExtractor(allow=('category\.php', ), deny=('subsection\.php', ))),

        # Extract links matching 'item.