Elasticsearch資料全量匯入HBase,scroll的正確使用姿勢,HBase資料到Hive
1、程式碼
只貼出Es用scroll方式讀取資料以及批量寫入HBase的核心程式碼,其他工具類、方法,比如es、HBase配置、client、connection獲取就不貼了。
1-1、es獲取資料
package ipl.restapi.service.bigdata.es;
import ipl.restapi.util.EsOpenCloseUtils;
import ipl.restapi.util.EsPropertiesUtils;
import ipl.restapi.util.HbaseApiUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* <p>pakage: ipl.restapi.service.bigdata.es</p>
*
* descirption: es檢索某一所以全量資料,匯入HBase
*
* @author wanghai
* @version V1.0
* @since <pre>2018/8/15 下午9:03</pre>
*/
public class ReadFromEs {
private static final Logger LOGGER = LoggerFactory.getLogger("es");
private static final int SCROLL_SIZE = 10000;
private static final int HBASE_PUT_SIZE = 1000;
/**
* es資料匯入HBase
* @param tableName 資料匯入到HBase的那一張表
*/
public void putAllDataToHbase(String tableName) {
Map<String, Object> hashMap = EsPropertiesUtils.getConf();
TransportClient esClient = EsOpenCloseUtils.getInstance(hashMap);
// 建立查詢體
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("index", "papper_little");
queryParams.put("type", "automatic");
// index
String indexName = queryParams.get("index").toString();
// type
String type = queryParams.get("type").toString();
scrollDataToHbase(esClient, indexName, type, tableName);
}
/**
* 允許我們做一個初始搜尋並且持續批量從Elasticsearch里拉取另一部分資料,結果直到沒有結果剩下,類似於資料庫的遊標,
* 為了避免資料量過大,每次從上次scroll的位置繼續獲取資料獲取的資料寫入HBase。
*
* @param esClient es客戶端
* @param indexName 索引名
* @param typeName es type
* @param tableName 資料匯入到HBase的那一張表
*/
public void scrollDataToHbase(Client esClient, String indexName, String typeName, String tableName) {
// TODO:通訊過程中getScrollId丟失怎麼辦?比如說kafka,就有多種機制驗證處理請求者的請求資料的偏移量
int baseRowNum = 0;
SearchResponse scrollResp = esClient.prepareSearch(indexName)
.setTypes(typeName)
.setScroll(new TimeValue(300000))
// 每次返回10000條資料(如果夠)
.setSize(SCROLL_SIZE).get();
// arraylist放1000個map,一個map為一條論文資料,map中存放key-value對,是存入HBase的列名和值
ArrayList<Map<String, Object>> hit1000List = new ArrayList<>(1024);
Connection connection = HbaseApiUtils.getConnection();
do {
SearchHits searchHits = scrollResp.getHits();
long num = searchHits.getHits().length;
System.out.println("數量:" + num);
for (int i = 0; i < num; ) {
hit1000List.add(searchHits.getAt(i).getSourceAsMap());
i++;
if (i % HBASE_PUT_SIZE == 0) {
baseRowNum++;
try {
// TODO:網路不好,1000條可以考慮非同步讀寫
HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE);
hit1000List.clear();
} catch (IOException e) {
LOGGER.error("es批量插入hbase異常-1!");
LOGGER.error(e.getMessage());
}
}
}
System.out.println("目前完成 " + baseRowNum * HBASE_PUT_SIZE);
// 處理不足1000的資料
if (!hit1000List.isEmpty()) {
baseRowNum++; // 避免不足1000的資料覆蓋之前的
try {
HbaseApiUtils.putListByMap(connection, tableName, hit1000List, baseRowNum, "pappers_info", "papper_", HBASE_PUT_SIZE);
hit1000List.clear();
System.out.println("down!");
} catch (IOException e) {
LOGGER.error("es批量插入hbase異常-2!");
LOGGER.error(e.getMessage());
}
}
scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
}
while (scrollResp.getHits().getHits().length != 0);
}
}
1-2、批量寫入HBase
/**
* 批量匯入資料到HBase table
*
* @param tableName 要匯入資料的 表名
* @param arraylist 封裝資料的arraylist
* @param baseRowNum 傳入的是第幾輪的資料
* @param filedFamily 列族
* @param keyPrefix rowKey字首
* @param putSize 每一輪傳入的資料量(最後一次也許除外)
* @throws IOException
*/
public static void putListByMap(Connection connection, String tableName, ArrayList<Map<String, Object>> arraylist, int baseRowNum, String filedFamily, String keyPrefix, int putSize) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
List<Put> puts = new ArrayList<>();
int rowNum = 0;
baseRowNum = (baseRowNum - 1) * putSize;
for (Map<String, Object> hitMap : arraylist) {
rowNum++;
// 傳輸時,資料需要序列化——Bytes.toBytes
for (Map.Entry<String, Object> entry : hitMap.entrySet()) {
Put put = new Put(Bytes.toBytes(keyPrefix + (rowNum + baseRowNum)));
put.addColumn(Bytes.toBytes(filedFamily), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue().toString()));
puts.add(put);
}
}
table.put(puts);
LOGGER.info("表:{}已使用putListByMap方法批量更新!!!{}", tableName, baseRowNum);
}
public static Connection getConnection() {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "your_ipaddress");
config.set("hbase.zookeeper.property.clientPort", "2181");
// 建立一個連線到叢集的connection
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(config);
} catch (IOException e) {
e.printStackTrace();
}
return connection;
}
2、思路
2-1、es中取資料
- es取資料的方式很多,但是這裡是取全量資料,所以不需要檢索詞,定義index、type即可。
- 也有其他不少部落格寫es獲取全量資料,但是大多有問題,大致可以分為兩類問題:
1、資料量不夠大,可以一次性讀進記憶體;
2、程式碼有bug,獲取下一部分資料時,有丟失或者重複,或者說簡單使用setFrom,並不是真正的部分讀取。
對於資料不能全部讀進記憶體的情況,我們可以使用es中的scroll進行“下標選擇“。允許我們做一個初始搜尋並且持續批量從Elasticsearch里拉取另一部分資料,結果直到沒有結果剩下,類似於資料庫的遊標,為了避免資料量過大,每次從上次scroll的位置繼續獲取資料。
scroll 並不適合用來做實時搜尋,而更適用於後臺批處理任務。
scroll獲取資料大致可分為初始化和遍歷兩個階段,初始化時將所有符合搜尋條件的搜尋結果快取起來,可以想象成快照,然後更新scroll_id遍歷,從這個快照裡繼續取資料。
核心是
scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();while (scrollResp.getHits().getHits().length != 0);
2-2、資料封裝
es獲取的資料封裝進Map,key-value剛好作為column-value。由於HBase稀疏列的特性,各條記錄列數不同也是ok的。
Map資料封裝進Arraylist,批量put進HBase。
3、mac 修改hosts檔案,老是自動恢復原樣
冥思不得解,發現每次都是連線了學校的VPN後發生的。於是查詢瞭解到/private/etc/pulse-hosts.bak,/etc/hosts在pulse
啟動後或者使用中被重置
解決辦法:
在/private/etc/pulse-hosts.bak中填寫hosts即可(類似白名單的概念吧),當然,加入白名單後,/etc/hosts中還是需要填寫的
4、HBase資料到Hive【整合】
5、TODO
1、通訊過程中getScrollId丟失怎麼辦?比如說kafka,就有多種機制驗證處理請求者的請求資料的偏移量
2、優化:非同步,讀es與資料存HBase執行緒分開,以減少處理時間。