筆記-scrapy-深入學習-sheduler
筆記-scrapy-深入學習-sheduler
1. scheduler.py
source code:scrapy/core/scheduler.py:
1.1. 初始化的開始
在分析engine的open_spider函式時,講過scheduler物件是通過類的from_cralwer方法生成的,程式碼如下:
@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
dupefilter = dupefilter_cls.from_settings(settings)
pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS')
return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)
在最後實際還是呼叫類的__init__()。
class Scheduler(object):
def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
logunser=False, stats=None, pqclass=None):
self.df = dupefilter
self.dqdir = self._dqdir(jobdir)
self.pqclass = pqclass
self.dqclass = dqclass
self.mqclass = mqclass
self.logunser = logunser
self.stats = stats
在這裡指定了各種佇列型別,但並沒有構造佇列,在下面會講到。
實際上排程器預設只定義了2種佇列型別,但並不是在這裡構造的:
dqs:基於磁碟的任務佇列:在配置檔案可配置儲存路徑,每次執行後會把佇列任務儲存到磁碟上;
mqs:基於記憶體的任務佇列:每次都在記憶體中執行,下次啟動則消失;
1.2. 核心物件
前面共建立了4個物件,分別是dupefilter,pqclass,dqclass,mqclass.
1.dupefilter:
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'
這個類的含義是"Request Fingerprint duplicates filter",請求指紋重複過濾。
它對每個request請求生成指紋,對指紋重複的請求進行過濾。具體實現見文件:scrapy-去重
2.pqclass
SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue'
從名字上也可以看出這個一個優先順序佇列,使用的是queuelib.它的作用應該不說也明白就是對request請求按優先順序進行排序。
如何指定優先順序?
前面講spider時,講述過可以在spider中定義Rule規則來過濾我們需要跟進的連結形式,我們只要定義規則時指定一個process_request關鍵字引數即可,這個引數是一個函式,會傳遞給我們將要繼續跟進的Request,我們直接對其設定priority屬性即可。
優先順序是一個整數,雖然queuelib使用小的數做為高優化級,但是由於scheduler再入佇列時取了負值,所以實際上數值越大優先順序越高,程式碼如下:
def _dqpush(self, request):
if self.dqs is None:
return
try:
reqd = request_to_dict(request, self.spider)
self.dqs.push(reqd, -request.priority)
3.dqclass
SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
從名字上看,這是一個支援序列化的後進先出的磁碟佇列。主要用來幫助我們在停止爬蟲後可以接著上一次繼續開始爬蟲。
序列化要指定一個目錄,用於儲存序列化檔案。這個目錄在命令列上通過'-s JOBDIR=XXX'來指定。scheduler會在這個目錄下建立active.json檔案,用來序列化佇列的優先順序。
def _dq(self):
activef = join(self.dqdir, 'active.json')
if exists(activef):
with open(activef) as f:
prios = json.load(f)
else:
prios = ()
q = self.pqclass(self._newdq, startprios=prios)
if q:
logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
{'queuesize': len(q)}, extra={'spider': self.spider})
return q
_dq在engine open_spider時呼叫scheduler的open時呼叫,可以看到如果命令指定了JOBDIR引數,則從目錄下尋找active.json,這個檔案儲存的上一次指定的優先順序集合,然後用它和_newdq一起構造磁碟佇列,這樣就可以接著上次停止時的狀態繼續爬取了。
其中_newdq會使用JOBDIR和優先順序作為引數初始化磁碟佇列物件。
def _newdq(self, priority):
return self.dqclass(join(self.dqdir, 'p%s' % priority))
最後在scheduler關閉時會將優化級存入檔案active.json檔案,用於下次反序列化。
def close(self, reason):
if self.dqs:
prios = self.dqs.close()
with open(join(self.dqdir, 'active.json'), 'w') as f:
json.dump(prios, f)
return self.df.close(reason)
4.mqclass
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
從名字上看,是後進先出的記憶體佇列。這個佇列是為了構造優先順序佇列而存在的,在構造優先順序佇列時,需要傳遞一個佇列工廠類,用它來構造每個不同的優先順序佇列,構造時會向這個佇列工廠類傳遞優先順序作為唯一的引數。
預設情況下實際上就是queuelib.LifoMemoryQueue.
1.3. 入列及出列
前面瞭解了記憶體佇列和磁碟佇列,下面看下scheduler怎樣出列入列:
請求的獲取和存入流程:
def next_request(self):
request = self.mqs.pop()
if request:
self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
else:
request = self._dqpop()
if request:
self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
if request:
self.stats.inc_value('scheduler/dequeued', spider=self.spider)
return request
取請求時優先使用記憶體佇列,如果記憶體佇列沒有請求再使用磁碟佇列。
在請求入佇列時,優先存入磁碟佇列,如果沒有磁碟佇列再存入記憶體佇列。
def enqueue_request(self, request):
if not request.dont_filter and self.df.request_seen(request):
self.df.log(request, self.spider)
return False
dqok = self._dqpush(request)
if dqok:
self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
else:
self._mqpush(request)
self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
self.stats.inc_value('scheduler/enqueued', spider=self.spider)
return True
Scheduler在入列前呼叫過濾器進行過濾,如果request物件沒有定義dont_filter選項,則用df來過濾,關於去重方法,見文件:scrapy-去重。.
排程器的核心基本就這些了。
另外還有一個LOG_UNSERIALIZABLE_REQUESTS引數,它是用來指定如果一個請求序列化失敗,是否要記錄日誌。
1.4. 佇列實現
前面定義了佇列,但並沒有例項化,
def open(self, spider):
self.spider = spider
self.mqs = self.pqclass(self._newmq)
self.dqs = self._dq() if self.dqdir else None
return self.df.open()
使用self.pqclass(self._newmq),預設情況下等效於PriorityQueue(scrapy.squeues.LifoMemoryQueue)
class PriorityQueue(object):
"""A priority queue implemented using multiple internal queues (typically,
FIFO queues). The internal queue must implement the following methods:
"""
def __init__(self, qfactory, startprios=()):
self.queues = {}
self.qfactory = qfactory
for p in startprios:
self.queues[p] = self.qfactory(p)
self.curprio = min(startprios) if startprios else None
def push(self, obj, priority=0):
if priority not in self.queues:
self.queues[priority] = self.qfactory(priority)
q = self.queues[priority]
q.push(obj) # this may fail (eg. serialization error)
if self.curprio is None or priority < self.curprio:
self.curprio = priority
def pop(self):
if self.curprio is None:
return
q = self.queues[self.curprio]
m = q.pop()
if len(q) == 0:
del self.queues[self.curprio]
q.close()
prios = [p for p, q in self.queues.items() if len(q) > 0]
self.curprio = min(prios) if prios else None
return m
這裡是優先順序佇列的初始化程式碼,queues是一個dict,dict元素是priority:qfactory形式。
結合起來的結果就是scrapy待爬佇列初始化都是優先順序佇列,如果以後所有request的優先順序相同(預設0),那麼待爬佇列就時一個FIFO/LIFO。如果使用不同的優先順序,那麼就是一個優先順序隊列了。
qfactory指定二級佇列型別,它來自SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue',
修改它有兩種結果:
- 所有請求優先順序為0或相同,那麼待爬佇列就是一個LIFO或FIFO;
- 請求的優先順序不同,那麼待爬佇列就是一個優先順序佇列,優等級佇列的元素值也是佇列(LIFO或FIFO)。
1.5. 爬取方式
從上一章節可以看出scrapy預設使用LIFO,那麼它的爬取方式是深度優先,如果想使用廣度優先,把SCHEDULER_MEMORY_QUEUE 改為FIFO就可以了。