1. 程式人生 > 資料庫 >python使用adbapi實現MySQL資料庫的非同步儲存

python使用adbapi實現MySQL資料庫的非同步儲存

之前一直在寫有關scrapy爬蟲的事情,今天我們看看使用scrapy如何把爬到的資料放在MySQL資料庫中儲存。

有關python操作MySQL資料庫的內容,網上已經有很多內容可以參考了,但都是在同步的操作MySQL資料庫。在資料量不大的情況下,這種方法固然可以,但是一旦資料量增長後,MySQL就會出現崩潰的情況,因為網上爬蟲的速度要遠遠高過往資料庫中插入資料的速度。為了避免這種情況發生,我們就需要使用非同步的方法來儲存資料,爬蟲與資料儲存互不影響。

為了顯示方便,我們把程式設計的簡單一點,只是爬一頁的資料。我們今天選擇伯樂線上這個網站來爬取,只爬取第一頁的資料。

首先我們還是要啟動一個爬蟲專案,然後自己建了一個爬蟲的檔案jobbole.py。我們先來看看這個檔案中的程式碼

# -*- coding: utf-8 -*-
import io
import sys
import scrapy
import re
import datetime
from scrapy.http import Request
from urllib import parse
from ArticleSpider.items import JobboleArticleItem,ArticleItemLoader
from scrapy.loader import ItemLoader
sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8')
 
class JobboleSpider(scrapy.Spider):
 """docstring for JobboleSpider"""
 name = "jobbole"
 allowed_domain = ["blog.jobbole.com"]
 start_urls = ['http://blog.jobbole.com/all-posts/']
 
 def parse(self,response):
 """
 1.獲取列表頁中的文章url
 """
 # 解析列表匯中所有文章url並交給scrapy下載器並進行解析
 post_nodes = response.css("#archive .floated-thumb .post-thumb a")
 for post_node in post_nodes:
 image_url = post_node.css("img::attr(src)").extract_first("")# 這裡取出每篇文章的封面圖,並作為meta傳入Request
 post_url = post_node.css("::attr(href)").extract_first("")
 yield Request(url = parse.urljoin(response.url,post_url),meta = {"front_image_url":image_url},callback = self.parse_detail)
 
 def parse_detail(self,response):
 article_item = JobboleArticleItem()
 # 通過ItemLoader載入Item
 # 通過add_css後的返回值都是list型,所有我們再items.py要進行處理
 item_loader = ArticleItemLoader(item = JobboleArticleItem(),response = response)
 item_loader.add_css("title",".entry-header h1::text")
 item_loader.add_value("url",response.url)
 # item_loader.add_value("url_object_id",get_md5(response.url))
 item_loader.add_value("url_object_id",response.url)
 item_loader.add_css("create_date","p.entry-meta-hide-on-mobile::text")
 item_loader.add_value("front_image_url",[front_image_url])
 item_loader.add_css("praise_nums",".vote-post-up h10::text")
 item_loader.add_css("comment_nums","a[href='#article-comment'] span::text")
 item_loader.add_css("fav_nums",".bookmark-btn::text")
 item_loader.add_css("tags","p.entry-meta-hide-on-mobile a::text")
 item_loader.add_css("content","div.entry")
 
 article_item = item_loader.load_item()
 print(article_item["tags"])
 
 yield article_item
 pass

這裡我把程式碼進行了簡化,首先對列表頁發出請求,這裡只爬取一頁資料,然後分析每一頁的url,並且交給scrapy對每一個url進行請求,得到每篇文章的詳情頁,把詳情頁的相關內容放在MySQL資料庫中。
這裡使用itemloader來進行頁面的解析,這樣解析有個最大的好處就是可以把解析規則存放在資料庫中,實現對解析規則的動態載入。但是要注意一點是使用itemloader中css方式和xpath方式得到的資料都是list型,因此還需要在items.py中再對相對應的資料進行處理。

接下來我們就來看看items.py是如何處理list資料的。

# -*- coding: utf-8 -*-
 
# Define here the models for your scraped items
#
# See documentation in:
# https://doc.scrapy.org/en/latest/topics/items.html
import datetime
import re
 
 
import scrapy
from scrapy.loader import ItemLoader
from scrapy.loader.processors import MapCompose,TakeFirst,Join
from ArticleSpider.utils.common import get_md5
 
 
def convert_date(value):
 try:
 create_date = datetime.datetime.strptime(create_date,"%Y/%m/%d").date()
 except Exception as e:
 create_date = datetime.datetime.now().date()
 return create_date
 
def get_nums(value):
 match_re = re.match(".*?(\d+).*",value)
 if match_re:
 nums = int(match_re.group(1))
 else:
 nums = 0
 
 return nums
 
def remove_comment_tags(value):
 # 去掉tags中的評論內容
 if "評論" in value:
 # 這裡做了修改,如果返回"",則在list中仍然會佔位,會變成類似於["程式設計師","解鎖"]這樣
 # return ""
 return None 
 else:
 return value
 
def return_value(value):
 return 
 
class ArticleItemLoader(ItemLoader):
 """docstring for AriticleItemLoader"""
 # 自定義ItemLoader
 default_output_processor = TakeFirst()
 
