如何用item pipeline(管道)清洗資料
管道是什麼 Item管道(Item Pipeline): 主要負責處理有蜘蛛從網頁中抽取的Item,主要任務是清洗、驗證和儲存資料。 當頁面被蜘蛛解析後,將被髮送到Item管道,並經過幾個特定的次序處理資料。 每個Item管道的元件都是有一個簡單的方法組成的Python類。 它們獲取了Item並執行它們的方法,同時還需要確定是否需要在Item管道中繼續執行下一步或是直接丟棄掉不處理。 類(Class): 用來描述具有相同的屬性和方法的物件的集合。它定義了該集合中每個物件所共有的屬性和方法。物件是類的例項。 管道的作用 清理HTML資料 驗證抓取的資料(檢查專案是否包含特定欄位) 檢查重複(並刪除) 考慮到效能的原因,去重最好在連結中去重,或者利用資料庫主鍵的唯一性去重 將刮取的專案儲存在資料庫中 接著上文《如何使用scrapy的item來封裝資料》,我現在需要實現額外的三個處理——將價格的單位英鎊轉換為人民幣、去除掉書名相同的重複資料、將資料存入MongoDB中。
如何實現Item Pipeline 一個Item Pipeline不需要繼承特定基類,只需要實現某些特定方法,如process_item、open_spider、close_spider等。
process_item(item , spider):
每個 Item Pipeline 元件都需要呼叫該方法,這個方法必須返回一個 Item (或任何繼承類)物件, 或是丟擲 DropItem 異常,被丟棄的 item 將不會被之後的 pipeline 元件所處理
需要傳入的引數為:
item (Item 物件) : 被爬取的 item spider (Spider 物件) : 爬取該 item 的 spider
該方法會被每一個 item pipeline 元件所呼叫,process_item 必須返回以下其中的任意一個物件:
一個 dict
一個 Item 物件或者它的子類物件 一個 Twisted Deferred 物件 一個 DropItem exception;如果返回此異常,則該 item 將不會被後續的 item pipeline 所繼續訪問
注意:該方法是Item Pipeline必須實現的方法,其它三個方法(open_spider/close_spider/from_crawler)是可選的方法 如果process_item返回了一項資料(item或字典),返回的資料會傳遞給下一級Item Pipeline繼續處理,如果沒有則結束處理。 另外,當process_item在處理某項item時丟擲DropItem異常,該項item便會被拋棄,不再傳遞給後面的Item Pipeline處理,也不會匯出到檔案。
open_spider(self , spider ):——爬蟲啟動時呼叫
Spider開啟時,即處理資料前,會回撥該方法。該方法通常用於在開始處理資料前完成一些初始化工作,比如連線資料庫。
close_spider(self , spider):——爬蟲關閉時呼叫
與open_spider相對,為Spider關閉時,即處理資料後,會回撥該方法。該方法通常用於在處理完所有資料之後完成某些清理工作,比如關閉資料庫。
from_crawler(cls, crawler):——也是在爬蟲啟動時呼叫,但是比open_spider早
建立Item Pipeline物件時回撥該類方法。該類方法用來從 Crawler 中初始化得到一個 pipeline 例項;它必須返回一個新的 pipeline 例項;Crawler 物件提供了訪問所有 Scrapy 核心元件的介面,包括 settings 和 signals
程式碼實現
新建bookpipelines.py檔案
from scrapy.exceptions import DropItem from scrapy.item import Item import pymongo
#實現價格轉換——第一個Item Pipeline,執行順序較次 class PricePipeline(object):
exchange_rate = 8.5309
def process_item(self , item , spider): price = float(item['price'][1:]) * self.exchange_rate item['price'] = '¥ %.2f' % price return item
# 實現去除重複書名的資料——第二個Item Pipeline,執行順序最優 class DuplicatesPipeline(object):
def __init__(self): self.book_set = set()
def process_item(self , item , spider): name = item['name'] if name in self.book_set: raise DropItem("Duplicate book found: %s" % item) self.book_set.add(name) return item
#實現將資料存入mongoDB中——第三個Item Pipeline,執行順序最後 class MongoDBPipeline(object):
def from_crawler(cls , crawler): cls.DB_URI = crawler.settings.get('MONGO_DB_URI' , 'mongodb://localhost:27017/') cls.DB_NAME = crawler.settings.get('MONGO_DB_NAME' , 'scrapy_datas') return cls()
def open_spider(self , spider): self.client = pymongo.MongoClient(self.DB_URI) self.db = self.client[self.DB_NAME]
def close_spider(self , spider): self.client.close()
def process_item(self , item , spider): collection = self.db[spider.name] post = dict(item) if isinstance(item , Item) else item collection.insert_one(post) return item