1. 程式人生 > >mysql 通用造資料指令碼

mysql 通用造資料指令碼

背景

公司研發了一個系統——基於客戶交易行為,利用演算法得出各個使用者的各式各樣的特徵,暫且稱之為“標籤”。這些“標籤”是利用大資料平臺計算而得的結果。計算得到的近三四百個“標籤“資料會會落到中臺N張表中。操作員,在管理平臺,能夠通過特定”標籤“獲取某類滿足此”標籤“的使用者,或者搜尋某個使用者的賬戶,查詢該使用者的”標籤“。
在測試過程中,沒有資料是一個很苦惱的事情。手工插入資料,三四百個欄位,插入十條後,人也癱了。於是想:可否用一個指令碼實現。

分析

分析下功能:
1、上述兩個場景中,不管是查滿足某”標籤“的使用者還是查某使用者的”標籤“都是通過條件去資料庫取資料即可,不需要對資料進行處理(在大資料層已經處理好了)。那麼,資料間不存在業務邏輯。
2、如何確定插入資料的值?可這麼處理:通過desc table 來獲取該表的各個欄位,以及欄位型別,欄位長度。根據這些資訊,可造符合條件的資料。

實現

上面確認了可行性。接下來就是實現了。
建立一個Generate_Data類,寫一個方法,獲取表字段相關資訊。如下:

class Generate_Data:
    def get_cols(self,table):
         """獲取表字段名、欄位型別"""
         sql = "desc " + table
         cur.execute(sql)
         result = cur.fetchall()

         return result

在sql中結果是:
05241

獲取到資料後,解析上述type列的欄位,得到欄位型別,欄位長度(像date就沒有欄位長度標識)。比如解析上述 fund_acccount 得到型別:varchar,長度:32。對於數值型的資料 有可能還有正負之分。比如tinyint 型別:若無正負,即unsined ,範圍為:[0,255];若有正負之分,範圍為:[-128,127]。

                 for col in columns:
                     col_name = col[0]
                     col_type_info = col[1]
                     index = int(col_type_info.rfind("("))  # 通過左右括號獲取括號內的資料

                     # 處理無長度欄位 eg:date
                     if index == -1:
                         col_type
= col_type_info else: col_type = col_type_info[:index] # 處理數值型別有符號與無符號 if "unsigned" in col_type_info: is_unsigned = 1 else: is_unsigned = 2 接下來是造符合各欄位的資料(以下為常用的一些型別作了處理)
                 # 對於不同型別的欄位,生成資料值的格式、內容等處理
                 # char,varchar
                 if "char" in col_type:
                     col_length = int(col_type_info[index + 1:-1])  # 獲取欄位允許長度
                     if len(col_name) > (col_length-6):
                         if col_length < 6:
                            value = str(random.randint(0, 9*10**col_length))
                         else:
                             value = str(random.randint(0, random_range))
                     else:
                         value = col_name + str(random.randint(0, random_range))
                     insert_sql = "{0}'{1}',".format(insert_sql, value)
                 elif col_type == "tinyint":
                     # 編碼為gbk時,每個字元最多佔2個位元組;編碼為utf8時,每個字元最多佔3個位元組;此處以utf8編碼佔用空間計算,下同
                     random_range = 255//(3*1*is_unsigned)
                     value = random.randint(g(is_unsigned,random_range), random_range)
                     insert_sql = (insert_sql + "%d,") % value
                 elif col_type == "smallint":
                     random_range = 65535//(3*2*is_unsigned)    # 除考慮編碼外,每個數字佔用2個位元組
                     value = random.randint(g(is_unsigned,random_range), random_range)
                     insert_sql = (insert_sql + "%d,") % value
                 elif col_type == "mediumint":
                     random_range = 65535//(3*2*is_unsigned)    # 除考慮編碼外,每個數字佔用2個位元組
                     value = random.randint(g(is_unsigned,random_range), random_range)
                     insert_sql = (insert_sql + "%d,") % value
                 # int,integer,bigint
                 elif "int" in col_type:
                     value = random.randint(g(is_unsigned,random_range), random_range)
                     insert_sql = (insert_sql + "%d,") % value
                 elif col_type in ["decimal", "double", "numeric", "real","float"]:
                     value = random.uniform(g(is_unsigned,random_range), random_range)
                     insert_sql = (insert_sql + "%f,") % value
                 elif col_type == "date":
                     value = get_date(random.randint(-300, 0))
                     insert_sql = "{0}'{1}',".format(insert_sql, value)
                 elif col_type == "time":
                     value = get_time()
                     insert_sql = "{0}'{1}',".format(insert_sql, value)
                 # timestamp,datetime
                 elif "time" in col_type:
                     value = get_date(random.randint(-300, 0)) + " " + get_time()
                     insert_sql = "{0}'{1}',".format(insert_sql, value)
                 elif col_type == "year":
                     value = str(random.randint(1901, 2155))
                     insert_sql = "{0}'{1}',".format(insert_sql, value)
