1. 程式人生 > >使用gevent的Pool實現非同步併發

使用gevent的Pool實現非同步併發

目標

  • 掌握gevent中Pool基本使用
  • 實現程式碼的重構,使用gevent來進一步提高效率

1 為什麼使用gevent

對於I/O密集型任務,gevent能對效能做很大提升的,協程的建立、排程開銷都比執行緒小的多。

2 通過配置檔案設定屬性,來判斷所使用的非同步方式

# 非同步方式  thread、coroutine
ASYNC_TYPE = 'coroutine'

3 讓gevent的Pool和執行緒池Pool的介面一致

因此需要單獨對gevent的Pool進行一下修改,具體如下: 在scrapy_plus下建立async包,隨後建立coroutine.py模組

# scrapy_plus/async/coroutine.py
'''
由於gevent的Pool的沒有close方法,也沒有異常回調引數
引出需要對gevent的Pool進行一些處理,實現與執行緒池一樣介面,實現執行緒和協程的無縫轉換
'''
from gevent.pool import Pool as BasePool
import gevent.monkey
gevent.monkey.patch_all()    # 打補丁,替換內建的模組


class Pool(BasePool):
    '''協程池
    使得具有close方法
    使得apply_async方法具有和執行緒池一樣的介面
    '''
    def apply_async(self, func, args=None, kwds=None, callback=None, error_callback=None):
        return super().apply_async(func, args=args, kwds=kwds, callback=callback)

    def close(self):
        '''什麼都不需要執行'''
        pass

4 在引擎中使用上:

# scrapy_plus/core/engine.py
import time
import importlib
from datetime import datetime

from scrapy_plus.http.request import Request    # 匯入Request物件
from scrapy_plus.utils.log import logger    # 匯入logger
from scrapy_plus.conf import settings

# 判斷使用什麼非同步模式,改用對應的非同步池
if settings.ASYNC_TYPE == 'thread':
    from multiprocessing.dummy import Pool    # 匯入執行緒池物件
elif settings.ASYNC_TYPE == 'coroutine':
    from scrapy_plus.async.coroutine import Pool
else:
    raise Exception("不支援的非同步型別:%s, 只能是'thread'或者'coroutine'"%settings.ASYNC_TYPE)

# 注意:
# 由於打patch補丁是為了替換掉socket為非阻塞的
# 而下載器中正好使用了requests模組,如果在這之後匯入協程池,會導致requests中使用的socket沒有被替換成功
# 從而有可能導致使用出現問題
from .scheduler import Scheduler
from .downloader import Downloader

class Engine(object):
    ......