mysql 通用造資料指令碼
背景
公司研發了一個系統——基於客戶交易行為,利用演算法得出各個使用者的各式各樣的特徵,暫且稱之為“標籤”。這些“標籤”是利用大資料平臺計算而得的結果。計算得到的近三四百個“標籤”資料會落到中臺N張表中。操作員在管理平臺能夠通過特定“標籤”獲取滿足此“標籤”的某類使用者,或者搜尋某個使用者的賬戶,查詢該使用者的“標籤”。
在測試過程中,沒有資料是一個很苦惱的事情。手工插入資料,三四百個欄位,插入十條後,人也癱了。於是想:可否用一個指令碼實現。
分析
分析下功能:
1、上述兩個場景中,不管是查滿足某“標籤”的使用者,還是查某使用者的“標籤”,都是通過條件去資料庫取資料即可,不需要對資料進行處理(在大資料層已經處理好了)。因此,資料間不存在業務邏輯。
2、如何確定插入資料的值?可這麼處理:通過desc table 來獲取該表的各個欄位,以及欄位型別,欄位長度。根據這些資訊,可造符合條件的資料。
實現
上面確認了可行性。接下來就是實現了。
建立一個Generate_Data類,寫一個方法,獲取表字段相關資訊。如下:
class Generate_Data:
    """Helper that generates test rows from a table's column definitions."""

    def get_cols(self, table):
        """Return the column descriptions of ``table``.

        Runs ``DESC <table>`` on the module-level cursor ``cur`` and returns
        every row the cursor fetched.
        """
        cur.execute("desc " + table)
        return cur.fetchall()
在sql中結果是:
獲取到資料後,解析上述type列的欄位,得到欄位型別、欄位長度(像date就沒有欄位長度標識)。比如解析上述 fund_account 得到型別:varchar,長度:32。對於數值型的資料,還有可能有正負之分。比如tinyint 型別:若無正負,即unsigned,範圍為:[0,255];若有正負之分,範圍為:[-128,127]。
for col in columns:
    col_name = col[0]
    col_type_info = col[1]
    # Position of the opening parenthesis that introduces the length,
    # e.g. the "(" in "varchar(32)"; -1 when the type has none.
    index = col_type_info.rfind("(")
    # Types without a length, e.g. "date", are used as they are.
    if index == -1:
        col_type = col_type_info
    else:
        col_type = col_type_info[:index]
    # Drop trailing attributes such as "unsigned" that follow the type name
    # when no display width is present (e.g. "int unsigned").
    col_type = col_type.split(" ")[0]
    # Numeric types: 1 marks an unsigned column, 2 a signed one.
    if "unsigned" in col_type_info:
        is_unsigned = 1
    else:
        is_unsigned = 2
接下來是造符合各欄位的資料(以下為常用的一些型別作了處理)
# Build the SQL literal for one column according to its type.
# char, varchar
if "char" in col_type:
    col_length = int(col_type_info[index + 1:-1])  # declared maximum length
    if len(col_name) > (col_length - 6):
        # The column name plus a six-digit suffix would not fit, so only
        # digits are generated.
        if col_length < 6:
            # At most col_length digits, so the value never exceeds the
            # declared length of the column.
            value = str(random.randint(0, 10 ** col_length - 1))
        else:
            value = str(random.randint(0, random_range))
    else:
        value = col_name + str(random.randint(0, random_range))
    insert_sql = "{0}'{1}',".format(insert_sql, value)
elif col_type == "tinyint":
    # Conservative bound inherited from the original heuristic; the result
    # stays inside the TINYINT range for signed and unsigned columns.
    random_range = 255 // (3 * 1 * is_unsigned)
    value = random.randint(g(is_unsigned, random_range), random_range)
    insert_sql = (insert_sql + "%d,") % value
elif col_type == "smallint":
    # Conservative bound that stays inside the SMALLINT range.
    random_range = 65535 // (3 * 2 * is_unsigned)
    value = random.randint(g(is_unsigned, random_range), random_range)
    insert_sql = (insert_sql + "%d,") % value
