RDS一鍵同步到新例項
阿新 • • 發佈:2021-12-16
一.前情提要
因需求要將生產環境多臺RDS給克隆到DEV環境,因為環境在除錯,在生產變動時可能要重新同步一遍,這個調試周期要一個多月,期間主要是環境的部署配置。
阿里雲RDS同區域同步使用DTS操作將很方便,配置DTS的遷移按照教程就能免費又快捷的遷移了。
但當RDS很多,又要不定時去遷移一下(同步要錢),就需要通過阿里雲開放的api來進行批量操作了。
需要使用python3,安裝阿里雲核心庫和rds庫
pip3 install aliyun-python-sdk-core
pip3 install aliyun-python-sdk-rds
注意:
1.需要填寫同步的資訊,當完成前幾個,後面一個失敗,需要註釋掉成功的例項資訊
2.需要加上阿里雲ak和sk
3.指令碼會清空同步例項的庫,不然也沒法同步
二.指令碼
本文版權歸作者所有,歡迎轉載,請務必新增原文連結。#!/bin/python3 from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest import ast, sys, pymysql, time, json #填寫資訊主庫和從庫 #例項名,例項ID,要遷移的庫,賬號名,密碼 rds_info = [ [("生產會員", "rm-2zeasdasdsasd", "prod_userdb", "dtscp", "123456"),("dev會員", "rm-2asdsadd", "dev_userdb", "dtscp", "123456")], [("生產支付", "rm-2zfasda", "prod_paydb", "dtscp", "1233456"),("dev支付", "rm-2asdasdasd", "dev_paydb", "dtscp", "123456")], ] #自動變數 dts_id = "" dts_list = [] #資源資訊 aliyun_user_ak = 'Lwewwewew' aliyun_user_sk = 'AEwqewqrwqeq' region_id = 'cn-beijing' client = AcsClient(ak=aliyun_user_ak, secret=aliyun_user_sk, region_id=region_id, timeout=300) #建立DTS def Create_Dts(req): global dts_id global dts_list req.set_action_name('CreateMigrationJob') req.add_query_param('RegionId', "cn-hangzhou") req.add_query_param('Region', "cn-beijing") req.add_query_param('MigrationJobClass', "large") req.add_query_param('ClientToken', "fj2j3kk2jasjd") response = client.do_action(req) false = False true = True response_dict = eval(response) dts_id = response_dict['MigrationJobId'] dts_list.append(dts_id) if response_dict['Success'] == True: print("建立任務成功,ID為: " + dts_id) else: print("建立任務失敗") sys.exit() #驗證是否是空的庫,0為空,1為不空 def Check_Table(cur): sql = "show tables;" cur.execute(sql) table_number = cur.fetchall() if table_number: return 1 else: return 0 def Create_Link(rds_list): #部分拼接 rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目標例項IP db = pymysql.connect(host=rds_dest_ip, port=3306, user=rds_list[1][3], passwd=rds_list[1][4], db=rds_list[1][2], charset='utf8') cursor = db.cursor() #使用cursor方法建立一個遊標 print("開始清理資料庫" + rds_list[1][2]) if Check_Table(cursor) == 0: print(rds_list[1][2] + "庫已經是空的,跳過清理步驟") cursor.close() db.close() else: #這裡輸出drop開頭的刪除表命令,存到元組裡 sql = "show tables ;" cursor.execute(sql) table_bin = cursor.fetchall() cursor.execute('SET foreign_key_checks = 0;') #迴圈執行drop刪除表命令 for i in table_bin: sql = "drop table" + '`' + i[0] + '`' cursor.execute(sql) cursor.execute('SET foreign_key_checks = 1;') #檢測是否清空了 if Check_Table(cursor) == 1: print(rds_list[1][2] + "庫清空失敗") else: print(rds_list[1][2] + "清空成功") #提交操作並關閉連結 db.commit() cursor.close() db.close() def Conf_Dts(req,rds_list): #部分拼接 rds_job_name = rds_list[0][0] + "-》" + rds_list[1][0] #rds例項名 rds_sour_ip = rds_list[0][1] + ".mysql.rds.aliyuncs.com" #源例項IP rds_dest_ip = rds_list[1][1] + ".mysql.rds.aliyuncs.com" #目標例項IP rds_sour_dbname = '\"%s\"' %(rds_list[0][2]) rds_dest_dbname = '\"%s\"' %(rds_list[1][2]) global dts_id print("開始進行" + rds_job_name + "的DTS任務配置") req.set_action_name('ConfigureMigrationJob') req.add_query_param('RegionId', "cn-hangzhou") req.add_query_param('MigrationJobName', rds_job_name) req.add_query_param('SourceEndpoint.InstanceType', "RDS") req.add_query_param('DestinationEndpoint.InstanceType', "RDS") req.add_query_param('MigrationMode.StructureIntialization', "true") req.add_query_param('MigrationMode.DataIntialization', "true") req.add_query_param('MigrationMode.DataSynchronization', "false") req.add_query_param('MigrationObject', "[{ \"DBName\": %s, \"NewDBName\": %s }]" %(rds_sour_dbname, rds_dest_dbname)) req.add_query_param('MigrationJobId', dts_id) req.add_query_param('SourceEndpoint.InstanceID', rds_list[0][1]) req.add_query_param('SourceEndpoint.EngineName', "Mysql") req.add_query_param('SourceEndpoint.Region', "cn-beijing") req.add_query_param('SourceEndpoint.IP', rds_sour_ip) req.add_query_param('SourceEndpoint.Port', "3306") req.add_query_param('SourceEndpoint.UserName', rds_list[0][3]) req.add_query_param('SourceEndpoint.Password', rds_list[0][4]) req.add_query_param('DestinationEndpoint.InstanceID', rds_list[1][1]) req.add_query_param('DestinationEndpoint.EngineName', "Mysql") req.add_query_param('DestinationEndpoint.Region', "cn-beijing") req.add_query_param('DestinationEndpoint.IP', rds_dest_ip) req.add_query_param('DestinationEndpoint.Port', "3306") req.add_query_param('DestinationEndpoint.UserName', rds_list[1][3]) req.add_query_param('DestinationEndpoint.Password', rds_list[1][4]) response = client.do_action(req) print(str(response, encoding = 'utf-8')) def Check_Dts(req, dts_name): req.set_action_name('DescribeMigrationJobStatus') req.add_query_param('RegionId', "cn-hangzhou") req.add_query_param('MigrationJobId', dts_name) response = client.do_action(request) res_dict = json.loads(response) dts_stat = {'NotStarted':'未啟動', 'Prechecking':'預檢查中', 'PrecheckFailed':'預檢查失敗', 'Migrating':'遷移中', 'Suspending':'暫停中', 'MigrationFailed':'遷移失敗', 'Finished':'完成'} print(dts_name + "當前狀態:" + dts_stat[res_dict['MigrationJobStatus']]) #開始執行 for rds_list in rds_info: #初始化 request = CommonRequest() request.set_accept_format('json') request.set_domain('dts.aliyuncs.com') request.set_method('POST') request.set_protocol_type('https') # https | http request.set_version('2020-01-01') Create_Dts(request) #建立dts任務 Create_Link(rds_list) #清理資料庫 Conf_Dts(request, rds_list) #配置DTS資訊 print("") #迴圈顯示狀態 while True: for i in dts_list: Check_Dts(request, i) print("") time.sleep(30)