1. 程式人生 > >筆記-scrapy-深入學習-sheduler

筆記-scrapy-深入學習-sheduler

筆記-scrapy-深入學習-sheduler

 

1.      scheduler.py

source code:scrapy/core/scheduler.py:

 

1.1.    初始化的開始

在分析engine的open_spider函式時,講過scheduler物件是通過類的from_cralwer方法生成的,程式碼如下:

 

@classmethod

def from_crawler(cls, crawler):

    settings = crawler.settings

    dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])

    dupefilter = dupefilter_cls.from_settings(settings)

    pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])

    dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])

    mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])

    logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS')

    return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,

               stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)

在最後實際還是呼叫類的__init__()。

class Scheduler(object):

 

    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,

                 logunser=False, stats=None, pqclass=None):

        self.df = dupefilter

        self.dqdir = self._dqdir(jobdir)

        self.pqclass = pqclass

        self.dqclass = dqclass

        self.mqclass = mqclass

        self.logunser = logunser

        self.stats = stats

在這裡指定了各種佇列型別,但並沒有構造佇列,在下面會講到。

實際上排程器預設只定義了2種佇列型別,但並不是在這裡構造的:

dqs:基於磁碟的任務佇列:在配置檔案可配置儲存路徑,每次執行後會把佇列任務儲存到磁碟上;

mqs:基於記憶體的任務佇列:每次都在記憶體中執行,下次啟動則消失;

 

1.2.    核心物件

前面共建立了4個物件,分別是dupefilter,pqclass,dqclass,mqclass.

1.dupefilter:

DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'

這個類的含義是"Request Fingerprint duplicates filter",請求指紋重複過濾。

它對每個request請求生成指紋,對指紋重複的請求進行過濾。具體實現見文件:scrapy-去重

 

2.pqclass

SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue'

從名字上也可以看出這個一個優先順序佇列,使用的是queuelib.它的作用應該不說也明白就是對request請求按優先順序進行排序。

如何指定優先順序?

前面講spider時,講述過可以在spider中定義Rule規則來過濾我們需要跟進的連結形式,我們只要定義規則時指定一個process_request關鍵字引數即可,這個引數是一個函式,會傳遞給我們將要繼續跟進的Request,我們直接對其設定priority屬性即可。

 

優先順序是一個整數,雖然queuelib使用小的數做為高優化級,但是由於scheduler再入佇列時取了負值,所以實際上數值越大優先順序越高,程式碼如下:

    def _dqpush(self, request):

        if self.dqs is None:

            return

        try:

            reqd = request_to_dict(request, self.spider)

            self.dqs.push(reqd, -request.priority)

 

3.dqclass

SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'

從名字上看,這是一個支援序列化的後進先出的磁碟佇列。主要用來幫助我們在停止爬蟲後可以接著上一次繼續開始爬蟲。

序列化要指定一個目錄,用於儲存序列化檔案。這個目錄在命令列上通過'-s JOBDIR=XXX'來指定。scheduler會在這個目錄下建立active.json檔案,用來序列化佇列的優先順序。

 

def _dq(self):

    activef = join(self.dqdir, 'active.json')

    if exists(activef):

        with open(activef) as f:

            prios = json.load(f)

    else:

        prios = ()

    q = self.pqclass(self._newdq, startprios=prios)

    if q:

        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",

                    {'queuesize': len(q)}, extra={'spider': self.spider})

    return q

 

_dq在engine open_spider時呼叫scheduler的open時呼叫,可以看到如果命令指定了JOBDIR引數,則從目錄下尋找active.json,這個檔案儲存的上一次指定的優先順序集合,然後用它和_newdq一起構造磁碟佇列,這樣就可以接著上次停止時的狀態繼續爬取了。

其中_newdq會使用JOBDIR和優先順序作為引數初始化磁碟佇列物件。

 

def _newdq(self, priority):

    return self.dqclass(join(self.dqdir, 'p%s' % priority))

 

最後在scheduler關閉時會將優化級存入檔案active.json檔案,用於下次反序列化。

def close(self, reason):

    if self.dqs:

        prios = self.dqs.close()

        with open(join(self.dqdir, 'active.json'), 'w') as f:

            json.dump(prios, f)

    return self.df.close(reason)

 

