Python schedule 庫定時任務
阿新 • • 發佈:2021-11-03
Python schedule 庫定時任務
schedule的使用
# Examples of the schedule library's job-registration API
# (used to set up timed Scrapy jobs).
import time

import schedule


def job():
    """Print a marker together with the current Unix timestamp."""
    print("Do Jod", time.time())


schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
# A randomized interval needs the plural unit: with every(5).to(10) the
# interval is 5, and the singular ``.day`` raises schedule.IntervalError.
schedule.every(5).to(10).days.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

if __name__ == "__main__":
    # Registering jobs does not run them; the scheduler has to be polled.
    while True:
        schedule.run_pending()
        time.sleep(1)
schedule在scrapy的應用
import datetime
import logging
import subprocess
import time
from multiprocessing import Process

import schedule
from scrapy import cmdline


def crawl_work():
    """Run the ``it`` spider once in a child process and wait for it to finish.

    This is the scheduled job, so it must return after a single crawl;
    otherwise ``schedule.run_pending()`` never regains control and the
    every-minute schedule is meaningless. The crawl runs in its own Process
    because ``scrapy.cmdline.execute`` exits the interpreter when it is done
    and Twisted's reactor cannot be restarted inside one process.
    """
    print("I'm working...")
    print('-' * 100)
    args = ["scrapy", "crawl", 'it']
    start = time.time()
    p = Process(target=cmdline.execute, args=(args,))
    p.start()
    p.join()
    # Lazy %-args: the message is only formatted if DEBUG is enabled.
    logging.debug("### use time: %s", time.time() - start)


if __name__ == '__main__':
    print('*' * 10 + '開始執行定時爬蟲' + '*' * 10)
    schedule.every(1).minutes.do(crawl_work)
    print('當前時間為{}'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    print('*' * 10 + '定時爬蟲開始執行' + '*' * 10)
    while True:
        schedule.run_pending()
        time.sleep(10)
SQLAlchemy 連線 PostgreSQL
import os

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# URL format: postgresql+psycopg2://username:password@host/dbname
# Prefer supplying the URL (and its password) via the DATABASE_URL
# environment variable; the fallback is the original local-dev URL.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+psycopg2://postgres:88595073@localhost/lecture3",
)
engine = create_engine(DATABASE_URL)
db = scoped_session(sessionmaker(bind=engine))


def main():
    """Print every row of ``table_name`` and release the session afterwards."""
    try:
        # Raw SQL must be wrapped in text(); SQLAlchemy 2.x rejects plain strings.
        result = db.execute(text("SELECT * FROM table_name")).fetchall()
        for r in result:
            print(r)
    finally:
        # Return the scoped session's connection to the engine's pool.
        db.remove()


if __name__ == "__main__":
    main()
