1. 程式人生 > >Scrapy騰訊分分cai搭建彩中添加定時執行任務的擴展

Scrapy騰訊分分cai搭建彩中添加定時執行任務的擴展

內部 進行 ret float password gin set asi nec

最近騰訊分分cai搭建彩 dsluntan.com VX:17061863513利用scrapy + redis搭建分布式爬蟲系統,有這樣一個要求: 定時性的針對某一url進行抓取。 當然方法有很多,比如說寫個腳本定時性的向redis中放入url,但最後選擇在scrapy中寫個擴展來實現,這樣耦合性更強,也更方便。

用過scrapy的人都知道scrapy在爬取過程中會間接性的展示爬取的item及request數目,這一功能是通過scrapy內置的擴展logstats來完成的。因此我們不妨看下源碼,看scrapy內部是如何實現這一功能的。源碼如下:

import logging

from twisted.internet import task

from scrapy.exceptions import NotConfigured
from scrapy import signals

logger = logging.getLogger(name)

class LogStats(object):
"""Log basic scraping stats periodically"""

def __init__(self, stats, interval=60.0):
    self.stats = stats
    self.interval = interval
    self.multiplier = 60.0 / self.interval
    self.task = None

@classmethod
def from_crawler(cls, crawler):
    interval = crawler.settings.getfloat(‘LOGSTATS_INTERVAL‘)
    if not interval:
        raise NotConfigured
    o = cls(crawler.stats, interval)
    crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
    crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
    return o

def spider_opened(self, spider):
    self.pagesprev = 0
    self.itemsprev = 0

    self.task = task.LoopingCall(self.log, spider)
    self.task.start(self.interval)

def log(self, spider):
    items = self.stats.get_value(‘item_scraped_count‘, 0)
    pages = self.stats.get_value(‘response_received_count‘, 0)
    irate = (items - self.itemsprev) * self.multiplier
    prate = (pages - self.pagesprev) * self.multiplier
    self.pagesprev, self.itemsprev = pages, items

    msg = ("Crawled %(pages)d pages (at %(pagerate)d pages/min), "
           "scraped %(items)d items (at %(itemrate)d items/min)")
    log_args = {‘pages‘: pages, ‘pagerate‘: prate,
                ‘items‘: items, ‘itemrate‘: irate}
    logger.info(msg, log_args, extra={‘spider‘: spider})

def spider_closed(self, spider, reason):
    if self.task and self.task.running:
        self.task.stop()

可以發現,該擴展接受兩個參數,stats為crawler下的stats,用於實現計數功能,另一個變量為interval,用於表示執行間隔。

關鍵在於這兩行代碼

self.task = task.LoopingCall(self.log, spider)
self.task.start(self.interval)

這裏發現scrapy調用了twisted異步模塊中internet下的task.LoopingCall方法,那麽這段代碼究竟在做什麽呢?查看文檔,開頭寫著Call a function repeatedly.即重復調用一個函數。而LoopingCall是一個類, 接受三個參數。

f: 即將被調用的函數句柄

*a: 位置參數,接受元組形式的傳參

*kw: 關鍵字參數,接受字典形式的傳參

該類中實現了call魔法函數,用於異步形式的調用函數。而上面代碼中的start方法表示多少間隔執行f函數。

OK。確認過眼神,這就是我想要的結果。只要在自己的擴展中使用這兩個函數,那麽就可以實現定時任務的功能。下面是我實現的對某國內大型同×××友網站(♂)定時獲取在線人數的擴展。

from twisted.internet import task
from scrapy import signals
from redis import StrictRedis, ConnectionPool

class ParseOnlineExtension(object):
"""獲取在線人數擴展"""
def init(self, stats, interval, conn):
from numbers import Integral
assert isinstance(interval, Integral) and interval > 0, """
ONLINE_INTERVAL must be a positive integer
"""
self.stats = stats
self.interval = interval
self.task = None
self.conn = conn

@classmethod
def from_crawler(cls, crawler):
    interval = crawler.settings.get(‘ONLINE_INTERVAL‘, 1800)
    pool = ConnectionPool(
        host=crawler.settings.get(‘REDIS_HOST‘, ‘127.0.0.1‘),
        port=crawler.settings.get(‘REDIS_PORT‘, 6379),
        db=0,
        password=crawler.settings.get(‘REDIS_PARAMS‘, {}).get(‘password‘, None)
    )
    conn = StrictRedis(connection_pool=pool)
    o = cls(crawler.stats, interval, conn)
    crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
    crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
    return o

def spider_opened(self, spider):
    self.task = task.LoopingCall(self.next_request, spider)
    self.task.start(self.interval)

def next_request(self, spider):
    self.conn.lpush("bilibili:urls", spider.api_urls.get(‘online‘))

def spider_closed(self, spider, reason):
    if self.task and self.task.running:
        self.task.stop()

總體還是很簡單的。第一次寫博客,有什麽錯誤的地方歡迎交流和指正,謝謝。

Scrapy騰訊分分cai搭建彩中添加定時執行任務的擴展