4.mqclass

SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'

從名字上看,是後進先出的記憶體佇列。這個佇列是為了構造優先順序佇列而存在的,在構造優先順序佇列時,需要傳遞一個佇列工廠類,用它來構造每個不同的優先順序佇列,構造時會向這個佇列工廠類傳遞優先順序作為唯一的引數。

預設情況下實際上就是queuelib.LifoMemoryQueue.

 

1.3.    入列及出列

前面瞭解了記憶體佇列和磁碟佇列,下面看下scheduler怎樣出列入列:

請求的獲取和存入流程:

def next_request(self):

    request = self.mqs.pop()

    if request:

        self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)

    else:

        request = self._dqpop()

        if request:

            self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)

    if request:

        self.stats.inc_value('scheduler/dequeued', spider=self.spider)

    return request

取請求時優先使用記憶體佇列,如果記憶體佇列沒有請求再使用磁碟佇列。

在請求入佇列時,優先存入磁碟佇列,如果沒有磁碟佇列再存入記憶體佇列。

def enqueue_request(self, request):

    if not request.dont_filter and self.df.request_seen(request):

        self.df.log(request, self.spider)

        return False

    dqok = self._dqpush(request)

    if dqok:

        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)

    else:

        self._mqpush(request)

        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)

    self.stats.inc_value('scheduler/enqueued', spider=self.spider)

    return True

 

Scheduler在入列前呼叫過濾器進行過濾,如果request物件沒有定義dont_filter選項,則用df來過濾,關於去重方法,見文件:scrapy-去重。.

排程器的核心基本就這些了。

另外還有一個LOG_UNSERIALIZABLE_REQUESTS引數,它是用來指定如果一個請求序列化失敗,是否要記錄日誌。

 

1.4.    佇列實現

 

前面定義了佇列,但並沒有例項化,

    def open(self, spider):

        self.spider = spider

        self.mqs = self.pqclass(self._newmq)

        self.dqs = self._dq() if self.dqdir else None

        return self.df.open()

 

使用self.pqclass(self._newmq),預設情況下等效於PriorityQueue(scrapy.squeues.LifoMemoryQueue)

 

class PriorityQueue(object):

    """A priority queue implemented using multiple internal queues (typically,

    FIFO queues). The internal queue must implement the following methods:

    """

 

    def __init__(self, qfactory, startprios=()):

        self.queues = {}

        self.qfactory = qfactory

        for p in startprios:

            self.queues[p] = self.qfactory(p)

        self.curprio = min(startprios) if startprios else None

 

    def push(self, obj, priority=0):

        if priority not in self.queues:

            self.queues[priority] = self.qfactory(priority)

        q = self.queues[priority]

        q.push(obj) # this may fail (eg. serialization error)

        if self.curprio is None or priority < self.curprio:

            self.curprio = priority

 

    def pop(self):

        if self.curprio is None:

            return

        q = self.queues[self.curprio]

        m = q.pop()

        if len(q) == 0:

            del self.queues[self.curprio]

            q.close()

            prios = [p for p, q in self.queues.items() if len(q) > 0]

            self.curprio = min(prios) if prios else None

        return m

 

這裡是優先順序佇列的初始化程式碼,queues是一個dict,dict元素是priority:qfactory形式。

結合起來的結果就是scrapy待爬佇列初始化都是優先順序佇列,如果以後所有request的優先順序相同(預設0),那麼待爬佇列就時一個FIFO/LIFO。如果使用不同的優先順序,那麼就是一個優先順序隊列了。

qfactory指定二級佇列型別,它來自SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue',

修改它有兩種結果:

  1. 所有請求優先順序為0或相同,那麼待爬佇列就是一個LIFO或FIFO;
  2. 請求的優先順序不同,那麼待爬佇列就是一個優先順序佇列,優等級佇列的元素值也是佇列(LIFO或FIFO)。

 

1.5.    爬取方式

從上一章節可以看出scrapy預設使用LIFO,那麼它的爬取方式是深度優先,如果想使用廣度優先,把SCHEDULER_MEMORY_QUEUE 改為FIFO就可以了。