Scrapy 入門筆記(4) --- 使用 Pipeline 儲存資料
最近學習用 Scrapy 框架寫爬蟲,簡單來說爬蟲就是從網上抓取網頁,解析網頁,然後進行資料的儲存與分析,將從網頁的解析到資料的轉換儲存。將學習過程中用到的解析技術,Scrapy 的各個模組使用與進階到分散式爬蟲學到的知識點、遇到的問題以及解決方法記錄於此,以作總結與備忘,也希望對需要的同學有所幫助。
本篇主要講解 pipeline 儲存資料模組的使用,包括將資料儲存為 Json 檔案,儲存到 MySQL 資料庫以及儲存圖片
Scrapy 提供了 pipeline 模組來執行儲存資料的操作。在建立的 Scrapy 專案中自動建立了一個 pipeline.py 檔案,同時建立了一個預設的 Pipeline 類。我們可以根據需要自定義 Pipeline 類,然後在 settings.py 檔案中進行配置即可,如下
# 指定用來處理資料的 Pipeline 類,後面的數字代表執行順序,取值範圍是 0-1000 range.
# 數值小的 Pipeline 類優先執行
ITEM_PIPELINES = {
'StackoverFlowSpider.pipelines.StackoverflowspiderPipeline': 2,
}
接下來我們自定義 Pipeline 類來對將 Item 轉為 Json 檔案進行儲存。
Pipeline 類會在 process_item 方法中處理資料,然後在結束時呼叫 close_spider 方法,因此我們
需要自定義這兩個方法做相應的處理。
兩點提示
- 在 process_item() 方法處理完成後要返回 item 供後面的 Pipeline 類繼續操作
- 記得在 close_spider() 中釋放資源
1. 自定義 Pipeline 儲存 Json 資料
import json
import codecs
class StackJsonPipeline:
# 初始化時指定要操作的檔案
def __init__(self):
self.file = codecs.open('questions.json', 'w', encoding='utf-8')
# 儲存資料,將 Item 例項作為 json 資料寫入到檔案中
def process_item(self, item, spider):
lines = json.dumps(dict(item), ensure_ascii=False) + '\n'
self.file.write(lines)
return item
# 處理結束後關閉 檔案 IO 流
def close_spider(self, spider):
self.file.close()
2. 使用 Scrapy 提供的 exporter 儲存 Json 資料
Scrapy 為我們提供了一個 JsonItemExporter 類來進行 Json 資料的儲存,非常方便,下面是使用該類
進行儲存的自定義 Pipeline 類示例。
from scrapy.exporters import JsonItemExporter
class JsonExporterPipeline:
# 呼叫 scrapy 提供的 json exporter 匯出 json 檔案
def __init__(self):
self.file = open('questions_exporter.json', 'wb')
# 初始化 exporter 例項,執行輸出的檔案和編碼
self.exporter = JsonItemExporter(self.file,encoding='utf-8',ensure_ascii=False)
# 開啟倒數
self.exporter.start_exporting()
def close_spider(self, spider):
self.exporter.finish_exporting()
self.file.close()
# 將 Item 例項匯出到 json 檔案
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
上面就是兩個使用自定義 Pipeline 類生成 Json 資料的示例,有一點需要強調的是,我們使用 exporter 生成的
其實是一個數組, 下面是我使用上面兩個類生成的兩個檔案截圖,第一個生成的是許多個 Json 資料,後者是一個由
Json 資料組成的陣列:
- 使用 json 模組生成的檔案
- 使用 scrapy.exporters.JsonItemExporter 生成的檔案
3. 將資料儲存到 MySQL 資料庫
下面是一個將我們的資料儲存到 MySQL 資料庫的 Pipeline 類
# 這裡我們使用 mysql-connector-python 驅動,可以使用 pip 進行安裝
import mysql.connector
class MysqlPipeline:
def __init__(self):
self.conn = mysql.connector.connect(user='root', password='root', database='stack_db', )
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
title = item.get('question_title')
votes = item.get('question_votes')
answers = item.get('question_answers')
views = item.get('question_views')
tags = item.get('tags')
insert_sql = """
insert into stack_questions(title, votes, answers, views,tags)
VALUES (%s, %s, %s, %s,%s);
"""
self.cursor.execute(insert_sql, (title, votes, answers, views, tags))
self.conn.commit()
return item
def close_spider(self, spider):
self.cursor.close()
self.conn.close()
將 MysqlPipeline 配置在 settings 檔案中後就可以將爬取到的資料儲存到 MySQL 資料庫了,這裡方便起見直接將 SQL 語句寫在了
process_item() 方法中,實際開發中最好將 SQL 語句封裝進方法,然後再封裝其專門的 Item 類中,這樣我們的處理方法就可以根據傳遞
過來的不同 Item 呼叫不同的 SQL 語句,可以極大的提高程式的擴充套件性和我們爬蟲程式碼的可重用性。
4. 實現 MySQL 儲存的非同步操作
上面的 Pipeline 類雖然可以將資料寫在 MySQL 資料中,但是在 Scrapy 對資料的處理是同步執行的,當爬取資料量很大的時候,會出現插入資料的速度跟不上網頁的爬取解析速度,造成阻塞,為了解決這個問題需要將 MySQL 的資料儲存非同步化。Python 中提供了 Twisted 框架來實現非同步操作,該框架提供了一個連線池,通過連線池可以實現資料插入 MySQL 的非同步化。
下面是集合 Twisted 框架實現的 Pipeline 類,可以完成 MySQL 的非同步化操作:
這裡使用的是 pymysql 模組,在初始化 Pipeline 的時候,通過引數建立資料庫連線池 dbpool,
然後在 process_item 方法中來對連線池進行配置,執行其執行方法和資料。這裡我們沒有寫出上面例子中出現的 SQL 語句,而是將其封裝到了具體的 Item類中,這樣我們的 Pipeline 類可以處理各種不同的資料。
import pymysql
from twisted.enterprise import adbapi
class MysqlTwistedPipline(object):
def __init__(self, ):
dbparms = dict(
host='localhost',
db='stack_db',
user='root',
passwd='root',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor, # 指定 curosr 型別
use_unicode=True,
)
# 指定擦做資料庫的模組名和資料庫引數引數
self.dbpool = adbapi.ConnectionPool("pymysql", **dbparms)
# 使用twisted將mysql插入變成非同步執行
def process_item(self, item, spider):
# 指定操作方法和操作的資料
query = self.dbpool.runInteraction(self.do_insert, item)
# 指定異常處理方法
query.addErrback(self.handle_error, item, spider) #處理異常
def handle_error(self, failure, item, spider):
#處理非同步插入的異常
print (failure)
def do_insert(self, cursor, item):
#執行具體的插入
#根據不同的item 構建不同的sql語句並插入到mysql中
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
QuestionItem 中的 get_insert_sql() 方法程式碼如下:
def get_insert_sql(self):
insert_sql = """
insert into stack_questions(title, votes, answers, views,tags)
VALUES (%s, %s, %s, %s,%s);
"""
params = (self["question_title"], self["question_votes"], self["question_answers"], self["question_views"],self["tags"])
return insert_sql,params
5. 使用 Scrapy 自帶的 ImagesPipeline 儲存圖片
上面基本都是我們自定義的 Pipeline 類來操作資料的,現在簡要介紹一下 Scrapy 提供的一個 Pipeline 類 — ImagesPipeline。
通過執行該類然後進行配置,可以在爬取的同時自動將圖片資料儲存到本地,下面簡要介紹其用法:
- 配置 settings 檔案
在 ITEM_PIPELINES 中配置
ITEM_PIPELINES = {
'scrapy.pipelines.images.ImagesPipeline': 1,
}
- 配置儲存的欄位和本地路徑
因為我們的資料都是封裝在 Item 類裡面的,因此配置完 ImagesPipeline 類後要做的就是讓該類知道應該儲存哪個欄位的資料以及儲存到何處。
需要在 settings 中配置如下變數:
# 要儲存的欄位,即在 Item 類中的欄位名為 image_url
IMAGES_URLS_FIELD = 'image_url'
import os
# 配置資料儲存路徑,為當前工程目錄下的 images 目錄中
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')
# 設定圖片的最大最小值
# IMAGES_MIN_HEIGHT = 100
# IMAGES_MIN_WIDTH = 100
- 在傳遞引數時傳遞陣列
ImagesPipeline 要求傳遞資料必須是以資料形式的,否則會報錯
item_loader.add_value("image_url", [image_url])
經過上面三步再次爬取時,如果爬取的內容有圖片資料,就可以按照上面的步驟將圖片進行下載了。
以上介紹了比較常用的 Pipeline 類的用法,Scrapy 還提供了更多的自帶 Pipeline 類,有興趣的同學可以參閱文件繼續深入學習。
現在關於 Scrapy 的所有操作已經基本完成了,從爬蟲的建立、爬取解析、Item 封裝到 Pipeline 儲存都已經講解完畢。