Scrapy 通過連線池連線 MySQL 的工具類
import pymysql
import traceback
from DBUtils.PooledDB import PooledDB
from scrapy.utils.project import get_project_settings
class MysqlUtil(object):
    """MySQL database helper that hands out connections from a shared pool.

    All instances share one DBUtils ``PooledDB`` pool, created lazily on the
    first :meth:`get_conn` call. Each instance checks out one connection and
    opens a cursor on it; call :meth:`dispose` to finish the transaction and
    return the connection to the pool.

    The query helpers print the traceback and return ``None`` when the
    database raises, instead of propagating the exception.
    """

    # Connection settings from the Scrapy project settings, read once when
    # this class is defined.
    settings = get_project_settings()
    config = {
        "host": settings.get("MYSQL_HOST"),
        "port": settings.get("MYSQL_PORT"),
        "database": settings.get("MYSQL_DATABASE"),
        "user": settings.get("MYSQL_USER"),
        "password": settings.get("MYSQL_PASSWORD"),
        "charset": settings.get("MYSQL_CHARSET"),
    }

    # Class-wide connection pool shared by every instance; None until first use.
    _pool = None

    def __init__(self):
        self._conn = MysqlUtil.get_conn()
        self._cursor = self._conn.cursor()

    @staticmethod
    def get_conn():
        """Return a connection checked out of the shared pool.

        The pool is built on the first call and stored on the class, so every
        later call (from any instance) reuses it.
        """
        if MysqlUtil._pool is None:
            cfg = MysqlUtil.config
            MysqlUtil._pool = PooledDB(
                creator=pymysql,
                mincached=1,
                maxcached=20,
                host=cfg["host"],
                port=cfg["port"],
                user=cfg["user"],
                password=cfg["password"],
                database=cfg["database"],
                charset=cfg["charset"],
            )
        return MysqlUtil._pool.connection()

    def _execute(self, sql, param=None):
        """Execute *sql* (bound with *param* when given); return the row count.

        Exceptions propagate to the caller.
        """
        if param is None:
            return self._cursor.execute(sql)
        return self._cursor.execute(sql, param)

    def get_all(self, sql, param=None):
        """Run a query and return all result rows.

        @param sql: query SQL; pass condition values through placeholders and *param*
        @param param: optional values (tuple/list, or dict for %(name)s placeholders)
        @return: rows from cursor.fetchall() (tuples with pymysql's default
                 cursor class), False if no rows, None if the query raised
        """
        try:
            count = self._execute(sql, param)
            return self._cursor.fetchall() if count > 0 else False
        except Exception:
            traceback.print_exc()

    def get_one(self, sql, param=None):
        """Run a query and return only the first result row.

        @param sql: query SQL; pass condition values through placeholders and *param*
        @param param: optional values (tuple/list, or dict for %(name)s placeholders)
        @return: the row from cursor.fetchone(), False if no rows,
                 None if the query raised
        """
        try:
            count = self._execute(sql, param)
            return self._cursor.fetchone() if count > 0 else False
        except Exception:
            traceback.print_exc()

    def get_count(self, sql, param=None):
        """Run a statement and return the row count reported by cursor.execute().

        For a SELECT this is the number of rows in the result set, so a
        ``SELECT COUNT(...)`` query always reports 1 regardless of the count.

        @param sql: SQL; pass condition values through placeholders and *param*
        @param param: optional values (tuple/list, or dict for %(name)s placeholders)
        @return: row count, or None if the statement raised
        """
        try:
            return self._execute(sql, param)
        except Exception:
            traceback.print_exc()

    def get_many(self, sql, num, param=None):
        """Run a query and return at most *num* result rows.

        @param sql: query SQL; pass condition values through placeholders and *param*
        @param num: maximum number of rows to fetch
        @param param: optional values (tuple/list, or dict for %(name)s placeholders)
        @return: rows from cursor.fetchmany(num), False if no rows,
                 None if the query raised
        """
        try:
            count = self._execute(sql, param)
            return self._cursor.fetchmany(num) if count > 0 else False
        except Exception:
            traceback.print_exc()

    def insert_one(self, sql, value):
        """Insert one record; roll back the transaction on error.

        @param sql: INSERT statement with placeholders
        @param value: record values (tuple/list, or dict for %(name)s placeholders)
        @return: affected row count (use ``self._cursor.lastrowid`` for the
                 generated id), or None if the insert raised
        """
        try:
            return self._cursor.execute(sql, value)
        except Exception:
            traceback.print_exc()
            self.end("rollback")

    def insert_many(self, sql, values):
        """Insert several records with executemany; roll back on error.

        @param sql: INSERT statement with placeholders
        @param values: sequence of records (tuple of tuples / list of lists)
        @return: affected row count, or None if the insert raised
        """
        try:
            return self._cursor.executemany(sql, values)
        except Exception:
            traceback.print_exc()
            self.end("rollback")

    def __query(self, sql, param=None):
        # Shared body of update()/delete(): row count, or None on error.
        try:
            return self._execute(sql, param)
        except Exception:
            traceback.print_exc()

    def update(self, sql, param=None):
        """Update table records.

        @param sql: UPDATE statement with conditions, using %s placeholders
        @param param: values to bind (tuple/list)
        @return: affected row count, or None on error
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """Delete table records.

        @param sql: DELETE statement with conditions, using %s placeholders
        @param param: condition values to bind (tuple/list)
        @return: affected row count, or None on error
        """
        return self.__query(sql, param)

    def begin(self):
        """Start a transaction by turning autocommit off on this connection."""
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """Finish the transaction: commit when *option* is 'commit', else roll back."""
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, is_end=1):
        """Commit (is_end == 1) or roll back, then return the connection to the pool.

        The cursor and connection are closed even if the commit/rollback raises.
        """
        try:
            self.end('commit' if is_end == 1 else 'rollback')
        finally:
            self._cursor.close()
            self._conn.close()


# Backward-compatible alias for the original class name.
MySqlUtil = MysqlUtil
# pipelines.py:在 Scrapy pipeline 中呼叫 MysqlUtil
from torrentSpider.utils.db_util import MysqlUtil
import traceback
import logging
class MySqlPipeline(object):
    """Scrapy item pipeline that stores torrent items in the ``torrent_ye`` table.

    An item whose ``torrent_url`` already exists in the table is not inserted.
    Every item is returned so later pipeline components still receive it.
    """

    # Columns written for each item; each name is also the item field read.
    _COLUMNS = (
        'torrent_title', 'torrent_name', 'torrent_director', 'torrent_actor',
        'torrent_language', 'torrent_type', 'torrent_region',
        'torrent_update_time', 'torrent_status', 'torrent_show_time',
        'torrent_introduction', 'torrent_url',
    )

    # MysqlUtil helper holding this pipeline's pooled connection.
    pool = None

    def __init__(self):
        pass

    def open_spider(self, spider):
        """Check out a pooled MySQL connection when the spider opens."""
        self.pool = MysqlUtil()

    def process_item(self, item, spider):
        """Insert *item* unless a row with the same ``torrent_url`` exists.

        Database errors are logged and the transaction is rolled back.
        Returns *item* in every case, as Scrapy's pipeline contract requires.
        """
        try:
            # get_count() returns the cursor's row count, so select at most one
            # row per match. "select count(1)" always yields exactly one row and
            # would make every item look like a duplicate.
            sql_select = """select 1 from torrent_ye where torrent_url = %(torrent_url)s limit 1"""
            params_select = {'torrent_url': item['torrent_url']}
            flag = self.pool.get_count(sql_select, params_select)
            if flag is None:
                # get_count() swallows DB errors and returns None.
                raise RuntimeError('duplicate check failed')
            if flag > 0:
                logging.info('記錄已經存在:[%s][%s]', item['torrent_title'], item['torrent_url'])
                return item
            sql_insert = """insert into torrent_ye(torrent_title, torrent_name, torrent_director,
                torrent_actor, torrent_language, torrent_type, torrent_region, torrent_update_time,
                torrent_status, torrent_show_time, torrent_introduction, torrent_url) values
                (%(torrent_title)s,%(torrent_name)s,%(torrent_director)s,%(torrent_actor)s,%(torrent_language)s,
                %(torrent_type)s,%(torrent_region)s,%(torrent_update_time)s,%(torrent_status)s,%(torrent_show_time)s,%(torrent_introduction)s,%(torrent_url)s)"""
            params = {col: item[col] for col in self._COLUMNS}
            self.pool.insert_one(sql_insert, params)
            self.pool.end("commit")
        except Exception as e:
            logging.error('發生異常:[%s]', e)
            traceback.print_exc()
            # open_spider() may not have run (or may have failed).
            if self.pool is not None:
                self.pool.end("rollback")
        return item

    def close_spider(self, spider):
        """Commit and return the pooled connection when the spider closes."""
        if self.pool is not None:
            self.pool.dispose()
            self.pool = None