1. 程式人生 > >如何用item pipeline(管道)清洗資料

如何用item pipeline(管道)清洗資料

管道是什麼 Item管道(Item Pipeline): 主要負責處理有蜘蛛從網頁中抽取的Item,主要任務是清洗、驗證和儲存資料。 當頁面被蜘蛛解析後,將被髮送到Item管道,並經過幾個特定的次序處理資料。 每個Item管道的元件都是有一個簡單的方法組成的Python類。 它們獲取了Item並執行它們的方法,同時還需要確定是否需要在Item管道中繼續執行下一步或是直接丟棄掉不處理。 類(Class): 用來描述具有相同的屬性和方法的物件的集合。它定義了該集合中每個物件所共有的屬性和方法。物件是類的例項。 管道的作用 清理HTML資料 驗證抓取的資料(檢查專案是否包含特定欄位) 檢查重複(並刪除)  考慮到效能的原因,去重最好在連結中去重,或者利用資料庫主鍵的唯一性去重 將刮取的專案儲存在資料庫中 接著上文《如何使用scrapy的item來封裝資料》,我現在需要實現額外的三個處理——將價格的單位英鎊轉換為人民幣、去除掉書名相同的重複資料、將資料存入MongoDB中。

如何實現Item Pipeline 一個Item Pipeline不需要繼承特定基類,只需要實現某些特定方法,如process_item、open_spider、close_spider等。

process_item(item , spider):

每個 Item Pipeline 元件都需要呼叫該方法,這個方法必須返回一個 Item (或任何繼承類)物件, 或是丟擲 DropItem 異常,被丟棄的 item 將不會被之後的 pipeline 元件所處理 

需要傳入的引數為: 

item (Item 物件) : 被爬取的 item spider (Spider 物件) : 爬取該 item 的 spider 

該方法會被每一個 item pipeline 元件所呼叫,process_item 必須返回以下其中的任意一個物件:

一個 dict

一個 Item 物件或者它的子類物件 一個 Twisted Deferred 物件 一個 DropItem exception;如果返回此異常,則該 item 將不會被後續的 item pipeline 所繼續訪問 

注意:該方法是Item Pipeline必須實現的方法,其它三個方法(open_spider/close_spider/from_crawler)是可選的方法 如果process_item返回了一項資料(item或字典),返回的資料會傳遞給下一級Item Pipeline繼續處理,如果沒有則結束處理。  另外,當process_item在處理某項item時丟擲DropItem異常,該項item便會被拋棄,不再傳遞給後面的Item Pipeline處理,也不會匯出到檔案。

open_spider(self , spider ):——爬蟲啟動時呼叫

Spider開啟時,即處理資料前,會回撥該方法。該方法通常用於在開始處理資料前完成一些初始化工作,比如連線資料庫。

close_spider(self , spider):——爬蟲關閉時呼叫

與open_spider相對,為Spider關閉時,即處理資料後,會回撥該方法。該方法通常用於在處理完所有資料之後完成某些清理工作,比如關閉資料庫。

from_crawler(cls, crawler):——也是在爬蟲啟動時呼叫,但是比open_spider早

建立Item Pipeline物件時回撥該類方法。該類方法用來從 Crawler 中初始化得到一個 pipeline 例項;它必須返回一個新的 pipeline 例項;Crawler 物件提供了訪問所有 Scrapy 核心元件的介面,包括 settings 和 signals

程式碼實現

新建bookpipelines.py檔案

from scrapy.exceptions import DropItem from scrapy.item import Item import pymongo

#實現價格轉換——第一個Item Pipeline,執行順序較次 class PricePipeline(object):

    exchange_rate = 8.5309

    def process_item(self , item , spider):             price = float(item['price'][1:]) * self.exchange_rate             item['price'] = '¥ %.2f' % price             return item

# 實現去除重複書名的資料——第二個Item Pipeline,執行順序最優 class DuplicatesPipeline(object):

    def __init__(self):         self.book_set = set()

    def process_item(self , item , spider):         name = item['name']         if name in self.book_set:             raise DropItem("Duplicate book found: %s" % item)         self.book_set.add(name)         return item

#實現將資料存入mongoDB中——第三個Item Pipeline,執行順序最後 class MongoDBPipeline(object):

    def from_crawler(cls , crawler):         cls.DB_URI = crawler.settings.get('MONGO_DB_URI' , 'mongodb://localhost:27017/')         cls.DB_NAME = crawler.settings.get('MONGO_DB_NAME' , 'scrapy_datas')         return cls()

    def open_spider(self , spider):         self.client = pymongo.MongoClient(self.DB_URI)         self.db = self.client[self.DB_NAME]

    def close_spider(self , spider):         self.client.close()

    def process_item(self , item , spider):         collection = self.db[spider.name]         post = dict(item) if isinstance(item , Item) else item         collection.insert_one(post)         return item