python操作hbase(基於thrift服務)
阿新 • • 發佈:2019-01-05
一:環境說明
ubuntu 16.04
python2.7
happybase (python包, 執行命令 pip install happybase 安裝)
hbase v1.3.1 (docker 映象)
thrift
特別注意
HBase 的 Thrift 服務分為 thrift 與 thrift2 兩種。新版本的 hbase 預設使用 thrift2,而 thrift2 相比 thrift,去掉了很多對 hbase 的命令支援;happybase 使用的是 thrift(第一版)介面,因此必須啟動 thrift 服務才能使用。如果你要換用 thrift,只要停止 thrift2 服務,再啟動 thrift 服務即可。
啟動、停止命令:
/hbase/bin/hbase-daemon.sh stop thrift2
/hbase/bin/hbase-daemon.sh start thrift
二:程式碼實現
# -*- coding:utf-8 -*-
import happybase
from collections import Iterable
import sys, os
HBASE_HOST = '127.0.0.1'
HBASE_PORT = 9090
class HBase(object):
    """Small convenience wrapper around a happybase connection for one table.

    All columns handled by this class live in a single column family whose
    name is the table name.  Callers pass and receive bare qualifiers
    (e.g. ``'name'``); the ``'<table>:'`` prefix is added and stripped here.
    """

    def __init__(self, _tableName, _families=None):
        """Open the Thrift connection and create the table if it is missing.

        :param _tableName: table name (also used as the column family name)
        :param _families: accepted for backward compatibility; not used
        """
        self.connection = happybase.Connection(HBASE_HOST, HBASE_PORT, autoconnect=False)
        self.connection.open()
        self.tableName = _tableName
        self.create_table()

    def _column(self, _qualifier):
        """Return the fully qualified ``'<family>:<qualifier>'`` column name."""
        return '%s:%s' % (self.tableName, _qualifier)

    @staticmethod
    def _quote(_value):
        """Escape a value for use inside a single-quoted filter argument.

        The HBase filter language escapes a single quote by doubling it.
        """
        return ('%s' % _value).replace("'", "''")

    @staticmethod
    def _strip_family(_columns):
        """Return a copy of ``_columns`` keyed by qualifier only.

        Only the first ``:`` separates family from qualifier, so a qualifier
        that itself contains ``:`` is kept intact.
        """
        return dict((k.split(':', 1)[1], v) for k, v in _columns.items())

    def create_table(self):
        """Create the table, with one family named after it, if it is absent.

        :return: None
        """
        if self.tableName not in self.connection.tables():
            families = {'%s' % self.tableName: dict()}
            self.connection.create_table(self.tableName, families)

    def put(self, _rowKey, _fields):
        """Insert or update one row.

        :param _rowKey: string
        :param _fields: dict of qualifier -> value,
                        e.g. {'domain': 'www.baidu.com', 'name': '百度'}
        :return: None
        """
        # Debug output kept from the original implementation.
        print('%s %s' % (_rowKey, _fields))
        fields = dict((self._column(k), v) for k, v in _fields.items())
        table = self.connection.table(self.tableName)
        print('%s %s' % (_rowKey, fields))
        table.put(_rowKey, fields)

    def scan(self, _rowKey=None, _filters=None):
        """Scan the table; all given conditions are combined with AND.

        :param _rowKey: regular expression matched against the row key,
                        e.g. '^138$' (exact) or '138' (contains)
        :param _filters: dict of qualifier -> regular expression.  Each
                         pattern is wrapped in ``^...$`` here, so it must
                         match the whole cell value.
        :return: list of dicts, one per row, each with the row key in '_id'
        """
        clauses = []
        if _rowKey:
            clauses.append("RowFilter(=,'regexstring:%s')" % self._quote(_rowKey))
        if _filters:
            family = self._quote(self.tableName)
            for k, v in _filters.items():
                clauses.append(
                    "SingleColumnValueFilter('%s','%s',=,'regexstring:^%s$')"
                    % (family, self._quote(k), self._quote(v))
                )
        # No conditions at all means an unfiltered scan (filter=None).
        filters = ' AND '.join(clauses) if clauses else None
        table = self.connection.table(self.tableName)
        return self.dumps(table.scan(filter=filters))

    def count(self, _field):
        """Count the rows returned by a scan restricted to one column.

        :param _field: qualifier of the column to scan
        :return: int
        """
        table = self.connection.table(self.tableName)
        return sum(1 for _ in table.scan(columns=[self._column(_field)]))

    def row(self, _rowKey, _columns=None):
        """Fetch a single row by its key.

        :param _rowKey: string, e.g. '123'
        :param _columns: optional list of qualifiers, e.g. ['name', 'domain']
        :return: dict of qualifier -> value, plus the row key under '_id'
        """
        columns = [self._column(i) for i in _columns] if _columns else []
        table = self.connection.table(self.tableName)
        rtn = self.dumps(table.row(_rowKey, columns=columns))
        rtn['_id'] = _rowKey
        return rtn

    def rows(self, _rowKey):
        """Fetch several rows by their keys.

        :param _rowKey: list of row keys, e.g. ['123', '234']
        :return: list of dicts, one per row, each with the row key in '_id'
        """
        table = self.connection.table(self.tableName)
        return self.dumps(table.rows(_rowKey))

    def dumps(self, _data):
        """Convert happybase results to plain dicts keyed by qualifier.

        :param _data: a dict of columns (single row), or a list/generator of
                      ``(row_key, columns)`` pairs
        :return: dict for a single row; otherwise a list of dicts, each with
                 the row key under '_id'.  Non-iterable input gives ``[]``.
        """
        if isinstance(_data, dict):
            return self._strip_family(_data)
        rtn = []
        if isinstance(_data, Iterable):
            for key, data in _data:
                tmp = {'_id': key}
                tmp.update(self._strip_family(data))
                rtn.append(tmp)
        return rtn

    def __del__(self):
        """Close the connection, if one was ever created."""
        # __init__ may have failed before self.connection was assigned.
        connection = getattr(self, 'connection', None)
        if connection is not None:
            connection.close()
if __name__ == '__main__':
    # Demo run against a live HBase Thrift server: write one row, then read
    # it back through every query helper of the HBase wrapper.
    print('start----------------')
    hbase = HBase('test1')

    # Insert (or overwrite) the sample row.
    sample_row = {'domain': 'www.baidu.com', 'name': '百度'}
    hbase.put('111', sample_row)
    print('insert data finshed')

    # Unfiltered scan.
    print(hbase.scan())
    print('scan data finshed')

    # Scan restricted by a row-key pattern.
    print(hbase.scan(_rowKey='1'))
    print('scan data finshed')

    # Scan restricted by column-value patterns, then by row key and columns.
    column_patterns = {'name': '^百度$', 'domain': '^www.baidu.com$'}
    print(hbase.scan(_filters=column_patterns))
    print('scan data finshed')
    print(hbase.scan(_rowKey='^1$', _filters=column_patterns))
    print('scan data finshed')

    # Single-row lookups: all columns, then only 'name'.
    print(hbase.row('111'))
    print('row data finshed')
    print(hbase.row('111', ['name']))
    print('row data finshed')

    # Multi-row lookup.
    print(hbase.rows(['111']))
    print('rows data finshed')

    # Row count over the 'name' column.
    print(hbase.count('name'))
    print('count data finshed')
    print('end-----------------------')