1. 程式人生 > >利用pyhive將hive查詢資料匯入到mysql

利用pyhive將hive查詢資料匯入到mysql

在大資料工作中經常碰到需要將hive查詢資料匯入到mysql的需求,常見的方法主要有兩種,一是sqoop,另一種則是pyhive。本文主要講的就是python的pyhive庫的安裝與使用。

pyhive作用

  • 遠端連線hive資料庫,執行hive sql,而不需要登入到安裝有hive的伺服器上去
  • 可以更方便處理更多連續命令,可以封裝一些經常需要複用的命令
  • 指令碼化,不需要編譯,隨時改,隨時執行看結果
  • 方便對hive sql的結果進行更多處理,而不是僅僅在終端打印出來看一看

安裝環境

這裡只講解Linux環境,以ubuntu為例,其他系統類似

  • python 環境,我裝的是python2 apt-get install python2.7
  • apt-get install python-dev
  • apt-get install libsasl2-dev
  • pip install sasl
  • pip install thrift-sasl
  • pip install PyHive

    注意不要漏裝,否則報錯,另外如果使用python3,則安裝包的名字可能不一樣,比如python3-dev

用法

連線hive資料庫

conn=hive.Connection(host='xxx.xxx.xxx.xxx',   port=10000, database='database name', auth='NOSASL' )
cursor = conn.cursor()
cursor.execute(sql)
res = cursor.fetchall()  或者  res = cursor.fetchone()
cursor.close()
conn.close()

host是hive所在伺服器ip,埠一般是10000,但具體是多少需要看配置來確定,配置所在位置為 hive目錄/conf/hive-site.xml 其中的配置項 hive.server2.thrift.port 即為埠號。 sql就是 hive sql語句。

例項應用

場景

查詢hive資料庫,並將結果寫入到mysql中

分析

  • 直接用hive sql肯定是不行的,因為結果要匯入到mysql中,而不是列印看看或者儲存到文字
  • sqoop可以,但sqoop的主要功能是把hive 的資料匯入到mysql,而不能執行hive查詢命令。換句話說,需要先另建一個hive表,先用hivesql查詢,把結果儲存在這個表中,然後用sqoop導到mysql中。
  • pyhive則可以執行hivesql,而且基於python,所以腳本里肯定也可以操作mysql資料庫,即可以寫入mysql
  • 如果查詢得到的資料量比較大,那麼建議使用sqoop,哪怕步驟可能會麻煩一點,但總體上是節約時間的,因為它從hive到mysql的匯入是封裝好的,應該比我們自己寫insert sql要快。
  • 但是pyhive更方便,更靈活,更容易應對撈資料的需求。

python 原始碼如下:

#!/usr/bin/env python

import sys
from pyhive import hive
import MySQLdb

class HiveClient:

    def __init__(self, db_host, hdatabase, hport=10000):
        """
        create connection to hive server
        """
        self.conn = hive.Connection(host=db_host,   port=hport,  database=hdatabase)

    def query(self, sql):
        """
        query
        """
        cursor = self.conn.cursor()
        cursor.execute(sql)
        res = cursor.fetchall()
        cursor.close()
        return res

    def close(self):
        """
        close connection
        """
        self.conn.close()


class MysqlClient:
    def __init__(self, mdb):
        self.conn = MySQLdb.connect (host = "rm-xxx.mysql.rds.aliyuncs.com",   user = "xxx",  passwd = "xxx",  db = mdb)
    def insert(self,sql):
        cursor = self.conn.cursor()
        cursor.execute(sql)
        cursor.close ()
        self.conn.commit()

    def close(self):
        self.conn.close ()

def dhive():
    try:
        hcon = HiveClient('127.0.0.1', 'db_name', '20000' )
        mcon = MysqlClient('a_db_name')
        res = hcon.query("select name, accountid from a_table_name where length(name)>10 ")
        hcon.close()
        for item in res:
            sql = """ insert ignore into b_table_name (name, accid) value( '%s', %s) """%( item[0], item[1])
            mcon.insert(sql)
        mcon.close()

    except Exception, tx:
        print 'excepion %s' % (tx.message)

if __name__ == "__main__":
    dhive()

可以看出,將hive連線封裝後,呼叫會變得非常方便,以後複用也很方便。