elif col_type == "mediumint":
    # Same bound as SMALLINT, which also fits in a MEDIUMINT.
    random_range = 65535 // (3 * 2 * is_unsigned)
    value = random.randint(g(is_unsigned, random_range), random_range)
    insert_sql = (insert_sql + "%d,") % value
# int, integer, bigint
elif "int" in col_type:
    value = random.randint(g(is_unsigned, random_range), random_range)
    insert_sql = (insert_sql + "%d,") % value
elif col_type in ["decimal", "double", "numeric", "real", "float"]:
    value = random.uniform(g(is_unsigned, random_range), random_range)
    insert_sql = (insert_sql + "%f,") % value
elif col_type == "date":
    # A day between 300 days ago and today.
    value = get_date(random.randint(-300, 0))
    insert_sql = "{0}'{1}',".format(insert_sql, value)
elif col_type == "time":
    value = get_time()
    insert_sql = "{0}'{1}',".format(insert_sql, value)
# timestamp, datetime
elif "time" in col_type:
    value = get_date(random.randint(-300, 0)) + " " + get_time()
    insert_sql = "{0}'{1}',".format(insert_sql, value)
elif col_type == "year":
    value = str(random.randint(1901, 2155))
    insert_sql = "{0}'{1}',".format(insert_sql, value)
