在Scrapy中使用Django的ORM非同步儲存資料
阿新 • • 發佈:2020-12-04
在Scrapy中使用Django的ORM非同步儲存資料
django的orm可以脫離django使用,只要我們將django的環境舒適化就可以了。
在scrapy中使用
首先我們的建立一個django專案,然後在建立一個scrapy專案。
然後再scrapy中初始化django的環境
一般我們在scrapy的專案的__init__.py
裡面初始化
import django import os import sys # 將django的專案路徑加入到當前的環境 sys.path.insert(0, os.path.dirname(os.getcwd())) # django專案舒適化 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject.settings') django.setup()
初始化完成之後,我們就可以直接在scrapy中匯入django的orm並使用了。
非同步儲存
因為scrapy是非同步的爬蟲框架,如果我們在裡面直接使用django的orm會有io阻塞的操作。這時候我們就藉助asyncio
這個包來幫助我們在scrapy
中運行同步阻塞的程式碼。
因為我們scrapy
中處理資料都放在pipline
中處理,所以我放在pipline
中展示
from concurrent.futures import ThreadPoolExecutor import asyncio from goods import models from . import items class WebspidersPipeline: '''todo 非同步儲存''' # 建立事件迴圈物件 loop = asyncio.get_event_loop() # 建立執行緒池 executor = ThreadPoolExecutor() # 任務佇列 tasks = [] # 處理不同的pipline async def process_item(self, item, spider): if isinstance(item, items.GoodsItem): return self.process_goods_item(item, spider) elif isinstance(item, items.GoodsSizeItem): return self.process_goods_size_item(item, spider) elif isinstance(item, items.GoodsStockItem): return self.process_goods_stock_item(item, spider) return item def process_goods_item(self, item, spider): '''將儲存資料的處理方法加入到任務佇列''' task = self.loop.run_in_executor(self.executor, self.executor_func(models.Goods, item), ) self.tasks.append(task) return item def process_goods_size_item(self, item, spider): task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsSize, item), ) self.tasks.append(task) return item def process_goods_stock_item(self, item, spider): task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsStock, item), ) self.tasks.append(task) return item @staticmethod def executor_func(model, item): '''主要作用是將有引數的函式轉換為無引數的函式返回,方便run_in_executor方法呼叫,這個方法它只接受位置傳參,不接受關鍵字傳參''' def func(): return model.objects.create(**item) return func def close_spider(self, spider): '''當爬蟲關閉的時候呼叫這個方法儲存資料''' self.loop.run_until_complete(asyncio.wait(self.tasks))
執行結果
之前直接使用同步的方法儲存的時候,2000個請求+資料儲存花費了大約10分鐘(sqlite3)
後面使用非同步儲存的時候,使用sqlite3會報錯,因為sqlite3是單執行緒的,我們是一個執行緒池物件,併發儲存會被sqlite3拒絕(database was locked)
後面改用了mysql儲存,2000個請求+資料儲存花費了大約40s,這個提升量還是很驚人的。
後面分析了一下,在scrapy中使用同步的方式儲存會導致scrapy的非同步請求會等待同步的儲存完成之後才去執行,大量的時間浪費了等待上面。
後面單獨執行網路請求部分,沒有資料儲存,2000個請求花費了大約25s旁邊。