class ArticlespiderItem(scrapy.Item):
 # define the fields for your item here like:
 # name = scrapy.Field()
 pass
 
class JobboleArticleItem(scrapy.Item):
 """docstring for ArticlespiderItem"""
 title = scrapy.Field()
 create_date = scrapy.Field(
 input_processor = MapCompose(convert_date)
 )
 url = scrapy.Field()
 url_object_id = scrapy.Field(
 output_processor = MapCompose(get_md5)
 )
 # 這裡注意front_image_url還是一個list,在進行sql語句時還需要處理
 front_image_url = scrapy.Field(
 output_processor = MapCompose(return_value)
 )
 front_image_path = scrapy.Field()
 praise_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 comment_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 fav_nums = scrapy.Field(
 input_processor = MapCompose(get_nums)
 )
 # tags要做另行處理,因為tags我們需要的就是list
 tags = scrapy.Field(
 input_processor = MapCompose(remove_comment_tags),output_processor = Join(",")
 )
 content = scrapy.Field()

首先我們看到定義了一個類ArticleItemloader,在這個類中只有一句話,就是對於每個items都預設採用list中的第一個元素,這樣我們就可以把每個items中的第一個元素取出來。但是要注意,有些items我們是必須要用list型的,比如我們給ImagePipeline的資料就要求必須是list型,這樣我們就需要對front_image_url單獨進行處理。這裡我們做了一個小技巧,對front_image_url什麼都不錯,因為我們傳過來的front_image_url就是list型
在items的Field中有兩個引數,一個是input_processor,另一個是output_processor,這兩個引數可以幫助我們對items的list中的每個元素進行處理,比如有些需要用md5進行加密,有些需要用正則表示式進行篩選或者排序等等。

在進行mysql的pipeline之前,我們需要設計資料庫,下面是我自己設計的資料庫的欄位,僅供參考

這裡我把url_object_id作為該表的主鍵,由於它不會重複,所以適合做主鍵。

下面我們來看看資料庫的pipeline。

# -*- coding: utf-8 -*-
 
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import codecs
import json
from twisted.enterprise import adbapi
import MySQLdb
import MySQLdb.cursors
 
 
class MysqlTwistedPipeline(object):
 """docstring for MysqlTwistedPipeline"""
 #採用非同步的機制寫入mysql
 def __init__(self,dbpool):
 self.dbpool = dbpool
 
 @classmethod
 def from_settings(cls,settings):
 dbparms = dict(
 host = settings["MYSQL_HOST"],db = settings["MYSQL_DBNAME"],user = settings["MYSQL_USER"],passwd = settings["MYSQL_PASSWORD"],charset='utf8',cursorclass=MySQLdb.cursors.DictCursor,use_unicode=True,)
 dbpool = adbapi.ConnectionPool("MySQLdb",**dbparms)
 
 return cls(dbpool)
 
 def process_item(self,item,spider):
 #使用twisted將mysql插入變成非同步執行
 query = self.dbpool.runInteraction(self.do_insert,item)
 query.addErrback(self.handle_error,spider) #處理異常
 return item
 
 def handle_error(self,failure,spider):
 # 處理非同步插入的異常
 print (failure)
 
 def do_insert(self,cursor,item):
 #執行具體的插入
 #根據不同的item 構建不同的sql語句並插入到mysql中
 # insert_sql,params = item.get_insert_sql()
 # print (insert_sql,params)
 # cursor.execute(insert_sql,params)
 insert_sql = """
 insert into jobbole_article(title,url,create_date,fav_nums,url_object_id)
 VALUES (%s,%s,%s)
 """
 # 可以只使用execute,而不需要再使用commit函式
 cursor.execute(insert_sql,(item["title"],item["url"],item["create_date"],item["fav_nums"],item["url_object_id"]))

在這裡我們只是演示一下,我們只向資料庫中插入5個欄位的資料,分別是title,url,create_date,fav_nums,url_object_id。

當然你也可以再加入其它的欄位。

首先我們看看from_settings這個函式,它可以從settings.py檔案中取出我們想想要的資料,這裡我們把資料庫的host,dbname,username和password都放在settings.py中。實際的插入語句還是在process_item中進行,我們自己定義了一個函式do_insert,然後把它傳給dbpool中用於插入真正的資料。

最後我們來看看settings.py中的程式碼,這裡就很簡單了。

MYSQL_HOST = "localhost"
MYSQL_DBNAME = "article_wilson"
MYSQL_USER = "root"
MYSQL_PASSWORD = "root"

其實這裡是和pipeline中的程式碼是想對應的,別忘了把在settings.py中把pipeline開啟。

ITEM_PIPELINES = {
 # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,# 'ArticleSpider.pipelines.JsonWithEncodingPipeline': 1
 
 # # 'scrapy.pipelines.images.ImagePipeline': 1,# 'ArticleSpider.pipelines.JsonExporterPipleline': 1
 # 'ArticleSpider.pipelines.ArticleImagePipeline': 2
 # 'ArticleSpider.pipelines.MysqlPipeline': 1
 'ArticleSpider.pipelines.MysqlTwistedPipeline': 1
}

好了,現在我們可以跑一程式吧。

scrapy crawl jobbole

下面是執行結果的截圖

好了,以上就是今天的全部內容了。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。