Window版本ElasticSearch同步資料庫資料
我使用的ElasticSearch是2.3.3版本,同步資料庫使用外掛是elasticsearch-jdbc-2.3.3.0,這裡請注意,針對ElasticSearch版本需要使用對應的外掛.下面是外掛下載地址.
https://github.com/jprante/elasticsearch-jdbc
環境變數配置
安裝好ElasticSearch後,將下載的外掛放到你喜歡的任意盤中(不需要安裝).
然後配置環境變數,在系統變數中新增一個變數,名字自定義(路徑指向你的外掛地址)
這個外掛有點坑的地方就是你同步資料庫的時候需要使用jdk1.8,然後同步完後可以將環境變數改成你原來的jdk版本。但是不能解除安裝,不然同步就會停止。
指令碼檔案編寫
安裝完JDK1.8後在任意盤建立一個資料夾,建立一個.sh字尾檔案,同步執行的就是這個shell指令碼檔案。內容如下(貼上的程式碼顯示不正常,就使用圖片了):
圖片橘黃色部分是你新增環境變數名字。最後一行的statefile.json檔案新增在這個執行檔案同一路徑下。程式碼內容如下:
{
“type” : “jdbc”,
“jdbc”: {
“schedule”: “0/20 0-59 0-23 ? * *”,
“elasticsearch.autodiscover”:true,
“elasticsearch.cluster”:”elasticsearch”,
“driver”:”com.mysql.jdbc.Driver”,
“url”:”jdbc:mysql:localhost:3306/topic_test2”,
“user”:”“,
“password”:”“,
“sql”:”select v.id as _id,v.id as id,v.name as name,unix_timestamp(v.createTime) as createTime from videoInfo v”,
“elasticsearch” : {
“host” : “10.0.1.2”,
“port” : 9300
},
“index” : “original”,
“type” : “original”
}
}
其中schedule指的是同步重新整理的時間間隔,可以指定多長時間同步一次。有關schedule的引數配置可以點選下面路徑檢視:
elasticsearch.cluster 這個會自動查詢你的elasticsearch是否有叢集,我做測試是沒有搭叢集,可以在你安裝的elasticsearch2.3.3/config/elasticsearch.yml檔案配置,修改這個檔案,找到cluster.name,去掉前面的#號,將值改為elasticsearch。
這裡建議在寫sql時把ES上的 _id與資料庫id對應。這樣方便搜尋對應的資料。
然後填寫你的資料庫基本資訊,我是用的mysql這裡寫sql語句的時間型別是個坑,最好可以轉成時間戳,這樣方便搜尋是對時間排序。當時我需要同步的表設計時時間型別定為了varchar,同步到ES中就認定成了字串(這裡可以測試一下如果資料庫是正常時間型別,同步過來是否可以正常排序)。
下面有關ES的配置就可以按照你自己的需求來填寫。index和type填寫後會自動為你建立,不需要你主動建立。到這裡基本上已經成功配置完了。
指令碼檔案執行
在window下執行shell檔案需要下載Git Bash。下載完成後執行.sh檔案。執行後會在.sh檔案路徑下產生一個日誌檔案,如果失敗可以在日誌中檢視問題。
到這裡基本已經完成了對資料庫的同步,這裡要記住,執行.sh檔案的視窗不能關閉,關閉就停止同步了(關閉後重新執行這個檔案就可以恢復同步)。
然後這個外掛坑點還有不能同步刪除資料庫資訊(資料庫刪除後,ES上不能刪除),能同步增改。
刪除ES上冗餘資料(java程式碼實現)
由於ES不能同步刪除這個坑,我自己寫了一個工具方法()用來刪除ES上存在但資料庫中卻刪除了的冗餘資料(可以自己弄個定時器什麼的每天刪除)。
首先如果你的同步的資料庫資料量比較大的話,在java中,如果使用set.size()方法的話,每次最多能搜尋出1萬條資料,這樣對較大資料量時就顯得不夠用。所以在這裡使用的是滾動搜尋。程式碼如下:
這個方法是設定每次滾動搜尋的數量,然後需要對id進行排序好與資料庫中資料做判斷。
public Map<String,Object> searchByScroll() {
Map<String,Object> map = new HashMap<String,Object>();
List<Integer> list = new ArrayList<Integer>();
String index = "original";
String type = "original";
// 搜尋條件
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
searchRequestBuilder.setIndices(index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.addSort("id", SortOrder.ASC);
searchRequestBuilder.setSize(1000);//設定每次滾動搜尋數量
searchRequestBuilder.setScroll(new TimeValue(30000));//設定滾動搜尋有效時長
// 執行
SearchResponse searchResponse = searchRequestBuilder.get();
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
}
map.put("list", list);
map.put("scrollId", scrollId);
return map;
}
根據上面方法傳過來的scrollId來獲取所有的資料
public List<Integer> searchByScrollId(Map<String,Object> map) {
List<Integer> list = (List<Integer>) map.get("list");
String scrollId = (String) map.get("scrollId");
TimeValue timeValue = new TimeValue(10000);
SearchScrollRequestBuilder searchScrollRequestBuilder;
SearchResponse response;
// 結果
while (true) {
searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
// 重新設定滾動時間
searchScrollRequestBuilder.setScroll(timeValue);
// 請求
response = searchScrollRequestBuilder.get();
// 每次返回下一個批次結果 直到沒有結果返回時停止 即hits陣列空時
if (response.getHits().getHits().length == 0) {
break;
} // if
// 這一批次結果
SearchHit[] searchHits = response.getHits().getHits();
System.out.println(searchHits.length);
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
} // for
// 只有最近的滾動ID才能被使用
scrollId = response.getScrollId();
} // while
clearScroll(scrollId);
return list;
}
這個方法就是同步刪除的工具方法,需要ES和資料庫資料id排序一致,原理是id迴圈比較,id相同就繼續,不同就跳過。
public boolean dropCommon() throws UnknownHostException{
ESUtil es = new ESUtil();
List<String> original = new Infomation().originalInfo();//獲取資料庫所有資料id,需要根據id排序
List<Integer> error = new ArrayList<Integer>();
int size = 0;
while(size==0){
int index = 0;
int original_index = 0;
List<Integer> all = es.searchByScrollId(es.searchByScroll());//獲取ES所有資料id,需要根據id排序
for(int i=0;i<all.size();i++){
index++;
if(all.get(i)==Integer.parseInt(original.get(original_index))){
original_index++;
}else{
error.add(all.get(i));//這裡就是冗餘資料id
}
}
System.out.println(error.size());
break;
}
return false;
}