Python連線Hive/Impala
阿新 • • 發佈:2021-02-03
技術標籤:Python
目前 PyHive 與 impyla 所依賴的套件版本不太相容，需要固定使用下列版本：
PyHive==0.6.2
SQLAlchemy==1.3.18
impyla==0.15a1
pandas==1.0.5
thrift==0.13.0
thrift-sasl==0.4.2
thriftpy2==0.4.0
一、使用 impyla 連線 Impala
def get_current_engine(self):
    """Open a new impyla DB-API connection using the Hive *write* settings.

    Connection parameters are read from ``self._hive_write_conn_conf``
    (keys ``host``, ``port``, ``user``, ``pwd``, ``db_name``). Every call
    creates a brand-new connection, so the caller owns it and must close it.

    Returns:
        An ``impala.dbapi`` connection opened with ``timeout=3600`` and
        ``auth_mechanism='PLAIN'``.

    Raises:
        TypeError / ValueError: if ``port`` is missing or not an integer
        value, because it is passed through ``int()``.
    """
    conf = self._hive_write_conn_conf
    host = conf.get("host")
    port = conf.get("port")
    user = conf.get("user")
    pwd = conf.get("pwd")
    db_name = conf.get("db_name")
    # Imported lazily so impyla is only needed when a connection is opened.
    from impala.dbapi import connect
    return connect(host=host, port=int(port), user=user, password=pwd,
                   database=db_name, timeout=3600, auth_mechanism='PLAIN')


def execute_sql(self, sql: str):
    """Execute a statement that produces no result set, retrying on failure.

    Makes at most ``len(self._candidate_engine_list)`` attempts and stops at
    the first one that succeeds, so the statement is executed only once on
    success. Each attempt uses its own connection from
    ``get_current_engine()``. The statement is executed and committed on that
    same connection.

    Each failed attempt is handled as follows:
    - rollback is tried on its connection;
    - the failure is reported via ``self.alert.send_stack_trace_msg``;
    - its cursor and connection are closed.

    The cursor of the latest attempt is stored in ``self._curr_cursor``. After
    a successful attempt that cursor, and its connection, are left open
    because other code may read from ``_curr_cursor``.
    NOTE(review): if nothing reads ``_curr_cursor``, close it on success too.

    Args:
        sql: The statement to execute.
    """
    for _ in range(len(self._candidate_engine_list)):
        # get_current_engine() opens a NEW connection on every call, so keep
        # the one that runs the statement and send commit/rollback/close to it.
        conn = self.get_current_engine()
        self._curr_cursor = conn.cursor()
        try:
            self._curr_cursor.execute(sql)
            conn.commit()
            return
        except Exception as e:
            try:
                conn.rollback()
            except Exception:
                # impyla connections have no transactions and raise
                # NotSupportedError here; that must not hide the real error.
                pass
            self.alert.send_stack_trace_msg("SQL execute Error: {} \ne: {}".format(sql, e))
        # Only reached when the attempt failed: release its resources.
        for resource in (self._curr_cursor, conn):
            try:
                resource.close()
            except Exception:
                pass  # best effort; the failure has already been reported


def query_as_pd(self, sql: str):
    """Run a query and return its result set as a pandas DataFrame.

    Makes at most ``len(self._candidate_engine_list)`` attempts, each on its
    own connection from ``get_current_engine()``. Returns the first
    non-``None`` result of ``impala.util.as_pandas``. Failed attempts are
    reported via ``self.alert.send_stack_trace_msg``. The cursor and the
    connection used by every attempt are closed before the next attempt or
    before returning.

    Args:
        sql: The query to run.

    Returns:
        The DataFrame, or ``None`` if no attempt produced a result.
    """
    from impala.util import as_pandas
    for _ in range(len(self._candidate_engine_list)):
        conn = self.get_current_engine()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            result = as_pandas(cursor)
            if result is not None:
                return result
        except Exception as e:
            try:
                conn.rollback()
            except Exception:
                # impyla raises NotSupportedError on rollback; keep reporting
                # the original query error instead.
                pass
            self.alert.send_stack_trace_msg("SQL execute Error: {}, e: {}".format(sql, e))
        finally:
            # Close the connection that was actually used (rows are already
            # materialised by as_pandas), rather than opening a new one.
            cursor.close()
            conn.close()
    return None
二、使用 PyHive 連線 Hive
def get_current_engine(self):
    """Open a new PyHive connection using the Hive *read* settings.

    Parameters come from ``self._hive_read_conn_conf`` (keys ``host``,
    ``port``, ``user``, ``pwd``, ``db_name``). Authentication uses
    ``auth='CUSTOM'`` with the configured username and password. Each call
    opens a new connection that the caller must close.
    """
    conf = self._hive_read_conn_conf
    host = conf.get("host")
    port = conf.get("port")
    user = conf.get("user")
    pwd = conf.get("pwd")
    db_name = conf.get("db_name")
    # Normalise the port to int, like the impyla variant does; a missing port
    # (None) is passed through unchanged, as before.
    if port is not None:
        port = int(port)
    # Imported lazily so PyHive is only needed when a connection is opened.
    from pyhive import hive
    return hive.Connection(host=host, port=port, username=user, password=pwd,
                           database=db_name, auth='CUSTOM')


def read_sql(self, sql=None):
    """Run *sql* on a fresh connection and return the result as a DataFrame.

    The connection comes from ``get_current_engine()`` and is always closed
    afterwards; ``pandas.read_sql`` has fetched every row by then.

    Args:
        sql: The query to run. Optional only for signature compatibility.
            The previous ``read_sql()`` took no query and called
            ``pandas.read_sql(connection)``, which always raised ``TypeError``.

    Returns:
        A ``pandas.DataFrame`` with the query result.

    Raises:
        TypeError: if *sql* is not given (same exception type the old,
            query-less call raised).
    """
    if sql is None:
        raise TypeError("read_sql() requires the SQL query to run")
    import pandas
    conn = self.get_current_engine()
    try:
        return pandas.read_sql(sql, conn)
    finally:
        conn.close()