Scrapy框架中的Pipeline組件
阿新 • • 發佈:2018-05-21
object OS @class ror inter setting ima utf8 encoding
簡介
在下圖中可以看到items.py與pipeline.py,其中items是用來定義抓取內容的實體;pipeline則是用來處理抓取的item的管道
Item管道的主要責任是負責處理有蜘蛛從網頁中抽取的Item,他的主要任務是清晰、驗證和存儲數據。當頁面被蜘蛛解析後,將被發送到Item管道,並經過幾個特定的次序處理數據。每個Item管道的組件都是有一個簡單的方法組成的Python類。獲取了Item並執行方法,同時還需要確定是否需要在Item管道中繼續執行下一步或是直接丟棄掉不處理。簡而言之,就是通過spider爬取的數據都會通過這個pipeline處理,可以在pipeline中不進行操作或者執行相關對數據的操作。
管道的功能
1.清理HTML數據
2.驗證解析到的數據(檢查Item是否包含必要的字段)
3.檢查是否是重復數據(如果重復就刪除)
4.將解析到的數據存儲到數據庫中
Pipeline中的操作
process_item(item, spider)
每一個item管道組件都會調用該方法,並且必須返回一個item對象實例或raise DropItem異常。被丟掉的item將不會在管道組件進行執行。此方法有兩個參數,一個是item,即要處理的Item對象,另一個參數是spider,即爬蟲。
此外,我們也可以在類中實現以下方法
open_spider(spider)
當spider執行的時候將調用該方法
close_spider(spider)
當spider關閉的時候將調用該方法
定制自己的Pipeline組件:
1.生成json數據
class JsonWithEncodingPipeline(object):
def __init__(self):
self.file=codecs.open('article.json', 'w', encoding="utf-8")
def process_item(self, item, spider):
lines=json.dumps(dict(item), ensure_ascii= False) + '\n'
self.file.write(lines)
return item
def spider_closed(self, spider):
self.file.close()
2.操作mysql關系數據庫
class MysqlPipeline(object):
def __init__(self):
self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True)
self.cursor=self.conn.cursor()
def process_item(self, item, spider):
insert_sql="""
insert into article_items(title, url, url_object_id , create_date)
VALUES(%s, %s, %s, %s)
"""
self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
self.conn.commit()
3.異步操作mysql關系數據庫
# 異步處理關系數據庫
class MysqlTwistedPipline(object):
def __init__(self, dbpool):
self.dbpool=dbpool
@classmethod
def from_settings(cls, settings):
dbparms=dict(
host=settings["MYSQL_HOST"], #這裏要在settings中事先定義好
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
passwd=settings["MYSQL_PASSWORD"],
charset="utf8",
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool=adbapi.ConnectPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider):
# 使用twisted將mysql插入變成異步執行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error)
def handle_error(self, failure, item, spider):
#處理異步插入的異常
print(failure)
def do_insert(self, cursor, item):
#執行具體的插入
insert_sql = """
insert into article_items(title, url, url_object_id , create_date)
VALUES(%s, %s, %s, %s)
"""
self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
4.數據去重
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item
使用組件
# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
# 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
# 'scrapy.pipelines.images.ImagesPipeline': 1,
'ArticleSpider.pipelines.MysqlPipeline': 1,
# 'ArticleSpider.pipelines.JsonExporterPipeline': 2,
# 'ArticleSpider.pipelines.ArticleImagePipeline': 1
}
每個pipeline後面有一個數值,這個數組的範圍是0-1000,這個數值是這些在pipeline中定義的類的優先級,越小越優先。
Scrapy框架中的Pipeline組件