基於teradata的python模組實現簡易的客戶端
阿新 • • 發佈:2018-12-27
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2018-09-07 20:48:11
# @Author  : Sheldon ([email protected])
# @Version : 0.0.1
"""Minimal Teradata client built on the ``teradata`` module and pandas.

The data source is resolved from ``${dataSourceName}`` in the UdaExec
configuration (``udaexec.ini``).
"""

import teradata
import pandas as pd


class GenHasnoLen(Exception):
    """Raised by ``len(client)`` when the last result has no usable row count."""


class TD:
    """Callable wrapper around one Teradata session.

    Calling the instance runs a statement and remembers its SQL text and row
    count, which ``str()`` and ``len()`` report afterwards. The remaining
    methods are canned queries against the ``DBC`` dictionary views.
    """

    def __init__(self):
        """Open a session on the data source named by ``${dataSourceName}``."""
        _udaExec = teradata.UdaExec()
        self.session = _udaExec.connect("${dataSourceName}")
        self._sql = None  # text of the most recent statement
        self._len = None  # row count of the most recent statement, if known

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _quote_literal(value):
        """Return *value* as text that is safe between single quotes in SQL.

        Single quotes are doubled so a value cannot terminate the literal.
        """
        return str(value).replace("'", "''")

    def __call__(self, sql=None, index_col=None, coerce_float=True, params=None,
                 parse_dates=None, columns=None, chunksize=None, usepd=True,
                 **kwargs):
        """Run *sql* and return its result.

        With ``usepd=True`` the statement goes through ``pandas.read_sql`` and
        the pandas-specific arguments are forwarded to it; a DataFrame is
        returned, or an iterator of DataFrames when *chunksize* is set.
        With ``usepd=False`` the statement is executed on the session with
        *params* and ``**kwargs``, and the resulting cursor is returned.
        """
        if usepd:
            # Keyword arguments keep this call independent of the positional
            # order of read_sql's optional parameters.
            result = pd.read_sql(sql, self.session,
                                 index_col=index_col,
                                 coerce_float=coerce_float,
                                 params=params,
                                 parse_dates=parse_dates,
                                 columns=columns,
                                 chunksize=chunksize)
            # A chunked read is lazy, so its total length is unknown.
            self._len = GenHasnoLen() if chunksize else len(result)
        else:
            result = self.session.execute(sql, params, **kwargs)
            self._len = result.rowcount
        self._sql = sql
        return result

    def __len__(self):
        """Return the row count of the last statement.

        Raises:
            GenHasnoLen: if no statement has run yet, the last read was
                chunked, or the recorded count is not a non-negative integer.
        """
        count = self._len
        if isinstance(count, int) and count >= 0:
            return count
        raise GenHasnoLen("row count of the last query is unknown")

    def __str__(self):
        info = f"sql語句是;\n{self._sql}\n" \
               f"一共有{self._len}條記錄"
        return info

    def _sel_1_tbs_db(self, tablename, *addcol):
        """Resolve the database that holds *tablename*.

        When several databases contain an object with that name, the matches
        are printed and the user is asked to pick one by row number.

        Returns:
            A tuple ``(DatabaseName, *addcol values)`` with string values
            stripped, or ``None`` when nothing matches or the user enters
            something that is not a number.
        """
        scdbsql = f"SELECT DatabaseName,TableKind,CommentString,CreatorName\n" \
                  f"FROM DBC.Tables\n" \
                  f"WHERE TableName = '{self._quote_literal(tablename)}'"
        rows = list(self(scdbsql, usepd=False))
        if not rows:
            return None
        if len(rows) == 1:
            selrow = rows[0]
        else:
            print("此表存在於多個數據庫中,請從下列庫中選擇")
            for row in rows:
                print(row)
            while True:
                indexstr = input("請輸入資料庫所在行號,按Enter確定,"
                                 "輸入非數字退出:\n").strip()
                # isdecimal() only accepts digits that int() can parse.
                if not indexstr.isdecimal():
                    return None
                index = int(indexstr)
                if 0 < index <= len(rows):
                    selrow = rows[index - 1]
                    break
                print("輸入錯誤的行號範圍,請重新輸入")
        values = (selrow[col] for col in ("DatabaseName", *addcol))
        # Non-string values (e.g. a NULL column) are passed through unchanged.
        return tuple(v.strip() if isinstance(v, str) else v for v in values)

    def dblist(self, access=True, dbkind="D"):
        """List databases with their comments.

        Args:
            access: when true, restrict the list to databases whose account
                equals the default account of the configured user name.
            dbkind: ``"D"`` for databases, ``"U"`` for users, or a false value
                for both.

        Raises:
            ValueError: if *dbkind* is set to anything other than "D" or "U".
        """
        if dbkind and dbkind not in ("D", "U"):
            raise ValueError(f"dbkind must be 'D', 'U' or None, got {dbkind!r}")

        conditions = []
        if access:
            config = self.session.udaexec.config
            externalDSN = config.resolve("${dataSourceName}")
            username = config.section(externalDSN).get("username")
            if username:
                conditions.append(
                    f"AccountName = (\n"
                    f"\tSELECT DefaultAccount\n"
                    f"\tFROM DBC.Users\n"
                    f"\tWHERE UserName='{self._quote_literal(username)}'\n"
                    f")"
                )
        if dbkind:
            conditions.append(f"DBKind = '{dbkind}'")

        clause = ("\nWHERE " + "\nAND ".join(conditions)) if conditions else ""
        dblist_sql = f"SELECT DatabaseName,CommentString\n" \
                     f"FROM DBC.Databases{clause}"
        return self(dblist_sql)

    def tbsdb(self, tbname, usepd=True):
        """Return the names of the databases that contain object *tbname*."""
        trans_sql = f"SELECT DatabaseName\n" \
                    f"FROM DBC.Databases2 DB2\n" \
                    f"INNER JOIN DBC.Tables2 TB2\n" \
                    f"ON DB2.DatabaseId = TB2.DatabaseId\n" \
                    f"WHERE TVMName = '{self._quote_literal(tbname)}'"
        return self(trans_sql, usepd=usepd)

    def definition(self, tablename: str, tbkind: str = "T"):
        """Return the ``SHOW`` text (DDL) of an object.

        Args:
            tablename: ``database.object``, or a bare object name. For a bare
                name the database and the object kind are looked up, asking
                the user to choose when the name is ambiguous.
            tbkind: object kind code, used only when *tablename* is qualified.

        Returns:
            The definition with ``\\r`` replaced by ``\\n``, or ``None`` when
            the object cannot be resolved or ``SHOW`` yields no row.

        Raises:
            KeyError: if the kind code has no known ``SHOW`` object type.
        """
        kind2type = {"T": "TABLE", "O": "TABLE", "V": "VIEW", "M": "MACRO",
                     "P": "PROCEDURE", "E": "PROCEDURE", "R": "FUNCTION",
                     "F": "FUNCTION", "G": "TRIGGER", "I": "JOIN INDEX",
                     "N": "HASH INDEX"}
        if '.' in tablename:
            dtname = tablename
        else:
            resolved = self._sel_1_tbs_db(tablename, "TableKind")
            # Check before unpacking: the lookup yields None on no match/cancel.
            if not resolved:
                return None
            dbname, tbkind = resolved
            if not dbname:
                return None
            dtname = f"{dbname}.{tablename}"
        def_cursor = self(f"SHOW {kind2type[tbkind]} {dtname}", usepd=False)
        row_lst = next(def_cursor, None)
        if row_lst:
            return row_lst[0].replace("\r", "\n")
        return None

    def tablelist(self, dbname: str = None):
        """List objects in ``DBC.Tables``, optionally only those in *dbname*."""
        if dbname is None:
            clause = ''
        else:
            # Filter on the database column; the argument is a database name.
            clause = f"\nWHERE DatabaseName = '{self._quote_literal(dbname)}'"
        tbsql = f"SELECT DISTINCT DatabaseName,TableName,TableKind,CommentString,CreatorName\n" \
                f"FROM DBC.Tables" \
                f"{clause}"
        return self(tbsql)

    def columnlist(self, tablename: str):
        """List column metadata for *tablename*.

        A qualified ``database.table`` name restricts the result to that
        database. A bare name matches every database and adds a leading
        ``DatabaseName`` column so the rows can be told apart.
        """
        colstr = "ColumnName,ColumnTitle,ColumnType,ColumnLength,DecimalTotalDigits,DecimalFractionalDigits,ColumnFormat,DefaultValue,Nullable,CommentString"
        if "." in tablename:
            db, table = tablename.split(".", 1)
            suffix = f"WHERE DatabaseName='{self._quote_literal(db)}' " \
                     f"AND TableName='{self._quote_literal(table)}'"
        else:
            # Extend the column list before the SELECT is assembled.
            colstr = "DatabaseName," + colstr
            suffix = f"WHERE TableName='{self._quote_literal(tablename)}'\n ORDER BY DatabaseName"
        sql = f"SELECT {colstr}\n" \
              f"FROM DBC.Columns\n" \
              f"{suffix}"
        return self(sql)

    def indexs(self, dtname):
        """List the indexes of the table given as ``database.table``.

        Raises:
            ValueError: if *dtname* is not qualified with a database name.
        """
        dbname, sep, tablename = dtname.partition(".")
        if not sep:
            raise ValueError(f"expected 'database.table', got {dtname!r}")
        indexsql = f"SELECT IndexName,ColumnName,IndexType,UniqueFlag,IndexNumber,ColumnPosition\n" \
                   f"FROM DBC.Indices\n" \
                   f"WHERE DatabaseName='{self._quote_literal(dbname)}' " \
                   f"AND TableName='{self._quote_literal(tablename)}'"
        return self(indexsql)

    def search(self, sth, dbname: str = None, user: str = None):
        """Find objects whose name or comment contains *sth*.

        Args:
            sth: text to look for in ``TableName`` and ``CommentString``.
            dbname: optional database name to restrict the search to.
            user: optional creator name to restrict the search to.

        The ORDER BY ranks matches by how much of the name and/or comment the
        search text covers, highest first.
        """
        term = self._quote_literal(sth)
        dbclause = f" AND DatabaseName='{self._quote_literal(dbname)}'" if dbname else ''
        userclause = f" AND CreatorName='{self._quote_literal(user)}'" if user else ''
        scsql = f"SELECT TRIM(DatabaseName) || '.' || TRIM(TableName) AS TbPath,TableKind,CommentString\n" \
                f"FROM DBC.Tables\n" \
                f"WHERE (TableName LIKE '%{term}%' OR CommentString LIKE '%{term}%'){dbclause+userclause}\n" \
                f"ORDER BY CASE 2*LEAST(1,INDEX(TableName,'{term}'))+LEAST(1,INDEX(CommentString,'{term}'))\n" \
                f"\t\tWHEN 3 THEN 1.0*LENGTH('{term}')/LENGTH(TRIM(TableName))+1.0*LENGTH('{term}')/LENGTH(TRIM(CommentString))\n" \
                f"\t\tWHEN 2 THEN 1.0*LENGTH('{term}')/LENGTH(TRIM(TableName))\n" \
                f"\t\tELSE 1.0*LENGTH('{term}')/LENGTH(TRIM(CommentString))\n" \
                f"\t\tEND DESC\n"
        return self(scsql)


if __name__ == "__main__":
    app = TD()
    try:
        print(app.indexs("FRT_DATA.POS_DTL"))
        app('SELECT TOP 10 * FROM DBC.Tables')
        print(app.definition("FRT_DATA.POS_DTL"))
        print(app.definition("TRD_INFO"))
        print(app.dblist())
        print(app.search('外賣'))
        print(app)
    finally:
        # Release the session even when one of the demo queries fails.
        app.close()
使用方法:
- 安裝依賴庫:`pip install teradata pandas`
- 安裝 Teradata ODBC 驅動(下方配置使用 method=odbc)
- 在python指令碼同級目錄,新建配置檔案:udaexec.ini,自行修改相應的配置項,配置內容如下:
# Application Configuration
[CONFIG]
appName=TDClient
version=2
logConsole=False
dataSourceName=TDPROD
table=DBC.DBCInfo

# Default Data Source Configuration
[DEFAULT]
method=odbc
charset=ASCII

# Data Source Definition
[TDPROD]
system=10.213.5.6
username=Sheldon
password=pw12345