用Python3.6操作HBase之HBase-Thrift
阿新 • • 發佈:2018-12-25
全棧工程師開發手冊 (作者:欒鵬)
本機Linux下安裝Thrift
執行如下命令安裝Thrift依賴:
apt-get install automake bison flex g++ git libboost1.55 libevent-dev libssl-dev libtool make pkg-config
解壓編譯:
tar -zxvf thrift-0.11.0.tar.gz cd thrift-0.11.0 ./configure --with-cpp --with-boost --with-python --without-csharp --with-java --without-erlang --without-perl --with-php --without-php_extension --without-ruby --without-haskell --without-go make make install
在叢集Master中Hbase安裝目錄下的/usr/local/hbase/bin目錄啟動thrift服務:
./hbase-daemon.sh start thrift
python操作hbase
安裝依賴包
pip install thrift
pip install hbase-thrift
先啟動hbase
cd /usr/local/hbase
bin/start-hbase.sh
簡單demo
from thrift.transport import TSocket,TTransport from thrift.protocol import TBinaryProtocol from hbase import Hbase # thrift預設埠是9090 socket = TSocket.TSocket('127.0.0.1',9090) socket.setTimeout(5000) transport = TTransport.TBufferedTransport(socket) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Hbase.Client(protocol) socket.open() print(client.getTableNames()) # 獲取當前所有的表名
python3操作hbase會報錯.
首先要下載python3的Hbase檔案,替換Hbase檔案/usr/local/lib/python3.6/dist-packages/hbase/Hbase.py和ttypes.py
下載地址為:https://github.com/626626cdllp/infrastructure/tree/master/hbase
常用方法說明
- createTable(tbaleName,columnFamilies):建立表,無返回值
- tableName:表名
- columnFamilies:列族資訊,為一個ColumnDescriptor列表
from hbase.ttypes import ColumnDescriptor
# 定義列族
column = ColumnDescriptor(name='cf')
# 建立表
client.createTable('test4',[column])
- 1
- 2
- 3
- 4
- 5
- 6
- enableTable(tbaleName):啟用表,無返回值
- tableName:表名
# 啟用表,若表之前未被禁用將會引發IOError錯誤
client.enableTable('test')
- 1
- 2
- disableTable(tbaleName):禁用表,無返回值
- tableName:表名
# 禁用表,若表之前未被啟用將會引發IOError錯誤
client.disableTable('test')
- 1
- 2
- isTableEnabled(tbaleName):驗證表是否被啟用,返回一個bool值
- tableName:表名
client.isTableEnabled('test')
- 1
- getTableNames(tbaleName):獲取表名列表,返回一個str列表
- tableName:表名
client.getTableNames()
- 1
- getColumnDescriptors(tbaleName):獲取所有列族資訊,返回一個字典
- tableName:表名
client.getColumnDescriptors('test')
- 1
- getTableRegions(tbaleName):獲取所有與表關聯的regions,返回一個
TRegionInfo
物件列表
- tableName:表名
client.getTableRegions('test')
- 1
- deleteTable(tbaleName):刪除表,無返回值
- tableName:表名
# 表不存在將會引發IOError(message='java.io.IOException: table does not exist...)錯誤
# 表未被禁用將會引發IOError(message='org.apache.hadoop.hbase.TableNotDisabledException:...)錯誤
client.deleteTable('test5')
- 1
- 2
- 3
- 4
- get(tableName,row,column):獲取資料列表,返回一個
hbase.ttypes.TCell
物件列表
- tableName:表名
- row:行
- column:列
result = client.get('test','row1','cf:a') # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
print result[0].timestamp
print result[0].value
- 1
- 2
- 3
- getVer(tableName,row,column,numVersions):獲取資料列表,返回一個
hbase.ttypes.TCell
物件列表
- tableName:表名
- row:行
- column:列
- numVersions:要檢索的版本數量
result = client.get('test','row1','cf:a',2) # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
print result[0].timestamp
print result[0].value
- 1
- 2
- 3
- getVerTs(tableName,row,column,timestamp,numVersions):獲取小於當前時間戳的資料列表(但是要注意預設可裝載的版本數目,因為預設只有一個版本數),返回一個
hbase.ttypes.TCell
物件列表
- tableName:表名
- row:行
- column:列
- timestamp:時間戳
- numVersions:要檢索的版本數量
result = client.get('test','row1','cf:a',2) # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
print result[0].timestamp
print result[0].value
- 1
- 2
- 3
- getRow(tableName,row):獲取表中指定行在最新時間戳上的資料。返回一個
hbase.ttypes.TRowResult
物件列表,如果行號不存在返回一個空列表
- tableName:表名
- row:行
# 行
row = 'row1'
# 列
column = 'cf:a'
# 查詢結果
result = client.getRow('test',row) # result為一個列表
for item in result: # item為hbase.ttypes.TRowResult物件
print item.row
print item.columns.get('cf:a').value # 獲取值。item.columns.get('cf:a')為一個hbase.ttypes.TCell物件
print item.columns.get('cf:a').timestamp # 獲取時間戳。item.columns.get('cf:a')為一個hbase.ttypes.TCell物件
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- getRowWithColumns(tableName,row,columns):獲取表中指定行與指定列在最新時間戳上的資料。返回一個
hbase.ttypes.TRowResult
物件列表,如果行號不存在返回一個空列表
- tableName:表名
- row:行
- columns:列,list
result = client.getRowWithColumns('test','row1',['cf:a','df:a'])
for item in result:
print item.row
print item.columns.get('cf:a').value
print item.columns.get('cf:a').timestamp
print item.columns.get('df:a').value
print item.columns.get('df:a').timestamp
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- getRowTs(tableName,row,timestamp):獲取表中指定行並且小於指定時間戳的最新的一條資料。返回一個
hbase.ttypes.TRowResult
物件列表,如果行號不存在返回一個空列表
- tableName:表名
- row:行
- timestamp:時間戳
result = client.getRowTs('test','row1',1513069831512)
- 1
- getRowWithColumnsTs(tableName,row,columns,timestamp):獲取指定行與指定列,並且小於這個時間戳的最近一次資料。返回一個
hbase.ttypes.TRowResult
物件列表,如果行號不存在返回一個空列表
- tableName:表名
- row:行
- columns:列,list
- timestamp:時間戳
result = client.getRowWithColumnsTs('test','row1',['cf:a','cf:b','df:a'],1513069831512)
- 1
- mutateRow(tableName,row,mutations):在表中指定行執行一系列的變化操作。如果丟擲異常,則事務被中止。使用預設的當前時間戳,所有條目將具有相同的時間戳。無返回值
- tableName:表名
- row:行
- mutations:變化,list
from hbase.ttypes import Mutation
mutation = Mutation(column='cf:a',value='1')
# 插入資料。如果在test表中row行cf:a列存在,將覆蓋
client.mutateRow('test','row1',[mutation])
- 1
- 2
- 3
- 4
- 5
- 6
- mutateRowTs(tableName,row,mutations,timestamp):在表中指定行執行一系列的變化操作。如果丟擲異常,則事務被中止。使用指定的時間戳,所有條目將具有相同的時間戳。如果是更新操作時,如果指定時間戳小於原來資料的時間戳,將被忽略。無返回值
- tableName:表名
- row:行
- mutations:變化,list
- timestamp:時間戳
from hbase.ttypes import Mutation
# value必須為字串格式,否則將報錯
mutation = Mutation(column='cf:a',value='2')
client.mutateRowTs('test','row1',[mutation],1513070735669)
- 1
- 2
- 3
- 4
- mutateRows(tableName,rowBatches):在表中執行一系列批次(單個行上的一系列突變)。如果丟擲異常,則事務被中止。使用預設的當前時間戳,所有條目將具有相同的時間戳。無返回值
- tableName:表名
- rowBatches:一系列批次
from hbase.ttypes import Mutation,BatchMutation
mutation = Mutation(column='cf:a',value='2')
batchMutation = BatchMutation('row1',[mutation])
client.mutateRows('test',[batchMutation])
- 1
- 2
- 3
- 4
- mutateRowsTs(tableName,rowBatches,timestamp):在表中執行一系列批次(單個行上的一系列突變)。如果丟擲異常,則事務被中止。使用指定的時間戳,所有條目將具有相同的時間戳。如果是更新操作時,如果指定時間戳小於原來資料的時間戳,將被忽略。無返回值
- tableName:表名
- rowBatches:一系列批次,list
- timestamp:時間戳
mutation = Mutation(column='cf:a',value='2')
batchMutation = BatchMutation('row1',[mutation])
client.mutateRowsTs('cx',[batchMutation],timestamp=1513135651874)
- 1
- 2
- 3
- atomicIncrement(tableName,row,column,value):原子遞增的列。返回當前列的值
- tableName:表名
- row:行
- column:列
- value:原子遞增的值
result = client.atomicIncrement('cx','row1','cf:b',1)
print result # 如果之前的值為2,此時值為3
- 1
- 2
- deleteAll(tableName,row,column):刪除指定表指定行與指定列的所有資料,無返回值
- tableName:表名
- row:行
- column:列
client.deleteAll('cx','row1','cf:a')
- 1
- deleteAllTs(tableName,row,column,timestamp):刪除指定表指定行與指定列中,小於等於指定時間戳的所有資料,無返回值
- tableName:表名
- row:行
- column:列
- timestamp:時間戳
client.deleteAllTs('cx','row1','cf:a',timestamp=1513569725685)
- 1
- deleteAllRow(tableName,row):刪除整行資料,無返回值
- tableName:表名
- row:行
client.deleteAllRow('cx','row1')
- 1
- deleteAllRowTs(tableName,row,timestamp):刪除指定表指定行中,小於等於此時間戳的所有資料,無返回值
- tableName:表名
- row:行
- timestamp:時間戳
client.deleteAllRowTs('cx','row1',timestamp=1513568619326)
- 1
- scannerOpen(tableName,startRow,columns):在指定表中,從指定行開始掃描,到表中最後一行結束,掃描指定列的資料。每行只去最新的一次資料,返回一個ScannerID,int型別
- tableName:表名
- startRow:起始行
- columns:列名列表,list型別
scannerId = client.scannerOpen('cx','row2',["cf:b","cf:c"])
- 1
- scannerOpenTs(tableName,startRow,columns,timestamp):在指定表中,從指定行開始掃描,每行獲取所有小於指定時間戳的最新的一條資料,掃描指定列的資料。返回一個ScannerID,int型別
- tableName:表名
- startRow:起始行
- columns:列名列表,list型別
- timestamp:時間戳
scannerId = client.scannerOpenTs('cx','row1',["cf:a","cf:b","cf:c"],timestamp=1513579065365)
- 1
- scannerOpenWithStop(tableName,startRow,stopRow,columns):在指定表中,從指定行開始掃描,掃描到結束行結束(並不獲取指定行的資料),掃描指定列的資料。返回一個ScannerID,int型別
- tableName:表名
- startRow:起始行
- stopRow:結束行
- columns:列名列表,list型別
scannerId = client.scannerOpenWithStop('cx','row1','row2',["cf:b","cf:c"])
- 1
- scannerOpenWithStopTs(tableName,startRow,stopRow,columns,timestamp):在指定表中,從指定行開始掃描,掃描到結束行結束(並不獲取指定行的資料),每行獲取所有小於指定時間戳的最新的一條資料,掃描指定列的資料。返回一個ScannerID,int型別
- tableName:表名
- startRow:起始行
- stopRow:結束行
- columns:列名列表,list型別
- timestamp:時間戳
scannerId = client.scannerOpenWithStopTs('cx','row1','row2',["cf:a","cf:b","cf:c"],timestamp=1513579065365)
- 1
- scannerOpenWithPrefix(tableName,startAndPrefix,columns):在指定表中,掃描具有指定字首的行,掃描指定列的資料。每行獲取最新的一條資料,返回一個ScannerID,int型別
- tableName:表名
- startAndPrefix:行字首
- columns:列名列表,list型別
scannerId = client.scannerOpenWithPrefix('cx','row',["cf:b","cf:c"])
- 1
- scannerGet(id):根據ScannerID來獲取結果,返回一個
hbase.ttypes.TRowResult
物件列表
- id:ScannerID
scannerId = client.scannerOpen('cx','row1',["cf:b","cf:c"])
while True:
result = client.scannerGet(scannerId)
if not result:
break
print result
- 1
- 2
- 3
- 4
- 5
- 6
- scannerGetList(id,nbRows):根據ScannerID來獲取指定數量的結果,返回一個
hbase.ttypes.TRowResult
物件列表
- id:ScannerID
- nbRows:指定行數
scannerId = client.scannerOpen('cx','row1',["cf:b","cf:c"])
result = client.scannerGetList(scannerId,2)
- 1
- 2
- scannerClose(id):關閉掃描器,無返回值
- id:ScannerID
client.scannerClose(scannerId)
- 1
<link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/markdown_views-ea0013b516.css">
</div>
整套demo
# 使用前需要啟動hbase和thrift伺服器
from thrift.transport import TSocket,TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase
# thrift預設埠是9090
socket = TSocket.TSocket('127.0.0.1',9090)
socket.setTimeout(5000)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = Hbase.Client(protocol)
socket.open()
# print(client.get('table1','row1','cf:a'))
from hbase.ttypes import ColumnDescriptor
alltable = client.getTableNames() # 獲取所有表名
print('所有表格',alltable)
if('test' in alltable):
allcf = client.getColumnDescriptors('test') # 獲取表的所有列族
print('test表的列族',allcf)
allregions = client.getTableRegions('test') # 獲取所有與表關聯的regions
print('test表的所有regions',allregions)
else:
column1 = ColumnDescriptor(name='cf1') # 定義列族
column3 = ColumnDescriptor(name='cf2') # 定義列族
client.createTable('test', [column1,column3]) # 建立表
print('建立表test')
# 驗證表是否被啟用
if(not client.isTableEnabled('test')):
client.enableTable('test') # 啟用表
print('啟用表test')
# =======插入/修改資料=======
from hbase.ttypes import Mutation
mutation = Mutation(column='cf1:a', value='1')
# 插入資料。如果在test表中row行cf1:a列存在,將覆蓋
client.mutateRow('test', 'row1', [mutation]) # 在表中指定行執行一系列的變化操作。
client.mutateRowTs('test','row2',[mutation],1513070735669) # 可以自己新增時間戳
print('插入資料')
from hbase.ttypes import Mutation,BatchMutation
mutation1 = Mutation(column='cf1:b',value='2')
mutation2 = Mutation(column='cf2:a',value='3')
mutation3 = Mutation(column='cf2:b',value='4')
batchMutation = BatchMutation('row3',[mutation])
client.mutateRows('test',[batchMutation]) # 在表中執行一系列批次(單個行上的一系列突變)
client.mutateRowsTs('test',[batchMutation],timestamp=1513135651874) # 也可以自己新增時間戳
print('插入資料')
result = client.atomicIncrement('test','row1','cf1:c',1) # 原子遞增的列進行一次遞增。返回當前列的值
print(result)
# ============獲取資料=========
result = client.get('test', 'row1', 'cf1:a') # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
result = client.getVer('test', 'row1', 'cf1:a', numVersions = 2) # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
result = client.getVerTs('test', 'row1', 'cf1:a', timestamp=0,numVersions = 2) # 為一個列表,其中只有一個hbase.ttypes.TCell物件的資料
print(result)
# 行
row = 'row1'
# 列
column = 'cf1:a'
# 查詢結果
result = client.getRow('test',row) # result為一個列表,獲取表中指定行在最新時間戳上的資料
for item in result: # item為hbase.ttypes.TRowResult物件
print('行索引:',item.row)
print('列值:',item.columns.get(column).value) # 獲取值。item.columns.get('cf:a')為一個hbase.ttypes.TCell物件
print('時間戳:',item.columns.get(column).timestamp) # 獲取時間戳。item.columns.get('cf:a')為一個hbase.ttypes.TCell物件
# 獲取指定行指定列上的資料
result = client.getRowWithColumns('test','row1',['cf1:a','cf2:a']) #獲取表中指定行與指定列在最新時間戳上的資料
for item in result:
print('行索引:',item.row)
cf1_a = item.columns.get('cf1:a')
if (cf1_a != None):
print('cf1:a列值:',cf1_a.value)
print('時間戳:',cf1_a.timestamp)
cf2_a = item.columns.get('cf2:a')
if(cf2_a!=None):
print('cf2:a列值:',cf2_a.value)
print('時間戳:',cf2_a.timestamp)
result = client.getRowTs('test','row1',1513069831512) # 獲取表中指定行並且小於這個時間戳的所有資料
print(result)
result = client.getRowWithColumnsTs('test','row1',['cf1:a','cf1:b','cf2:a'],1513069831512) # 獲取指定行與指定列,並且小於這個時間戳的所有資料
print(result)
# ==============掃描資料==================
scannerId = client.scannerOpen('test','row1',["cf1:b","cf2:a"]) # 在指定表中,從指定行開始掃描,到表中最後一行結束,掃描指定列的資料。
scannerId = client.scannerOpenTs('test','row1',["cf1:b","cf2:a"],timestamp=1513579065365) # 在指定表中,從指定行開始掃描,獲取所有小於指定時間戳的所有資料,掃描指定列的資料
scannerId = client.scannerOpenWithStop('test','row1','row2',["cf1:b","cf2:a"]) # 在指定表中,從指定行開始掃描,掃描到結束行結束(並不獲取指定行的資料),掃描指定列的資料
scannerId = client.scannerOpenWithStopTs('test','row1','row2',["cf1:b","cf2:a"],timestamp=1513579065365) # 獲取所有小於指定時間戳的所有資料
scannerId = client.scannerOpenWithPrefix('test','row',["cf1:b","cf2:a"]) #在指定表中,掃描具有指定字首的行,掃描指定列的資料
while True:
result = client.scannerGet(scannerId) # 根據ScannerID來獲取結果
if not result:
break
print(result)
result = client.scannerGetList(scannerId,2) # 根據ScannerID來獲取指定數量的結果
client.scannerClose(scannerId) # 關閉掃描器
# ===============刪除資料==============
client.deleteAll('test','row1','cf1:a') # 刪除指定表指定行與指定列的所有資料
client.deleteAllTs('test','row1','cf2:a',timestamp=1513569725685) # 刪除指定表指定行與指定列中,小於等於指定時間戳的所有資料
client.deleteAllRowTs('test','row1',timestamp=1513568619326) # 刪除指定表指定行中,小於等於此時間戳的所有資料
client.deleteAllRow('test','row1') # 刪除整行資料
if(not client.isTableEnabled('test')):
client.disableTable('test')
print('禁用表test')
client.deleteTable('test') # 刪除表.必須確保表存在,且被禁用
print('刪除表test')