程式人生 > scrapy 萬能插入資料庫 sql 實現

scrapy 萬能插入資料庫 sql實現

在 spider 中定義 Item 及對應的操作:row 是一個字典,鍵為資料庫表中的欄位名,table 為表名。爬蟲啟動時透過 Scrapy 的訊號(signals)機制初始化資料庫連線,不熟悉訊號機制的讀者可參閱 Scrapy 官方文件。

   class UniversalRow(Item):
    """Generic item handed to the DB pipeline: one row destined for one table.

    ``row`` holds a dict whose keys are column names of the target table;
    ``table`` holds the name of that table.
    """

    # Both fields declared on one line; each Field() is still its own instance.
    row, table = Field(), Field()

class BDMonitor(Spider):
    """Spider that produces ``UniversalRow`` items for the generic DB pipeline.

    When the crawl starts, the ``spider_opened`` signal handler opens a MySQL
    connection and stores it on ``self.data_conn`` for the pipeline to reuse.
    """

    name = "bd"

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        """Create the spider and connect ``spider_opened`` to the crawler signal.

        Scrapy uses the value returned here as the spider instance, so the
        spider must be returned (the original fell off the end and returned None).
        """
        spider = super(BDMonitor, cls).from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_opened, signals.spider_opened)
        return spider

    def spider_opened(self, spider):
        """Open the data connection configured by the ``DATA_DB`` setting.

        Args:
            spider: the spider that was opened (this instance).
        """
        # self.settings is attached to the spider by Spider.from_crawler; the
        # bare name `settings` used previously is not defined in this scope.
        self.data_conn = MySQLConnection(self.settings['DATA_DB']).get_conn()

    def compose_item(self, table, item_tuple):
        """Build a ``UniversalRow`` for *table* with bookkeeping columns filled.

        Args:
            table: name of the destination table.
            item_tuple: scraped record; its text form is hashed into
                ``hash_label``.

        Returns:
            A ``UniversalRow`` whose ``row`` dict has ``hash_label``,
            ``crawl_date`` (YYYYMMDD as int), ``source`` and ``version``.
        """
        item = UniversalRow()
        item['table'] = table

        # Scrapy Item fields have no default value: reading item['row'] before
        # it is assigned raises KeyError, so build the dict first.
        row = {}
        # NOTE(review): the original hashed an undefined `item_string`; hashing
        # str(item_tuple) is an assumption — confirm which content should
        # identify a row.
        item_string = str(item_tuple)
        row['hash_label'] = hashlib.md5(item_string.encode('utf8')).hexdigest()
        row['crawl_date'] = int(time.strftime('%Y%m%d', time.localtime()))
        row['source'] = 'bd'
        # NOTE(review): self.version is not set in this class; presumably it is
        # assigned elsewhere before compose_item is called.
        row['version'] = self.version + 1
        item['row'] = row
        return item

接下來是 pipelines 中的程式碼:


class LoadDBPipeline(object):
    """Write each item's ``row`` dict into its ``table`` via ``db.Insert``.

    The pipeline owns no connection of its own: it reuses ``spider.data_conn``,
    which the spider opens in its ``spider_opened`` handler.
    """

    def process_item(self, item, spider):
        """Insert ``item['row']`` into ``item['table']`` and pass the item on.

        Database warnings/errors are logged and the transaction is rolled back;
        the item is returned to later pipelines in every case.

        Raises:
            KeyError: if *item* has no ``table`` or ``row`` field.
        """
        self.conn = spider.data_conn
        self.dbsession = db(self.conn)

        try:
            # ping(reconnect=True) reopens the link if it has been closed.
            self.conn.ping(reconnect=True)
            self.dbsession.Insert(item['table'], item['row'])
            # Commit explicitly rather than using `with conn:`: in PyMySQL 0.x
            # that meant commit/rollback, but in 1.x it closes the connection
            # without committing, silently dropping non-autocommit inserts.
            self.conn.commit()
        except pymysql.Warning as w:
            logging.warning("Insert Warning:%s", w)
            self._rollback(self.conn)
        except pymysql.Error as e:
            logging.error("Insert Error:%s", e)
            self._rollback(self.conn)

        return item

    @staticmethod
    def _rollback(conn):
        """Undo a failed insert; log instead of raising if rollback also fails."""
        try:
            conn.rollback()
        except pymysql.Error as e:
            logging.error("Rollback Error:%s", e)

在 settings 中啟用該 pipeline:

# Enable the generic DB pipeline (order value 10).
ITEM_PIPELINES = {
    'jihuashu.pipelines.LoadDBPipeline': 10,
}

最後是插入資料庫的程式碼:

class db:
    """Insert one dict as one row into a MySQL table over a given connection.

    ``conn`` is expected to be a PyMySQL-style connection whose ``cursor()``
    returns a cursor with ``execute(sql, params)`` and ``lastrowid`` — verify
    against the connection factory in use.
    """

    def __init__(self, conn):
        self.conn = conn

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier."""
        # Backticks are escaped by doubling (MySQL identifier rules); '%' is
        # doubled because the driver %-formats the query when params are given.
        return '`' + str(name).replace('`', '``').replace('%', '%%') + '`'

    @staticmethod
    def _to_db_value(value):
        """Convert one Python value into the value bound for its column."""
        if value is None:
            return None  # bind SQL NULL, not the text 'None'
        if isinstance(value, list):
            # str() each element so lists of numbers do not raise TypeError.
            return '|'.join(str(v) for v in value)
        if isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        if not isinstance(value, str):
            return str(value)
        return value

    @classmethod
    def _build_insert(cls, table, data):
        """Return ``(sql, params)`` that insert *data* (column -> value) into *table*."""
        columns = ','.join(cls._quote_identifier(col) for col in data)
        placeholders = ','.join(['%s'] * len(data))
        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
            cls._quote_identifier(table), columns, placeholders)
        params = tuple(cls._to_db_value(val) for val in data.values())
        return sql, params

    def Insert(self, table, data):
        """Insert *data* as one row of *table*.

        Values are passed as query parameters (escaped by the driver) instead
        of being spliced into the SQL text. Lists are stored joined with '|',
        dicts as JSON, None as NULL, other non-strings via str().
        The caller is responsible for committing.

        Args:
            table: target table name.
            data: dict mapping column names to values.

        Returns:
            The AUTO_INCREMENT id generated by the insert (cursor.lastrowid).
        """
        sql, params = self._build_insert(table, data)
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.lastrowid
        finally:
            cursor.close()

初始化時傳入連線物件;Insert 中的 table 為表名,data 為資料字典(鍵為欄位名,值為要寫入的資料),回傳值為新插入列的 insert_id。