以上為第一版程式碼,之後對程式碼稍加改動,支援表關聯造資料。當A表中有一個欄位為fund_account,B表中也有一個欄位為fund_account,關聯後,A表中的fund_account在B表中都能找到對應的資料。此外還支援一種情況:當A表中還有一個欄位client_name,其欄位值與fund_account一致時,可通過配置造出A表中fund_account和client_name 與 B表中的fund_account都一致的資料。
指令碼執行後的資料結果如下圖:
![05251](https://img-blog.csdn.net/2018052510202195?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NrMzIwNw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
全部程式碼如下
# -*- coding: utf-8 -*-
__author__ = "chenk"
from connect_to_mysql import Connect_mysql
import random,datetime,time
class Generate_Data:
    """Generate random rows for MySQL tables from their ``DESC`` output.

    The methods rely on names the script entry point creates at module
    level: ``cur`` / ``conn`` (database cursor and connection), ``table``
    (name of the table currently being filled), ``linkfield`` (values shared
    between linked columns) and ``linkfield_judge`` (which columns are
    linked to each other).
    """

    # Default upper bound for random integers and for the numeric suffix
    # appended to generated strings.
    DEFAULT_RANDOM_RANGE = 999999

    _INT_TYPES = ("tinyint", "smallint", "mediumint", "int", "integer", "bigint")
    _FLOAT_TYPES = ("decimal", "double", "numeric", "real", "float")
    _DATETIME_TYPES = ("datetime", "timestamp")

    def get_cols(self, table):
        """Return the column descriptions of ``table``.

        Runs ``DESC <table>`` on the module-level cursor ``cur`` and returns
        every fetched row.  ``generate_data_for_mysqldb`` reads the column
        name from index 0 of each row and the type string from index 1.
        """
        cur.execute("desc " + table)
        return cur.fetchall()

    @staticmethod
    def _parse_type(col_type_info):
        """Split a ``DESC`` type string into ``(type, args, sign_factor)``.

        ``"tinyint(3) unsigned"`` becomes ``("tinyint", "3", 1)`` and
        ``"date"`` becomes ``("date", "", 2)``.  ``sign_factor`` is 1 for
        unsigned columns and 2 otherwise.
        """
        if isinstance(col_type_info, bytes):
            col_type_info = col_type_info.decode("utf-8")
        head, _, rest = col_type_info.partition("(")
        # Without a display width the attributes follow the type name
        # directly ("int unsigned"), so only the first word is the type.
        words = head.split()
        col_type = words[0] if words else ""
        type_args = rest.rpartition(")")[0] if rest else ""
        sign_factor = 1 if "unsigned" in col_type_info else 2
        return col_type, type_args, sign_factor

    @staticmethod
    def _random_char_value(col_name, col_length, random_range=999999):
        """Return a random string that fits into ``col_length`` characters.

        The value is the column name followed by a random number when that
        fits, otherwise a random number with at most ``col_length`` digits.
        """
        if col_length <= 0:
            return ""
        suffix_digits = len(str(random_range))
        if len(col_name) + suffix_digits <= col_length:
            return col_name + str(random.randint(0, random_range))
        if col_length < suffix_digits:
            # Cap the number of digits so the value cannot overflow the column.
            return str(random.randint(0, 10 ** col_length - 1))
        return str(random.randint(0, random_range))

    @staticmethod
    def _numeric_bounds(col_type, sign_factor, random_range=999999):
        """Return the inclusive ``(low, high)`` range used for ``col_type``.

        The small integer types use deliberately conservative bounds that
        stay inside the ranges of the MySQL types; every other type uses
        ``random_range``.  Signed columns (``sign_factor == 2``) also get
        negative values.
        """
        if col_type == "tinyint":
            high = 255 // (3 * 1 * sign_factor)
        elif col_type in ("smallint", "mediumint"):
            high = 65535 // (3 * 2 * sign_factor)
        else:
            high = random_range
        low = -high - 1 if sign_factor == 2 else 0
        return low, high

    def _random_literal(self, col_name, col_type, type_args, sign_factor):
        """Return ``(value, sql_literal)`` with random data for one column.

        Column types that are not supported yield ``(None, "NULL")`` so the
        row keeps one value per column.
        """
        random_range = self.DEFAULT_RANDOM_RANGE
        if "char" in col_type:
            col_length = int(type_args) if type_args.isdigit() else 1
            value = self._random_char_value(col_name, col_length, random_range)
        elif col_type in self._INT_TYPES:
            low, high = self._numeric_bounds(col_type, sign_factor, random_range)
            value = random.randint(low, high)
            return value, "%d" % value
        elif col_type in self._FLOAT_TYPES:
            low, high = self._numeric_bounds(col_type, sign_factor, random_range)
            value = random.uniform(low, high)
            return value, "%f" % value
        elif col_type == "date":
            # A day between 300 days ago and today.
            value = get_date(random.randint(-300, 0))
        elif col_type == "time":
            value = get_time()
        elif col_type in self._DATETIME_TYPES:
            value = get_date(random.randint(-300, 0)) + " " + get_time()
        elif col_type == "year":
            value = str(random.randint(1901, 2155))
        else:
            return None, "NULL"
        return value, "'{0}'".format(value)

    def _remember_linked_value(self, col_name, value):
        """Store ``value`` for ``col_name`` and for every column linked to it.

        Does nothing unless ``col_name`` is configured in ``linkfield_judge``.
        The column is also marked in ``linkfield["pass_colunm"]`` as
        generated by the current table, so later rows of this table keep
        generating fresh values instead of reading the stored ones.
        """
        if col_name not in linkfield_judge:
            return
        linkfield.setdefault(col_name, []).append(value)
        group = linkfield_judge[col_name]
        # Columns configured with the same group id share the same values.
        for other_name, other_group in linkfield_judge.items():
            if other_name != col_name and other_group == group:
                linkfield.setdefault(other_name, []).append(value)
        column_key = table + col_name
        if column_key not in linkfield["pass_colunm"]:
            linkfield["pass_colunm"].append(column_key)

    def generate_data_for_mysqldb(self, columns, commit_num, commit_times):
        """Insert ``commit_num * commit_times`` random rows into ``table``.

        Args:
            columns: column descriptions as returned by ``get_cols``.
            commit_num: number of rows sent with each INSERT statement.
            commit_times: number of INSERT statements to execute.

        Raises:
            IndexError: if a linked column has fewer stored values than the
                number of rows requested for this table.

        A failing INSERT is reported on stdout and the remaining batches
        are still attempted.
        """
        # Index of the current row within this table.  Linked columns read
        # their value at this position, so every linked column of a row
        # refers to the same source row.
        row_index = 0
        for _ in range(commit_times):
            rows = []
            for _ in range(commit_num):
                literals = []
                for col in columns:
                    col_name = col[0]
                    col_type, type_args, sign_factor = self._parse_type(col[1])
                    # Reuse a stored value unless this table is the one
                    # that generates the values for the column.
                    if (linkfield.get(col_name)
                            and table + col_name not in linkfield["pass_colunm"]):
                        literals.append(
                            "'{0}'".format(linkfield[col_name][row_index]))
                        continue
                    value, literal = self._random_literal(
                        col_name, col_type, type_args, sign_factor)
                    literals.append(literal)
                    self._remember_linked_value(col_name, value)
                rows.append("({0})".format(",".join(literals)))
                row_index += 1
            if not rows:
                continue
            sql = "insert into {0} values {1};".format(table, ",".join(rows))
            try:
                cur.execute(sql)
                conn.commit()
            except Exception as err:
                print("SQL ERROR:{0}".format(sql))
                print("Reason: {0}".format(err))
            else:
                print("Insert into {0} successfully!".format(table))
def get_date(num=0):
    """Return today's date shifted by ``num`` days as a string.

    With the default ``num == 0`` the result is formatted as ``YYYYMMDD``;
    for any other offset it is formatted as ``YYYY-MM-DD``.  Negative
    offsets give dates in the past.
    """
    today = datetime.date.today()
    if num == 0:
        return today.strftime("%Y%m%d")
    return (today + datetime.timedelta(days=num)).strftime("%Y-%m-%d")
def get_time():
    """Return the current local time formatted as ``HH:MM:SS``."""
    return time.strftime("%H:%M:%S")
if __name__ == "__main__":
    # Connect to the database described by the "localhost_cf_test" entry of
    # mysql_config.json.
    connect_mysql = Connect_mysql()
    mysql_config = connect_mysql.get_config("mysql_config.json")
    db_config = mysql_config["localhost_cf_test"]
    conn, cur = connect_mysql.conn_mysql(host=db_config["host"],
                                         port=db_config["port"],
                                         user=db_config["user"],
                                         password=db_config["password"],
                                         database=db_config["database"],
                                         charset=db_config["charset"])
    generate_data = Generate_Data()
    # Names of the tables to fill with data.
    tables = ["portrait_all", "portrait_fund"]
    # Link configuration: columns that map to the same number are linked and
    # receive identical values.  For example
    #   linkfield_judge = {"fund_account": 0, "client_id": 0, "client_name": 1}
    # links fund_account and client_id with each other in every configured
    # table that has them, and links client_name across the tables.
    linkfield_judge = {"fund_account": 0, "client_name": 0}
    # Values stored for linked columns, for example:
    #   {'pass_colunm': ['portrait_allfund_account'],
    #    'client_name': ['fund_account91', 'fund_account01']}
    linkfield = {"pass_colunm": []}
    try:
        # ``table`` is read as a module-level name by Generate_Data, so the
        # loop variable must keep this name.
        for table in tables:
            cols = generate_data.get_cols(table)  # column descriptions
            generate_data.generate_data_for_mysqldb(
                columns=cols, commit_num=10, commit_times=5)  # insert the rows
    finally:
        # Release the database resources even when generation fails.
        cur.close()
        conn.close()
以上程式碼執行時需要配置的是:
tables = ["portrait_all","portrait_fund"] # names of the tables to fill with data
linkfield_judge = {"fund_account":0,"client_name":0}
以及下述 commit_num,即每次提交的插入sql中資料的組數;每張表中造的資料總數 = commit_num*commit_times
generate_data.generate_data_for_mysqldb(columns=cols,commit_num=10,commit_times=5) # generate the data
有關 from connect_to_mysql import Connect_mysql
這個方法以及相關配置可查閱此文章:https://blog.csdn.net/ck3207/article/details/80233204