以上為第一版程式碼的,之後對程式碼稍加改動,支援表關聯造資料。當A表中有一個欄位為fund_account,B表中有一個欄位為fund_account。關聯後,A表中的fund_account在B表中都能找到對應的資料。此外還支援一種情況:當A表中還有一個欄位clinet_name  欄位值與fund_account一致時,可通過配置造出A表中fund_account和clinet_name 與 B表中的fund_account都一致的資料。
指令碼執行後的資料結果如下圖:
![05251](https://img-blog.csdn.net/2018052510202195?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2NrMzIwNw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)

全部程式碼如下

# -*- coding: utf-8 -*-
__author__ = "chenk"
from connect_to_mysql import Connect_mysql
import random,datetime,time

class Generate_Data:
    def get_cols(self,table):
         """獲取表字段名、欄位型別"""
         sql = "desc " + table
         cur.execute(sql)
         result = cur.fetchall()

         return result

    def generate_data_for_mysqldb(self,columns,commit_num,commit_times):
         """通用造資料函式
         columns 是表的所有欄位元祖,
         commit_num 是每次插入資料庫的數量,
         commit_times*commit_num是插入的總資料量"""
         global linkfield
         commit_num_temp = commit_num
         linkfield_index = 0
         while commit_times:
             sql = ""   # 被執行的sql
             sql_value = "" # 資料組集
             commit_num = commit_num_temp
             while commit_num:
                 insert_sql = ""    # 單組資料
                 for col in columns:
                     col_name = col[0]
                     col_type_info = col[1]
                     index = int(col_type_info.rfind("("))  # 通過左右括號獲取括號內的資料

                     # linkfield["pass_colunm"] 中的欄位是一個標識,可判斷防止獲取linkfield中產生的資料
                     if linkfield.get(col_name):
                         if not (table+col_name) in linkfield["pass_colunm"]:
                            insert_sql = "{0}'{1}',".format(insert_sql, linkfield[col_name][linkfield_index])
                            linkfield_index += 1
                            continue

                     # 處理數值型別有符號與無符號
                     if "unsigned" in col_type_info:
                         is_unsigned = 1
                     else:
                         is_unsigned = 2

                     # 處理無長度欄位 eg:date
                     if index == -1:
                         col_type = col_type_info
                     else:
                         col_type = col_type_info[:index]

                     # 數值型別根據有/無符號,變更輸入資料的區間
                     g = lambda x, y: -y - 1 if x == 2 else 0

                     # 處理欄位長度
                     random_range = 999999

                     # 對於不同型別的欄位,生成資料值的格式、內容等處理
                     # char,varchar
                     if "char" in col_type:
                         col_length = int(col_type_info[index + 1:-1])  # 獲取欄位允許長度
                         if len(col_name) > (col_length-6):
                             if col_length < 6:
                                value = str(random.randint(0, 9*10**col_length))
                             else:
                                 value = str(random.randint(0, random_range))
                         else:
                             value = col_name + str(random.randint(0, random_range))
                         insert_sql = "{0}'{1}',".format(insert_sql, value)
                     elif col_type == "tinyint":
                         # 編碼為gbk時,每個字元最多佔2個位元組;編碼為utf8時,每個字元最多佔3個位元組;此處以utf8編碼佔用空間計算,下同
                         random_range = 255//(3*1*is_unsigned)
                         value = random.randint(g(is_unsigned,random_range), random_range)
                         insert_sql = (insert_sql + "%d,") % value
                     elif col_type == "smallint":
                         random_range = 65535//(3*2*is_unsigned)    # 除考慮編碼外,每個數字佔用2個位元組
                         value = random.randint(g(is_unsigned,random_range), random_range)
                         insert_sql = (insert_sql + "%d,") % value
                     elif col_type == "mediumint":
                         random_range = 65535//(3*2*is_unsigned)    # 除考慮編碼外,每個數字佔用2個位元組
                         value = random.randint(g(is_unsigned,random_range), random_range)
                         insert_sql = (insert_sql + "%d,") % value
                     # int,integer,bigint
                     elif "int" in col_type:
                         value = random.randint(g(is_unsigned,random_range), random_range)
                         insert_sql = (insert_sql + "%d,") % value
                     elif col_type in ["decimal", "double", "numeric", "real","float"]:
                         value = random.uniform(g(is_unsigned,random_range), random_range)
                         insert_sql = (insert_sql + "%f,") % value
                     elif col_type == "date":
                         value = get_date(random.randint(-300, 0))
                         insert_sql = "{0}'{1}',".format(insert_sql, value)
                     elif col_type == "time":
                         value = get_time()
                         insert_sql = "{0}'{1}',".format(insert_sql, value)
                     # timestamp,datetime
                     elif "time" in col_type:
                         value = get_date(random.randint(-300, 0)) + " " + get_time()
                         insert_sql = "{0}'{1}',".format(insert_sql, value)
                     elif col_type == "year":
                         value = str(random.randint(1901, 2155))
                         insert_sql = "{0}'{1}',".format(insert_sql, value)

                     # 判斷欄位是否在配置 linkfield_judge中,若是,則需儲存資料值
                     if col_name in linkfield_judge.keys():
                         if not linkfield.get(col_name):
                             linkfield[col_name] = list()
                         linkfield[col_name].append(value)

                         # 處理同一張表中多個欄位值相同的情況
                         for k, w in linkfield_judge.items():
                             if k != col_name and w == linkfield_judge[col_name]:
                                 if not linkfield.get(k):
                                     linkfield[k] = list()
                                 linkfield[k].append(value)
                                 # 對於正常生成資料的欄位,放在過濾欄位列表中,以作標識
                                 if not table+col_name in linkfield["pass_colunm"]:
                                    linkfield["pass_colunm"].append(table+col_name)

                 sql_value += "({0}),".format(insert_sql[:-1])
                 commit_num -= 1

             sql += "insert into {0} values {1};".format(table, sql_value[:-1])

             # 執行拼接的SQL
             try:
                 cur.execute(sql)
                 conn.commit()
                 print("Insert into {0} successfully!".format(table))
             except:
                 print("SQL ERROR:{0}".format(sql))
             commit_times -= 1



def get_date(num=0):
    """獲取今日日期"""
    if num == 0:
        return datetime.date.today().strftime("%Y%m%d")
    else:
        return (datetime.date.today() + datetime.timedelta(days=num)).strftime("%Y-%m-%d")

def get_time():
    """獲取今日日期"""
    return time.strftime("%H:%M:%S")


if __name__ == "__main__":
    # 連線資料庫
    connect_mysql = Connect_mysql()
    mysql_config = connect_mysql.get_config("mysql_config.json")
    conn, cur = connect_mysql.conn_mysql(host=mysql_config["localhost_cf_test"]["host"],
                                         port=mysql_config["localhost_cf_test"]["port"], \
                                         user=mysql_config["localhost_cf_test"]["user"],
                                         password=mysql_config["localhost_cf_test"]["password"], \
                                         database=mysql_config["localhost_cf_test"]["database"],
                                         charset=mysql_config["localhost_cf_test"]["charset"])

    generate_data = Generate_Data()
    tables = ["portrait_all","portrait_fund"]   # 配置需要插入資料的表名
    # 配置各表關聯欄位的關係,值相同的鍵為關聯欄位 eg: linkfield_judge = {"fund_account":0,"client_id":0, "client_name":1}
    # 下述配置的意思為 配置的表中若有fund_account 與 client_id,各表中欄位fund_accout、client_id都做關聯;
    # 各表的client_name值都會關聯。
    linkfield_judge = {"fund_account":0,"client_name":0}
    # 例如:linkfield: {'pass_colunm':['portrait_allfund_account'],'client_name':['fund_account91', 'fund_account01'}
    linkfield = {"pass_colunm":[]}  # 儲存關聯欄位的值

    for table in tables:
        # print("linkfield:",linkfield)
        cols = generate_data.get_cols(table)    # 獲取表字段
        generate_data.generate_data_for_mysqldb(columns=cols,commit_num=10,commit_times=5)  # 生成資料

以上程式碼執行時需要配置的是:

tables = ["portrait_all","portrait_fund"]   # 配置需要插入資料的表名
linkfield_judge = {"fund_account":0,"client_name":0}
以及下述 commit_num 每次提交插入資料的sql值的個數;每張表中造的資料總是=commit_num*commit_times
generate_data.generate_data_for_mysqldb(columns=cols,commit_num=10,commit_times=5)  # 生成資料

有關 from connect_to_mysql import Connect_mysql 這個方法以及相關配置可查閱此文章:https://blog.csdn.net/ck3207/article/details/80233204