python使用adbapi實現MySQL資料庫的非同步儲存
之前一直在寫有關scrapy爬蟲的事情,今天我們看看使用scrapy如何把爬到的資料放在MySQL資料庫中儲存。
有關python操作MySQL資料庫的內容,網上已經有很多內容可以參考了,但都是在同步的操作MySQL資料庫。在資料量不大的情況下,這種方法固然可以,但是一旦資料量增長後,MySQL就會出現崩潰的情況,因為網上爬蟲的速度要遠遠高過往資料庫中插入資料的速度。為了避免這種情況發生,我們就需要使用非同步的方法來儲存資料,爬蟲與資料儲存互不影響。
為了顯示方便,我們把程式設計的簡單一點,只是爬一頁的資料。我們今天選擇伯樂線上這個網站來爬取,只爬取第一頁的資料。
首先我們還是要啟動一個爬蟲專案,然後自己建了一個爬蟲的檔案jobbole.py。我們先來看看這個檔案中的程式碼
# -*- coding: utf-8 -*- import io import sys import scrapy import re import datetime from scrapy.http import Request from urllib import parse from ArticleSpider.items import JobboleArticleItem,ArticleItemLoader from scrapy.loader import ItemLoader sys.stdout = io.TextIOWrapper(sys.stdout.buffer,encoding='utf-8') class JobboleSpider(scrapy.Spider): """docstring for JobboleSpider""" name = "jobbole" allowed_domain = ["blog.jobbole.com"] start_urls = ['http://blog.jobbole.com/all-posts/'] def parse(self,response): """ 1.獲取列表頁中的文章url """ # 解析列表匯中所有文章url並交給scrapy下載器並進行解析 post_nodes = response.css("#archive .floated-thumb .post-thumb a") for post_node in post_nodes: image_url = post_node.css("img::attr(src)").extract_first("")# 這裡取出每篇文章的封面圖,並作為meta傳入Request post_url = post_node.css("::attr(href)").extract_first("") yield Request(url = parse.urljoin(response.url,post_url),meta = {"front_image_url":image_url},callback = self.parse_detail) def parse_detail(self,response): article_item = JobboleArticleItem() # 通過ItemLoader載入Item # 通過add_css後的返回值都是list型,所有我們再items.py要進行處理 item_loader = ArticleItemLoader(item = JobboleArticleItem(),response = response) item_loader.add_css("title",".entry-header h1::text") item_loader.add_value("url",response.url) # item_loader.add_value("url_object_id",get_md5(response.url)) item_loader.add_value("url_object_id",response.url) item_loader.add_css("create_date","p.entry-meta-hide-on-mobile::text") item_loader.add_value("front_image_url",[front_image_url]) item_loader.add_css("praise_nums",".vote-post-up h10::text") item_loader.add_css("comment_nums","a[href='#article-comment'] span::text") item_loader.add_css("fav_nums",".bookmark-btn::text") item_loader.add_css("tags","p.entry-meta-hide-on-mobile a::text") item_loader.add_css("content","div.entry") article_item = item_loader.load_item() print(article_item["tags"]) yield article_item pass
這裡我把程式碼進行了簡化,首先對列表頁發出請求,這裡只爬取一頁資料,然後分析每一頁的url,並且交給scrapy對每一個url進行請求,得到每篇文章的詳情頁,把詳情頁的相關內容放在MySQL資料庫中。
這裡使用itemloader來進行頁面的解析,這樣解析有個最大的好處就是可以把解析規則存放在資料庫中,實現對解析規則的動態載入。但是要注意一點是使用itemloader中css方式和xpath方式得到的資料都是list型,因此還需要在items.py中再對相對應的資料進行處理。
接下來我們就來看看items.py是如何處理list資料的。
# -*- coding: utf-8 -*- # Define here the models for your scraped items # # See documentation in: # https://doc.scrapy.org/en/latest/topics/items.html import datetime import re import scrapy from scrapy.loader import ItemLoader from scrapy.loader.processors import MapCompose,TakeFirst,Join from ArticleSpider.utils.common import get_md5 def convert_date(value): try: create_date = datetime.datetime.strptime(create_date,"%Y/%m/%d").date() except Exception as e: create_date = datetime.datetime.now().date() return create_date def get_nums(value): match_re = re.match(".*?(\d+).*",value) if match_re: nums = int(match_re.group(1)) else: nums = 0 return nums def remove_comment_tags(value): # 去掉tags中的評論內容 if "評論" in value: # 這裡做了修改,如果返回"",則在list中仍然會佔位,會變成類似於["程式設計師","解鎖"]這樣 # return "" return None else: return value def return_value(value): return class ArticleItemLoader(ItemLoader): """docstring for AriticleItemLoader""" # 自定義ItemLoader default_output_processor = TakeFirst() class ArticlespiderItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() pass class JobboleArticleItem(scrapy.Item): """docstring for ArticlespiderItem""" title = scrapy.Field() create_date = scrapy.Field( input_processor = MapCompose(convert_date) ) url = scrapy.Field() url_object_id = scrapy.Field( output_processor = MapCompose(get_md5) ) # 這裡注意front_image_url還是一個list,在進行sql語句時還需要處理 front_image_url = scrapy.Field( output_processor = MapCompose(return_value) ) front_image_path = scrapy.Field() praise_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) comment_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) fav_nums = scrapy.Field( input_processor = MapCompose(get_nums) ) # tags要做另行處理,因為tags我們需要的就是list tags = scrapy.Field( input_processor = MapCompose(remove_comment_tags),output_processor = Join(",") ) content = scrapy.Field()
首先我們看到定義了一個類ArticleItemloader,在這個類中只有一句話,就是對於每個items都預設採用list中的第一個元素,這樣我們就可以把每個items中的第一個元素取出來。但是要注意,有些items我們是必須要用list型的,比如我們給ImagePipeline的資料就要求必須是list型,這樣我們就需要對front_image_url單獨進行處理。這裡我們做了一個小技巧,對front_image_url什麼都不錯,因為我們傳過來的front_image_url就是list型
在items的Field中有兩個引數,一個是input_processor,另一個是output_processor,這兩個引數可以幫助我們對items的list中的每個元素進行處理,比如有些需要用md5進行加密,有些需要用正則表示式進行篩選或者排序等等。
在進行mysql的pipeline之前,我們需要設計資料庫,下面是我自己設計的資料庫的欄位,僅供參考
這裡我把url_object_id作為該表的主鍵,由於它不會重複,所以適合做主鍵。
下面我們來看看資料庫的pipeline。
# -*- coding: utf-8 -*- # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html import codecs import json from twisted.enterprise import adbapi import MySQLdb import MySQLdb.cursors class MysqlTwistedPipeline(object): """docstring for MysqlTwistedPipeline""" #採用非同步的機制寫入mysql def __init__(self,dbpool): self.dbpool = dbpool @classmethod def from_settings(cls,settings): dbparms = dict( host = settings["MYSQL_HOST"],db = settings["MYSQL_DBNAME"],user = settings["MYSQL_USER"],passwd = settings["MYSQL_PASSWORD"],charset='utf8',cursorclass=MySQLdb.cursors.DictCursor,use_unicode=True,) dbpool = adbapi.ConnectionPool("MySQLdb",**dbparms) return cls(dbpool) def process_item(self,item,spider): #使用twisted將mysql插入變成非同步執行 query = self.dbpool.runInteraction(self.do_insert,item) query.addErrback(self.handle_error,spider) #處理異常 return item def handle_error(self,failure,spider): # 處理非同步插入的異常 print (failure) def do_insert(self,cursor,item): #執行具體的插入 #根據不同的item 構建不同的sql語句並插入到mysql中 # insert_sql,params = item.get_insert_sql() # print (insert_sql,params) # cursor.execute(insert_sql,params) insert_sql = """ insert into jobbole_article(title,url,create_date,fav_nums,url_object_id) VALUES (%s,%s,%s) """ # 可以只使用execute,而不需要再使用commit函式 cursor.execute(insert_sql,(item["title"],item["url"],item["create_date"],item["fav_nums"],item["url_object_id"]))
在這裡我們只是演示一下,我們只向資料庫中插入5個欄位的資料,分別是title,url,create_date,fav_nums,url_object_id。
當然你也可以再加入其它的欄位。
首先我們看看from_settings這個函式,它可以從settings.py檔案中取出我們想想要的資料,這裡我們把資料庫的host,dbname,username和password都放在settings.py中。實際的插入語句還是在process_item中進行,我們自己定義了一個函式do_insert,然後把它傳給dbpool中用於插入真正的資料。
最後我們來看看settings.py中的程式碼,這裡就很簡單了。
MYSQL_HOST = "localhost" MYSQL_DBNAME = "article_wilson" MYSQL_USER = "root" MYSQL_PASSWORD = "root"
其實這裡是和pipeline中的程式碼是想對應的,別忘了把在settings.py中把pipeline開啟。
ITEM_PIPELINES = { # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300,# 'ArticleSpider.pipelines.JsonWithEncodingPipeline': 1 # # 'scrapy.pipelines.images.ImagePipeline': 1,# 'ArticleSpider.pipelines.JsonExporterPipleline': 1 # 'ArticleSpider.pipelines.ArticleImagePipeline': 2 # 'ArticleSpider.pipelines.MysqlPipeline': 1 'ArticleSpider.pipelines.MysqlTwistedPipeline': 1 }
好了,現在我們可以跑一程式吧。
scrapy crawl jobbole
下面是執行結果的截圖
好了,以上就是今天的全部內容了。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。