1. 程式人生 > 其它 >跨庫資料備份還原、遷移工具

跨庫資料備份還原、遷移工具

前言

資料開發過程中,為了確保生產資料庫安全,一般將實時資料同步、備份到本地測試資料庫完成開發工作,最後部署應用。

本文實際業務場景:需要從客戶處的生成資料庫(如mysql資料庫)備份資料表到本地的一個sqlserver庫中,將生產資料庫表複製還原過來。為了快速完成資料備份操作,於是我寫了個同步遷移的程式碼。同樣的使用需求下,只需修改配置settings就可以直接用。

當然也可以用資料庫客戶端介面視覺化完成資料庫的備份還原。

測試資料準備

本文還是利用上一篇文章的資料表,該資料存放在mysql資料庫中。

Python搭建一個系統資訊實時監控資料視覺化大屏

先檢視system_info表結構。

SHOWFULLCOLUMNSFROMsystem_info

程式碼結構

主函式db_backup.py完成資料庫表同庫、垮庫的備份、同步、遷移功能,logging_conf.py日誌記錄模組;settings.py設定兩個資料庫配置引數;tools.py完成路徑獲取和資料庫連線功能。

程式碼部分

  • 資料庫備份、遷移-db_backup.py

該模組主要方法:copy_to_from_mysql和mysql_sync_to_sqlserver。

1、copy_to_from_mysql用於把一個表的內容複製到一個檔案。

2、mysql_sync_to_sqlserver從檔案複製資料到插入到目標資料庫表中。

import datetime
import logging.config from tools import get_local_path,get_conn from logging_conf import log_config,local_data logging.config.dictConfig(log_config) logger = logging.getLogger(__name__) def copy_to_from_mysql(table_name): """ 從Mysql匯出資料檔案到本地 """ start = datetime.datetime.now() full_data_name
= get_local_path(table_name) conn = None try: conn = get_conn('SOURCE') if conn is None: raise Exception('獲取資料庫連線失敗') logger.debug(full_data_name) sql = 'Select * from {0}'.format(table_name) with conn.cursor() as cur: cur.execute(sql) number = cur.fetchall() loan_count = 0 for loanNumber in number: tuple_list = tuple([str(i) for i in list(loanNumber)][1:]) loan_count += 1 with open(full_data_name, mode='a', encoding='utf-8') as f: f.write(str(tuple_list) + "\n") f.close() cur.close() print("寫入完成,共寫入%d條資料!" % loan_count) finally: if conn: conn.close() end = datetime.datetime.now() s = (end - start).total_seconds() logger.info('資料匯出: %s, 耗時: %s 秒' % (table_name, s)) return number def mysql_sync_to_sqlserver(table_name): """ 把mysql資料同步到sqlserver資料庫裡面 從本地匯入資料檔案到本地資料庫 :return: """ start = datetime.datetime.now() full_data_name = get_local_path(table_name) conn = None try: conn = get_conn('LOCAL') with conn: # 資料檔案匯入 with conn.cursor() as cur: with open(full_data_name, mode='r', encoding='utf-8') as lst: for line in lst: sql = "insert into system_info values {0}".format(line) print("插入成功",line) cur.execute(sql) conn.commit() finally: if conn: conn.close() end = datetime.datetime.now() s = (end - start).total_seconds() logger.info('資料匯入: %s, 耗時: %s 秒' % (table_name, s)) if __name__ == '__main__': table_name = 'system_info' # 從mysql匯出資料檔案到本地 copy_to_from_mysql(table_name) # 從本地匯入資料檔案到sqlserver資料庫 mysql_sync_to_sqlserver(table_name)
  • 資料庫引數配置-settings.py

    填寫資料庫備份、還原資料庫相關配置引數

db_param = {
    "LOCAL": {
        'host': 'localhost',
        'port': 1433,
        'dbname': 'test',
        'user': 'xxxx',
        'password': 'xxxx',
        'DBType': 'SQLServer',
        'remark': '本地資料庫',
    },
    "SOURCE": {
         'host': 'localhost',
        'port':3306,
        'dbname': 'mydb',
        'user': 'xxxx',
        'password': 'xxxx',
        'DBType': 'Mysql',
        'remark': '目標資料庫',
    }
}
  • 日誌記錄模組-logging_conf.py

import os.path
import logging.handlers

BASE_DIR = r'D:\myProjectfile\database_backup\logs'

log_level = logging.DEBUG

# 日誌檔案位置
log_home = os.path.join(BASE_DIR, 'log', 'test')
if not os.path.exists(log_home):
    os.makedirs(log_home, exist_ok=True)

log_config = {
    'version': 1,
    'formatters': {
        'generic': {
            'format': '%(asctime)s %(levelname) -5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
        },
        'simple': {
            'format': '%(asctime)s %(levelname) -5.5s %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',   #輸出到終端
            'formatter': 'generic',
        },
        'file': {
            'class': 'logging.FileHandler',  #輸出到檔案
            'filename': os.path.join(log_home, 'test.log'),
            'encoding': 'utf-8',
            'formatter': 'generic',

        },
    },
    'root': {
        'level': log_level,
        'handlers': ['console', 'file'],
    }
}

# 資料檔案位置(資料臨時存放位置)
local_data = os.path.join(BASE_DIR, 'data')
if not os.path.exists(local_data):
    os.makedirs(local_data, exist_ok=True)
  • 資料庫連線和路徑獲取工具-tools.py

import os
import pymysql
import pymssql
from settings import db_param
from logging_conf import  local_data

def get_file_name(table_name, suffix='txt'):
    """
    返回檔名
    """
    return table_name.lower() + '.' + suffix

def get_local_path(table_name):
    """
    本地檔案存放路徑
    """
    path = os.path.join(local_data, table_name)
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)
    full_data_name = os.path.join(path, get_file_name(table_name))
    return full_data_name

def get_conn(sys_code='SOURCE'):
    """
    資料庫連接獲取,此處給出我常用的三種資料庫連線
    """
    params = db_param[sys_code]
    host = params['host']
    port = params['port']
    database = params['dbname']
    user = params['user']
    password = params['password']
    db_type = params['DBType'].upper()

    if db_type == "Mysql".upper():
        return pymysql.connect(database=database, user=user, password=password, host=host, port=port)
    elif db_type == "Oracle".upper():
        os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
        dsn = cx_Oracle.makedsn(host, port, service_name=database)
        conn = cx_Oracle.connect(user, password, dsn=dsn)
        return conn
    elif db_type == 'SQLServer'.upper():
        return pymssql.connect(host=host, user=user, password=password, database=database, charset="utf8")
    else:
        raise Exception("%s資料庫連線失敗. " % sys_code)
同步完成

轉自:微信公眾號:Python資料分析例項

原文:https://mp.weixin.qq.com/s?__biz=MzI0NzY2MDA4MA==&mid=2247498197&idx=1&sn=4d0ef699f48482e48c1ed2c111efad3c&scene=21#wechat_redirect