Hbase Coprocessor(協處理器)的使用
本部落格記錄初次使用hbase coprocessor的過程。協處理器分兩種型別,系統協處理器可以全域性匯入region server上的所有資料表,表協處理器即是使用者可以指定一張表使用協處理器。協處理器框架為了更好支援其行為的靈活性,提供了兩個不同方面的外掛。一個是觀察者(observer),類似於關係資料庫的觸發器。另一個是終端(endpoint),動態的終端有點像儲存過程。
本次主要使用EndPoint完成計數和求和的功能。終端是動態RPC外掛的介面,它的實現程式碼被安裝在伺服器端,從而能夠通過HBase RPC喚醒。客戶端類庫提供了非常方便的方法來呼叫這些動態介面,它們可以在任意時候呼叫一個終端,它們的實現程式碼會被目標region遠端執行,結果會返回到終端。使用者可以結合使用這些強大的外掛介面,為HBase新增全新的特性。
準備工作
1、開發endpoint需要用到google protobuf,protobuf用於生成RPC框架程式碼,protpbuf版本需要和hbase對應,版本跨度太大可能導致未知問題,我開始就是踩了這個坑,具體版本可檢視hbase安裝目錄下的lib中protobuf-java-[version].jar。本次使用的hbase版本是1.2.6,對應protobuf是2.5.0,從網上下載protoc-2.5.0-win32.zip,解壓後可得到protoc.exe,將protoc.exe配置到環境變數中備用,protobuf的詳細使用方法可參考網上其他教程。
protobuf下載連結
2、建立一個maven工程,pom.xml新增如下依賴:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.7</jdk.version>
<hbase.version>1.2.5</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId >com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
</dependencies>
建立一個Endpoint的基本流程可以歸納為:
(1)建立一個通訊協議:準備一個proto檔案,然後使用protoc工具來生成協議類檔案。這個檔案需要在服務端及客戶端存 在。
(2)建立一個Service類,實現具體的業務邏輯
(3)建立表時指定使用這個EndPoint,或者是全域性配置。
(4)建立一個Client類,呼叫這個RPC方法。
(一)建立測試表
HBase表中有一個family命名為0, 一個column命名為c,rowkey為某個id,使用hbase shell建立表並新增測試資料。
create 'test','0'
put 'test','id1','0:c',100
put 'test','id2','0:c',200
put 'test','id3','0:c',300
put 'test','id4','0:c',400
put 'test','id5','0:c',500
接下來我們需要實現資料計數並計算“0:c”列的和。
(二)準備proto檔案
新建檔案,命名為count_sum.proto,新增如下內容:
syntax = "proto2";
option java_package = "com.hny.hbase.coprocessor";
option java_outer_classname = "CountAndSumProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message CountAndSumRequest {
required string family = 1;
required string column = 2;
}
message CountAndSumResponse {
required int64 count = 1 [default = 0];
required double sum = 2 [default = 0];
}
service RowCountAndSumService {
rpc getCountAndSum(CountAndSumRequest)
returns (CountAndSumResponse);
}
(二)使用protoc生成類檔案
windows下使用cmd進入上一步建立的proto檔案的目錄下,執行如下命令(由於已經將protoc.exe加入了環境變數,所以可以直接執行,如果提示protoc命令不存在可將protoc.exe複製到當前目錄下也可以)
protoc --java_out=./ count_sum.proto
命令執行完成後會在當前目錄下生產一個名稱為CountAndSumProtocol的類,將這個類複製到IDE中,這個類檔案有幾個地方需要注意:
1、生成了一個CountAndSumRequest 內部類,表示請求資訊
2、生成了一個CountAndSumResponse 內部類,表示返回資訊
3、生成了一個 RowCountAndSumService 內部類,表示所提供的服務,這個類還有一個內部介面,這個介面定義了 getCountAndSum()這個方法。
我們下面需要做的就是實現這個介面的這個方法,提供真正的服務。
(三)實現真實的服務
在CountAndSumProtocol同目錄下建立類CountAndSum,繼承CountAndSumProtocol,同時需要實現Coprocessor和CoprocessorService2個介面:
package com.hny.hbase.coprocessor;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class CountAndSum extends CountAndSumProtocol.RowCountAndSumService implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public void getCountAndSum(RpcController controller, CountAndSumProtocol.CountAndSumRequest request, RpcCallback<CountAndSumProtocol.CountAndSumResponse> done) {
String family = request.getFamily();
if (null == family || "".equals(family)) {
throw new NullPointerException("you need specify the family");
}
String column = request.getColumn();
if (null == column || "".equals(column)) {
throw new NullPointerException("you need specify the column");
}
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
CountAndSumProtocol.CountAndSumResponse response = null;
InternalScanner scanner = null;
try {
// 計數
long count = 0;
// 求和
double sum = 0;
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean hasMore;
// 切記不要用while(){}的方式,這種方式會丟失最後一條資料
do {
hasMore = scanner.next(results);
if (results.isEmpty()) {
continue;
}
Cell kv = results.get(0);
double value = 0;
try {
value = Double.parseDouble(Bytes.toString(CellUtil.cloneValue(kv)));
} catch (Exception e) {
}
count++;
sum += value;
results.clear();
} while (hasMore);
// 生成response
response = CountAndSumProtocol.CountAndSumResponse.newBuilder().setCount(count).setSum(sum).build();
} catch (IOException e) {
e.printStackTrace();
ResponseConverter.setControllerException(controller, e);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {
}
}
}
done.run(response);
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment) env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// do nothing
}
@Override
public Service getService() {
return this;
}
}
它需要實現以下4個方法,下面我們逐一討論一下:
getService():這個方法直接返回自身即可。
start(CoprocessorEnvironment env):這個方法會在coprocessor啟動時呼叫,這裡判斷了是否在一個region內被使用,而不是master,WAL等環境下被呼叫。
stop(CoprocessorEnvironment env):這個方法會在coprocessor完成時被呼叫,可用於關閉資源等,這裡為空。
getCountAndSum(…):這是整個類的核心方法,用於實現真正的業務邏輯。關鍵的步驟有:
(1)根據request建立一個Scanner,然後使用它建立一個 InternalScanner,可以更高效的進行scan
(2)對掃描出來的行進行分析處理,將結果儲存在幾個變數中。
(3)呼叫response的各個set()方法,設定返回的結果。
(4)使用 done.run(response); 返回結果到客戶端。
(四)部署coprocessor
將上述2個類進行打包,打包時不用包含protobuf和hbase相關的依賴。本示例暫時使用靜態部署的方式,將jar複製到每個regionserver節點的hbase/lib目錄下,然後修改hbase-site.xml,新增如下屬性:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>com.hny.hbase.coprocessor.CountAndSum</value>
</property>
重啟hbase。
建議在hbase-site.xml中再加入以下配置,防止協處理器出現錯誤時導致regionServer掛掉。
<property>
<name>hbase.coprocessor.abortonerror</name>
<value>false</value>
</property>
(五)編寫呼叫端
客戶端的作用是將各個region的結果再次進行合併,客戶端需要依賴CountAndSumProtocol類,程式碼如下:
package com.hny.hbase.coprocessor.client;
import com.hny.hbase.coprocessor.CountAndSumProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
public class CountAndSumClient {
public static class CountAndSumResult {
public long count;
public double sum;
}
private Connection connection;
public CountAndSumClient(Connection connection) {
this.connection = connection;
}
public CountAndSumResult call(String tableName, String family, String column, String
startRow, String endRow) throws Throwable {
Table table = connection.getTable(TableName.valueOf(Bytes.toBytes(tableName)));
final CountAndSumProtocol.CountAndSumRequest request = CountAndSumProtocol.CountAndSumRequest
.newBuilder()
.setFamily(family)
.setColumn(column)
.build();
byte[] startKey = (null != startRow) ? Bytes.toBytes(startRow) : null;
byte[] endKey = (null != endRow) ? Bytes.toBytes(endRow) : null;
// coprocessorService方法的第二、三個引數是定位region的,是不是範圍查詢,在startKey和endKey之間的region上的資料都會參與計算
Map<byte[], CountAndSumResult> map = table.coprocessorService(CountAndSumProtocol.RowCountAndSumService.class,
startKey, endKey, new Batch.Call<CountAndSumProtocol.RowCountAndSumService,
CountAndSumResult>() {
@Override
public CountAndSumResult call(CountAndSumProtocol.RowCountAndSumService service) throws IOException {
BlockingRpcCallback<CountAndSumProtocol.CountAndSumResponse> rpcCallback = new BlockingRpcCallback<>();
service.getCountAndSum(null, request, rpcCallback);
CountAndSumProtocol.CountAndSumResponse response = rpcCallback.get();
//直接返回response也行。
CountAndSumResult responseInfo = new CountAndSumResult();
responseInfo.count = response.getCount();
responseInfo.sum = response.getSum();
return responseInfo;
}
});
CountAndSumResult result = new CountAndSumResult();
for (CountAndSumResult ri : map.values()) {
result.count += ri.count;
result.sum += ri.sum;
}
return result;
}
}
測試程式碼:
public class Test{
public static void main(String[] args) throws Throwable {
// 使用該方式需要將hbase-site.xml複製到resources目錄下
Configuration conf = HBaseConfiguration.create();
// hbase-site.xml不在resources目錄下時使用如下方式指定
// conf.addResource(new Path("/home/hadoop/conf/hbase", "hbase-site.xml"));
Connection connection = ConnectionFactory.createConnection(conf);
String tableName = "test";
CountAndSumClient client = new CountAndSumClient(connection);
CountAndSumResult result = client.call(tableName, "0", "c", null, null);
System.out.println("count: " + result.count + ", sum: " + result.sum);
}
}
執行測試程式碼輸出如下:
count: 5, sum: 1500.0
注意:部署到叢集的jar包包括Service類和protocol類,而執行任務的jar包包括client類與protocol類。
相關推薦
Hbase Coprocessor(協處理器)的使用
本部落格記錄初次使用hbase coprocessor的過程。協處理器分兩種型別,系統協處理器可以全域性匯入region server上的所有資料表,表協處理器即是使用者可以指定一張表使用協處理器。協處理器框架為了更好支援其行為的靈活性,提供了兩個不同方面的外掛
使用HBase Coprocessor協處理器
原文: HBase的Coprocessor是模仿谷歌BigTable的Coprocessor模型實現的。 Coprocessor提供了一種機制可以讓開發者直接在RegionServer上執行自定義程式碼來管理資料。 首先必須要指明使用Coproc
HBase學習之三: hbase filter(過濾器)和coprocessor(協處理器)統計行數的簡單應用
2.通過coprocessor統計行數 關於協處理器的介紹和用法網上很多資料,可以自行查詢腦補,值得注意的是協處理器在hbase伺服器端執行,開發者可自定義處理器放置於伺服器端然後再客戶端呼叫,
HBase的協處理器
HBase中的協處理器有兩種:observer和endpoint Observer coprocessor:類似於RDBMS中的觸發器,它在特定事件(例如Get或Put)發生之前或之後執行你的程式碼。 Endpoint coprocessor:類似於RDBMS中的儲存過程,它允許你在Re
HBase建表高階屬性,hbase應用案例看行鍵設計,HBase和mapreduce結合,從Hbase中讀取資料、分析,寫入hdfs,從hdfs中讀取資料寫入Hbase,協處理器和二級索引
1. Hbase高階應用 1.1建表高階屬性 下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性 1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式 布隆
HBase協處理器
協處理器分兩種型別,系統協處理器可以全域性匯入region server上的所有資料表,表協處理器即是使用者可以指定一張表使用協處理器。協處理器框架為了更好支援其行為的靈活性,提供了兩個不同方面的外掛。一個是觀察者(observer),類似於關係資料庫的觸發器。另一個是終端(endpoint),
hbase協處理器與二級索引
一、協處理器—Coprocessor 1、 起源 Hbase 作為列族資料庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執 行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase 中,統計資料表的總行數,需 要使用 Counte
HBase利用observer(協處理器)建立二級索引
一、協處理器—Coprocessor 1、 起源 Hbase 作為列族資料庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執 行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase 中,統
HBase協處理器載入過程(1.2.x)
一、首先來看ObserverCoprocessor:1.觀察者協處理器的靜態載入的配置是在hbase-site.xml中配置如下屬性:通過hbase.coprocessor.region.classes 配置 RegionObservers 和 Endpoints.通過hb
關於HBase協處理器導致問題的研究
1 載入協處理器 1.1 將協處理器上傳到hdfs: hadoop fs -mkdir /hbasenew/usercoprocesser hadoop fs -ls /hbasenew/usercoprocesser hadoop fs -rm /hbasenew/us
使用協處理器將HBase資料索引到Elasticsearch叢集
HBaseCon 2013: Using Coprocessors to Index Columns in an Elasticsearch Cluster 使用協處理器將列資料索引到Elasticsearch叢集 總結來說,一般就是擴充套件RegionObs
HBase協處理器載入過程(1.2)
之前寫過HBase協處理器的一些原理和使用,基本上都是官方文件和官方的部落格翻過來的,知道了怎麼寫,怎麼用。 現在需要思考的一個問題是,自己寫的協處理器是怎麼載入成功並呼叫的。 一、首先來看ObserverCoprocessor: 1.觀察者協處理器的靜態載入的配置是在hb
HBase-6.hbase 協處理器
引入Hbase中的Coprocessor的原因HBase作為列族資料庫無法建立“二級索引”,難以執行求和、計數、排序等操作。為解決這些問題,HBase0.92 之後引入協處理器(Coprocessor),實現一些新特性,能夠輕易建立二次索引、複雜過濾器、以及訪問控制。參考: http://blog.csdn.
hbase協處理器簡介
本章要點 hbase協處理器有哪幾種類型 hbase協處理器如何部署 協處理器的主要介面,已經介面功能 自定義協處理器例項 協處理器型別 hbase協處理型別主要分為observer 和endpoint兩種型別。 observer
hbase協處理器--建立endpoint協處理器
下載hbase-source之後,裡面有相應的原始碼,可以找到RowCountEndpoint.java,ExampleProto.java等一系列程式碼。通過閱讀原始碼以及其中的註釋,我們可以知道,Hbase的很多的功能都是通過protobuf這個工具來生成程式碼,尤其是
HBase協處理器同步二級索引到Solr
一、 已知的問題和不足 在上一個版本中,實現了使用HBase的協處理器將HBase的二級索引同步到Solr中,但是仍舊有幾個缺陷:寫入Solr的Collection是寫死在程式碼裡面,且是唯一的。如果我們有一張表的資料希望將不同的欄位同步到Solr中該如何做呢?目前所有
HBase 協處理器 (二)
3.7 The RegionObserver Class 用於 region level, 注意,這個類的所有呼叫方法的第一個引數為 ObserverContext<RegionCoprocessorEnvironment> ctx8, 提供訪問 conte
HBase優化策略及協處理器
服務端優化 什麼導致HBase效能下降 Jvm記憶體分配與GC回收策略 HBase執行機制相關的配置分配不合理 表結構設計及使用者使用方式不合理 HBase資料儲存過程 HBase寫入時當memstore達到一定大小會flush到磁碟儲存成HFile,
HBase 協處理器統計行數
環境:cdh5.1.0 啟用協處理器方法1. 啟用協處理器 Aggregation(Enable Coprocessor Aggregation) 我們有兩個方法:1.啟動全域性aggregatio