跨庫資料備份還原、遷移工具
阿新 • • 發佈:2022-03-06
前言
資料開發過程中,為了確保生產資料庫安全,一般將實時資料同步、備份到本地測試資料庫完成開發工作,最後部署應用。
本文實際業務場景:需要從客戶處的生成資料庫(如mysql資料庫)備份資料表到本地的一個sqlserver庫中,將生產資料庫表複製還原過來。為了快速完成資料備份操作,於是我寫了個同步遷移的程式碼。同樣的使用需求下,只需修改配置settings就可以直接用。
當然也可以用資料庫客戶端介面視覺化完成資料庫的備份還原。
測試資料準備
本文還是利用上一篇文章的資料表,該資料存放在mysql資料庫中。
先檢視system_info表結構。
SHOWFULLCOLUMNSFROMsystem_info
程式碼結構
主函式db_backup.py完成資料庫表同庫、垮庫的備份、同步、遷移功能,logging_conf.py日誌記錄模組;settings.py設定兩個資料庫配置引數;tools.py完成路徑獲取和資料庫連線功能。
程式碼部分
-
資料庫備份、遷移-db_backup.py
該模組主要方法:copy_to_from_mysql和mysql_sync_to_sqlserver。
1、copy_to_from_mysql用於把一個表的內容複製到一個檔案。
2、mysql_sync_to_sqlserver從檔案複製資料到插入到目標資料庫表中。
import datetimeimport logging.config from tools import get_local_path,get_conn from logging_conf import log_config,local_data logging.config.dictConfig(log_config) logger = logging.getLogger(__name__) def copy_to_from_mysql(table_name): """ 從Mysql匯出資料檔案到本地 """ start = datetime.datetime.now() full_data_name= get_local_path(table_name) conn = None try: conn = get_conn('SOURCE') if conn is None: raise Exception('獲取資料庫連線失敗') logger.debug(full_data_name) sql = 'Select * from {0}'.format(table_name) with conn.cursor() as cur: cur.execute(sql) number = cur.fetchall() loan_count = 0 for loanNumber in number: tuple_list = tuple([str(i) for i in list(loanNumber)][1:]) loan_count += 1 with open(full_data_name, mode='a', encoding='utf-8') as f: f.write(str(tuple_list) + "\n") f.close() cur.close() print("寫入完成,共寫入%d條資料!" % loan_count) finally: if conn: conn.close() end = datetime.datetime.now() s = (end - start).total_seconds() logger.info('資料匯出: %s, 耗時: %s 秒' % (table_name, s)) return number def mysql_sync_to_sqlserver(table_name): """ 把mysql資料同步到sqlserver資料庫裡面 從本地匯入資料檔案到本地資料庫 :return: """ start = datetime.datetime.now() full_data_name = get_local_path(table_name) conn = None try: conn = get_conn('LOCAL') with conn: # 資料檔案匯入 with conn.cursor() as cur: with open(full_data_name, mode='r', encoding='utf-8') as lst: for line in lst: sql = "insert into system_info values {0}".format(line) print("插入成功",line) cur.execute(sql) conn.commit() finally: if conn: conn.close() end = datetime.datetime.now() s = (end - start).total_seconds() logger.info('資料匯入: %s, 耗時: %s 秒' % (table_name, s)) if __name__ == '__main__': table_name = 'system_info' # 從mysql匯出資料檔案到本地 copy_to_from_mysql(table_name) # 從本地匯入資料檔案到sqlserver資料庫 mysql_sync_to_sqlserver(table_name)
-
資料庫引數配置-settings.py
填寫資料庫備份、還原資料庫相關配置引數
db_param = { "LOCAL": { 'host': 'localhost', 'port': 1433, 'dbname': 'test', 'user': 'xxxx', 'password': 'xxxx', 'DBType': 'SQLServer', 'remark': '本地資料庫', }, "SOURCE": { 'host': 'localhost', 'port':3306, 'dbname': 'mydb', 'user': 'xxxx', 'password': 'xxxx', 'DBType': 'Mysql', 'remark': '目標資料庫', } }
-
日誌記錄模組-logging_conf.py
import os.path import logging.handlers BASE_DIR = r'D:\myProjectfile\database_backup\logs' log_level = logging.DEBUG # 日誌檔案位置 log_home = os.path.join(BASE_DIR, 'log', 'test') if not os.path.exists(log_home): os.makedirs(log_home, exist_ok=True) log_config = { 'version': 1, 'formatters': { 'generic': { 'format': '%(asctime)s %(levelname) -5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s', }, 'simple': { 'format': '%(asctime)s %(levelname) -5.5s %(message)s', }, }, 'handlers': { 'console': { 'class': 'logging.StreamHandler', #輸出到終端 'formatter': 'generic', }, 'file': { 'class': 'logging.FileHandler', #輸出到檔案 'filename': os.path.join(log_home, 'test.log'), 'encoding': 'utf-8', 'formatter': 'generic', }, }, 'root': { 'level': log_level, 'handlers': ['console', 'file'], } } # 資料檔案位置(資料臨時存放位置) local_data = os.path.join(BASE_DIR, 'data') if not os.path.exists(local_data): os.makedirs(local_data, exist_ok=True)
-
資料庫連線和路徑獲取工具-tools.py
import os import pymysql import pymssql from settings import db_param from logging_conf import local_data def get_file_name(table_name, suffix='txt'): """ 返回檔名 """ return table_name.lower() + '.' + suffix def get_local_path(table_name): """ 本地檔案存放路徑 """ path = os.path.join(local_data, table_name) if not os.path.exists(path): os.makedirs(path, exist_ok=True) full_data_name = os.path.join(path, get_file_name(table_name)) return full_data_name def get_conn(sys_code='SOURCE'): """ 資料庫連接獲取,此處給出我常用的三種資料庫連線 """ params = db_param[sys_code] host = params['host'] port = params['port'] database = params['dbname'] user = params['user'] password = params['password'] db_type = params['DBType'].upper() if db_type == "Mysql".upper(): return pymysql.connect(database=database, user=user, password=password, host=host, port=port) elif db_type == "Oracle".upper(): os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' dsn = cx_Oracle.makedsn(host, port, service_name=database) conn = cx_Oracle.connect(user, password, dsn=dsn) return conn elif db_type == 'SQLServer'.upper(): return pymssql.connect(host=host, user=user, password=password, database=database, charset="utf8") else: raise Exception("%s資料庫連線失敗. " % sys_code)
同步完成
轉自:微信公眾號: