HBase協處理器同步二級索引到Solr
阿新 • • 發佈:2019-01-30
一、 已知的問題和不足
在上一個版本中,實現了使用HBase的協處理器將HBase的二級索引同步到Solr中,但是仍舊有幾個缺陷:
- 寫入Solr的Collection是寫死在程式碼裡面,且是唯一的。如果我們有一張表的資料希望將不同的欄位同步到Solr中該如何做呢?
- 目前所有配置相關資訊都是寫死到了程式碼中的,是否可以新增外部配置檔案。
- 原來的方法是每次都需要編譯新的Jar檔案單獨執行,能否將所有的同步使用一段通用的程式碼完成?
二、解決思路
針對上面的三個主要問題,我們一一解決
- 通常一張表會對應多個SolrCollection以及不同的Column。我們可以使用
Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]
- 通過Typesafe Config讀取外部配置檔案,達到所有資訊可配的目的。
- 所有的資料都只有Put和Delete,只要我們攔截到具體的訊息之後判斷當前的表名,然後根據問題一中的Collection和Column即可寫入對應的SolrServer。在協處理器中獲取表名的是
e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()
其中e是ObserverContext
三、程式碼
3.1 讀取config檔案內容
使用typesafe的config元件讀取morphlines.conf檔案,將內容轉換為 Map<String,List<HBaseIndexerMappin>>
publicclassConfigManager{
privatestaticSourceConfig sourceConfig =newSourceConfig();
publicstaticConfig config;
static{
sourceConfig.setConfigFiles("morphlines.conf");
config = sourceConfig.getConfig();
}
publicstaticMap<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){
Map<String,List<HBaseIndexerMappin>> mappin =newHashMap<String,List<HBaseIndexerMappin>>();
Config mappinConf = config.getConfig("Mappin");
List<String> tables = mappinConf.getStringList("HBaseTables");
for(String table :tables){
List<Config> confList =(List<Config>) mappinConf.getConfigList(table);
List<HBaseIndexerMappin> maps =newLinkedList<HBaseIndexerMappin>();
for(Config tmp :confList){
HBaseIndexerMappin map =newHBaseIndexerMappin();
map.solrConnetion = tmp.getString("SolrCollection");
map.columns = tmp.getStringList("Columns");
maps.add(map);
}
mappin.put(table,maps);
}
return mappin;
}
}
3.2 封裝SolrServer的獲取方式
因為目前我使用的環境是Solr和HBase公用的同一套Zookeeper,因此我們完全可以藉助HBase的Zookeeper資訊。HBase的協處理器是執行在HBase的環境中的,自然可以通過HBase的Configuration獲取當前的Zookeeper節點和埠,然後輕鬆的獲取到Solr的地址。
publicclassSolrServerManagerimplementsLogManager{
staticConfiguration conf =HBaseConfiguration.create();
publicstaticStringZKHost= conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");
publicstaticStringZKPort= conf.get("hbase.zookeeper.property.clientPort","2181");
publicstaticStringSolrUrl=ZKHost+":"+ZKPort+"/"+"solr";
publicstaticint zkClientTimeout =1800000;// 心跳
publicstaticint zkConnectTimeout =1800000;// 連線時間
publicstaticCloudSolrServer create(String defaultCollection){
log.info("Create SolrCloudeServer .This collection is "+ defaultCollection);
CloudSolrServer solrServer =newCloudSolrServer(SolrUrl);
solrServer.setDefaultCollection(defaultCollection);
solrServer.setZkClientTimeout(zkClientTimeout);
solrServer.setZkConnectTimeout(zkConnectTimeout);
return solrServer;
}
}
3.3 編寫提交資料到Solr的程式碼
理想狀態下,我們時時刻刻都需要提交資料到Solr中,但是事實上我們資料寫入的時間是比較分散的,可能集中再每一天的某幾個時間點。因此我們必須保證在高併發下能達到一定資料量自動提交,在低併發的情況下能隔一段時間寫入一次。只有兩種機制並存的情況下才能保證資料能即時寫入。
publicclassSolrCommitTimerextendsTimerTaskimplementsLogManager{
publicMap<String,List<SolrInputDocument>> putCache =newHashMap<String,List<SolrInputDocument>>();//Collection名字->更新(插入)操作快取
publicMap<String,List<String>> deleteCache =newHashMap<String,List<String>>();//Collection名字->刪除操作快取
Map<String,CloudSolrServer> solrServers =newHashMap<String,CloudSolrServer>();//Collection名字->SolrServers
int maxCache =ConfigManager.config.getInt("MaxCommitSize");
// 任何時候,保證只能有一個執行緒在提交索引,並清空集合
finalstaticSemaphore semp =newSemaphore(1);
//新增Collection和SolrServer
publicvoid addCollecttion(String collection,CloudSolrServer server){
this.solrServers.put(collection,server);
}
//往Solr新增(更新)資料
publicUpdateResponse put(CloudSolrServer server,SolrInputDocument doc)throwsIOException,SolrServerException{
server.add(doc);
return server.commit(false,false);
}
//往Solr新增(更新)資料
publicUpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs)throwsIOException,SolrServerException{
server.add(docs);
return server.commit(false,false);
}
//根據ID刪除Solr資料
publicUpdateResponsedelete(CloudSolrServer server,String rowkey)throwsIOException,SolrServerException{
server.deleteById(rowkey);
return server.commit(false,false);
}
//根據ID刪除Solr資料
publicUpdateResponsedelete(CloudSolrServer server,List<String> rowkeys)throwsIOException,SolrServerException{
server.deleteById(rowkeys);
return server.commit(false,false);
}
//將doc新增到快取
publicvoid addPutDocToCache(String collection,SolrInputDocument doc)throwsIOException,SolrServerException,InterruptedException{
semp.acquire();
log.debug("addPutDocToCache:"+"collection="+ collection +"data="+ doc.toString());
if(!putCache.containsKey(collection)){