1. 程式人生 > 實用技巧 >在Scrapy中使用Django的ORM非同步儲存資料

在Scrapy中使用Django的ORM非同步儲存資料

在Scrapy中使用Django的ORM非同步儲存資料

django的orm可以脫離django使用,只要我們將django的環境舒適化就可以了。

在scrapy中使用

首先我們的建立一個django專案,然後在建立一個scrapy專案。

然後再scrapy中初始化django的環境

一般我們在scrapy的專案的__init__.py裡面初始化

import django
import os
import sys

# 將django的專案路徑加入到當前的環境
sys.path.insert(0, os.path.dirname(os.getcwd()))

# django專案舒適化
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject.settings')

django.setup()

初始化完成之後,我們就可以直接在scrapy中匯入django的orm並使用了。

非同步儲存

因為scrapy是非同步的爬蟲框架,如果我們在裡面直接使用django的orm會有io阻塞的操作。這時候我們就藉助asyncio這個包來幫助我們在scrapy中運行同步阻塞的程式碼。

因為我們scrapy中處理資料都放在pipline中處理,所以我放在pipline中展示

from concurrent.futures import ThreadPoolExecutor
import asyncio
from goods import models
from . import items


class WebspidersPipeline:
    '''todo 非同步儲存'''
    
    # 建立事件迴圈物件
    loop = asyncio.get_event_loop()
    # 建立執行緒池
    executor = ThreadPoolExecutor()
    # 任務佇列
    tasks = []
	
    # 處理不同的pipline
    async def process_item(self, item, spider):
        if isinstance(item, items.GoodsItem):
            return self.process_goods_item(item, spider)
        elif isinstance(item, items.GoodsSizeItem):
            return self.process_goods_size_item(item, spider)
        elif isinstance(item, items.GoodsStockItem):
            return self.process_goods_stock_item(item, spider)
        return item
	
    def process_goods_item(self, item, spider):
        '''將儲存資料的處理方法加入到任務佇列'''
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.Goods, item), )
        self.tasks.append(task)
        return item

    def process_goods_size_item(self, item, spider):
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsSize, item), )
        self.tasks.append(task)
        return item

    def process_goods_stock_item(self, item, spider):
        task = self.loop.run_in_executor(self.executor, self.executor_func(models.GoodsStock, item), )
        self.tasks.append(task)
        return item

    @staticmethod
    def executor_func(model, item):
        '''主要作用是將有引數的函式轉換為無引數的函式返回,方便run_in_executor方法呼叫,這個方法它只接受位置傳參,不接受關鍵字傳參'''
        def func():
            return model.objects.create(**item)

        return func

    def close_spider(self, spider):
        '''當爬蟲關閉的時候呼叫這個方法儲存資料'''
        self.loop.run_until_complete(asyncio.wait(self.tasks))

執行結果

之前直接使用同步的方法儲存的時候,2000個請求+資料儲存花費了大約10分鐘(sqlite3)

後面使用非同步儲存的時候,使用sqlite3會報錯,因為sqlite3是單執行緒的,我們是一個執行緒池物件,併發儲存會被sqlite3拒絕(database was locked)

後面改用了mysql儲存,2000個請求+資料儲存花費了大約40s,這個提升量還是很驚人的。

後面分析了一下,在scrapy中使用同步的方式儲存會導致scrapy的非同步請求會等待同步的儲存完成之後才去執行,大量的時間浪費了等待上面。

後面單獨執行網路請求部分,沒有資料儲存,2000個請求花費了大約25s旁邊。