將 MySQL 資料庫的建表語句修改成 Greenplum 資料庫中可執行的指令碼
阿新 • • 發佈:2018-11-02
#用來獲取輸入的對應的表名稱的建表語句
#首先輸入需要獲取的mysql或者sql server的資料庫表名稱,到對應的資料庫中抓出建表語句,
#然後,將建表語句進行對應的修改,形成可在 Greenplum(pg)中執行的語句
# Read a MySQL table's column layout and print an equivalent Greenplum
# CREATE TABLE statement.
import re
import sys


class MysqlDB:
    """Translate a MySQL table's column layout into Greenplum DDL.

    The connection settings are plain attributes and are empty by default;
    assign them on the instance before calling a method that opens a
    connection.
    """

    def __init__(self):
        self.hostname = ''
        self.port = 3306
        self.username = ''
        self.password = ''
        self.database = ''

    def connectmysql(self):
        """Open a connection to the configured MySQL server.

        Returns:
            The pymysql connection, or ``None`` when connecting fails. The
            failure is printed instead of raised, so callers must check for
            ``None``.
        """
        try:
            # Imported lazily so the pure helpers (_convert_type,
            # postgres_create) remain usable without the driver installed.
            import pymysql

            return pymysql.connect(host=self.hostname,
                                   port=self.port,
                                   user=self.username,
                                   password=self.password,
                                   database=self.database)
        except Exception:
            info = sys.exc_info()
            print("連線異常", info[1])
            return None

    def insertTable(self):
        """Run a fixed ``select`` against ``cn_customer.bank`` and commit.

        Despite its name this method issues no INSERT. Errors are printed
        and the transaction is rolled back; the connection is always closed.
        """
        condb = self.connectmysql()
        if condb is None:
            # connectmysql() already reported the failure.
            return
        try:
            cursor = condb.cursor()
            sql = "select * from cn_customer.bank"
            cursor.execute(sql)
            condb.commit()
        except Exception:
            info = sys.exc_info()
            print("發生異常", info[1])
            condb.rollback()
        finally:
            condb.close()

    def _convert_type(self, data_type):
        """Map a MySQL column type (as shown by EXPLAIN) to a Greenplum type.

        The checks run from most to least specific. The generic ``'int'``
        test must come after every other integer test, otherwise it would
        swallow tinyint/smallint/mediumint/bigint and the unsigned variants.
        Unsigned types are widened to the next larger signed type so the
        full value range still fits.

        Types that match no rule (for example ``varchar(50)``) are returned
        unchanged; MySQL-only types such as ``datetime`` are not translated.
        """
        if data_type in ('bit(1)', 'tinyint(1)', 'tinyint(1) unsigned'):
            return 'boolean'
        if re.search(r'smallint.* unsigned', data_type) or 'mediumint' in data_type:
            return 'integer'
        if 'smallint' in data_type:
            return 'smallint'
        # Greenplum has no tinyint or year type; smallint holds both ranges.
        if 'tinyint' in data_type or data_type.startswith('year'):
            return 'smallint'
        if 'bigint' in data_type and 'unsigned' in data_type:
            # Unsigned 64-bit values do not fit in bigint.
            return 'numeric'
        if 'bigint' in data_type or re.search(r'int.* unsigned', data_type):
            return 'bigint'
        if 'int' in data_type:
            return 'int4'
        if 'float' in data_type:
            return 'float'
        if 'decimal' in data_type:
            return 'decimal'
        if 'double' in data_type:
            return 'double precision'
        return data_type

    def _build_fields(self, table_name, table_info):
        """Turn EXPLAIN rows into the column dictionaries postgres_create uses.

        Each row is read positionally: 0 = field name, 1 = type,
        2 = ``'YES'``/``'NO'`` nullability, 3 = key marker (``'PRI'`` for
        primary-key columns).
        """
        fields = []
        for res in table_info:
            fields.append({
                'column_name': res[0].lower(),
                'table_name': table_name.lower(),
                'type': self._convert_type(res[1].lower()),
                # 'YES' means the column accepts NULL, so only the other
                # case gets a NOT NULL constraint.
                'null': '' if res[2] == 'YES' else 'not null',
                'primary_key': res[3] == 'PRI',
            })
        return fields

    def load_columns(self, table_name):
        """Fetch the column layout of *table_name* and print Greenplum DDL.

        Args:
            table_name: Name of the MySQL table in the configured database.

        Returns:
            The generated ``create table`` statement, or ``None`` when the
            MySQL connection could not be opened.
        """
        condb = self.connectmysql()
        if condb is None:
            # connectmysql() already reported the failure.
            return None
        try:
            cursor = condb.cursor()
            # Identifiers cannot be bound as query parameters, so double any
            # backtick to keep the name inside its quotes.
            cursor.execute('EXPLAIN `%s`' % table_name.replace('`', '``'))
            table_info = cursor.fetchall()
        finally:
            condb.close()
        return self.postgres_create(self._build_fields(table_name, table_info))

    def postgres_create(self, fields):
        """Build, print and return the Greenplum ``create table`` statement.

        Args:
            fields: Non-empty list of dictionaries with the keys
                ``column_name``, ``table_name``, ``type``, ``null`` and
                ``primary_key``.

        Returns:
            The ``create table`` statement as a string.

        Raises:
            ValueError: If *fields* is empty.
        """
        if not fields:
            raise ValueError("fields must contain at least one column")

        table_name = 'dw_stg.stg_cus_dim_' + fields[0]['table_name']

        definitions = [
            ' '.join(part for part in (field['column_name'],
                                       field['type'],
                                       field['null']) if part)
            for field in fields
        ]
        # Fixed ETL bookkeeping columns appended to every staging table.
        definitions += [
            'ord_loc varchar(10)',
            'modify_date_etl timestamp default now()',
            'load_dt timestamp default now()',
        ]

        primary_key = [field['column_name']
                       for field in fields if field['primary_key']]
        if primary_key:
            key_columns = ','.join(primary_key)
            definitions.append('primary key(%s)' % key_columns)
            distribution = 'distributed by (%s)' % key_columns
        else:
            # An empty "primary key()" / "distributed by ()" is invalid DDL.
            distribution = 'distributed randomly'

        create_sql = 'create table %s (\n%s\n)\n%s' % (
            table_name, ',\n'.join(definitions), distribution)
        print(create_sql)
        return create_sql


def main():
    """Print the Greenplum DDL for the MySQL table ``bank``."""
    mysqldb = MysqlDB()
    mysqldb.load_columns('bank')


if __name__ == "__main__":
    main()