1. 程式人生 > >Window版本ElasticSearch同步資料庫資料

Window版本ElasticSearch同步資料庫資料

我使用的ElasticSearch是2.3.3版本,同步資料庫使用外掛是elasticsearch-jdbc-2.3.3.0,這裡請注意,針對ElasticSearch版本需要使用對應的外掛.下面是外掛下載地址.
https://github.com/jprante/elasticsearch-jdbc

環境變數配置

安裝好ElasticSearch後,將下載的外掛放到你喜歡的任意盤中(不需要安裝).
然後配置環境變數,在系統變數中新增一個變數,名字自定義(路徑指向你的外掛地址)
環境變數配置
這個外掛有點坑的地方就是你同步資料庫的時候需要使用jdk1.8,然後同步完後可以將環境變數改成你原來的jdk版本。但是不能解除安裝,不然同步就會停止。

指令碼檔案編寫

安裝完JDK1.8後在任意盤建立一個資料夾,建立一個.sh字尾檔案,同步執行的就是這個shell指令碼檔案。內容如下(貼上的程式碼顯示不正常,就使用圖片了):
執行檔案程式碼
圖片橘黃色部分是你新增環境變數名字。最後一行的statefile.json檔案新增在這個執行檔案同一路徑下。程式碼內容如下:
{
“type” : “jdbc”,
“jdbc”: {
“schedule”: “0/20 0-59 0-23 ? * *”,
“elasticsearch.autodiscover”:true,
“elasticsearch.cluster”:”elasticsearch”,
“driver”:”com.mysql.jdbc.Driver”,
“url”:”jdbc:mysql:localhost:3306/topic_test2”,
“user”:”“,
“password”:”“,
“sql”:”select v.id as _id,v.id as id,v.name as name,unix_timestamp(v.createTime) as createTime from videoInfo v”,
“elasticsearch” : {
“host” : “10.0.1.2”,
“port” : 9300
},
“index” : “original”,
“type” : “original”
}
}
其中schedule指的是同步重新整理的時間間隔,可以指定多長時間同步一次。有關schedule的引數配置可以點選下面路徑檢視:

http://www.cnblogs.com/lihaiming93/p/6619124.html
elasticsearch.cluster 這個會自動查詢你的elasticsearch是否有叢集,我做測試是沒有搭叢集,可以在你安裝的elasticsearch2.3.3/config/elasticsearch.yml檔案配置,修改這個檔案,找到cluster.name,去掉前面的#號,將值改為elasticsearch。
elasticsearch配置修改

這裡建議在寫sql時把ES上的 _id與資料庫id對應。這樣方便搜尋對應的資料。

然後填寫你的資料庫基本資訊,我是用的mysql這裡寫sql語句的時間型別是個坑,最好可以轉成時間戳,這樣方便搜尋是對時間排序。當時我需要同步的表設計時時間型別定為了varchar,同步到ES中就認定成了字串(這裡可以測試一下如果資料庫是正常時間型別,同步過來是否可以正常排序)。

下面有關ES的配置就可以按照你自己的需求來填寫。index和type填寫後會自動為你建立,不需要你主動建立。到這裡基本上已經成功配置完了。

指令碼檔案執行

在window下執行shell檔案需要下載Git Bash。下載完成後執行.sh檔案。執行後會在.sh檔案路徑下產生一個日誌檔案,如果失敗可以在日誌中檢視問題。
到這裡基本已經完成了對資料庫的同步,這裡要記住,執行.sh檔案的視窗不能關閉,關閉就停止同步了(關閉後重新執行這個檔案就可以恢復同步)。
然後這個外掛坑點還有不能同步刪除資料庫資訊(資料庫刪除後,ES上不能刪除),能同步增改。

刪除ES上冗餘資料(java程式碼實現)

由於ES不能同步刪除這個坑,我自己寫了一個工具方法()用來刪除ES上存在但資料庫中卻刪除了的冗餘資料(可以自己弄個定時器什麼的每天刪除)。
首先如果你的同步的資料庫資料量比較大的話,在java中,如果使用set.size()方法的話,每次最多能搜尋出1萬條資料,這樣對較大資料量時就顯得不夠用。所以在這裡使用的是滾動搜尋。程式碼如下:
這個方法是設定每次滾動搜尋的數量,然後需要對id進行排序好與資料庫中資料做判斷。
public Map<String,Object> searchByScroll() {
Map<String,Object> map = new HashMap<String,Object>();
List<Integer> list = new ArrayList<Integer>();
String index = "original";
String type = "original";
// 搜尋條件
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
searchRequestBuilder.setIndices(index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.addSort("id", SortOrder.ASC);
searchRequestBuilder.setSize(1000);//設定每次滾動搜尋數量
searchRequestBuilder.setScroll(new TimeValue(30000));//設定滾動搜尋有效時長
// 執行
SearchResponse searchResponse = searchRequestBuilder.get();
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
}
map.put("list", list);
map.put("scrollId", scrollId);
return map;
}

根據上面方法傳過來的scrollId來獲取所有的資料

public List<Integer> searchByScrollId(Map<String,Object> map) {
        List<Integer> list = (List<Integer>) map.get("list");
        String scrollId = (String) map.get("scrollId");
        TimeValue timeValue = new TimeValue(10000);
        SearchScrollRequestBuilder searchScrollRequestBuilder;
        SearchResponse response;
        // 結果
        while (true) {
            searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
            // 重新設定滾動時間
            searchScrollRequestBuilder.setScroll(timeValue);
            // 請求
            response = searchScrollRequestBuilder.get();
            // 每次返回下一個批次結果 直到沒有結果返回時停止 即hits陣列空時
            if (response.getHits().getHits().length == 0) {
                break;
            } // if
                // 這一批次結果
            SearchHit[] searchHits = response.getHits().getHits();
            System.out.println(searchHits.length);
            for (SearchHit searchHit : searchHits) {
                list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
            } // for
                // 只有最近的滾動ID才能被使用
            scrollId = response.getScrollId();
        } // while
        clearScroll(scrollId);
        return list;
    }

這個方法就是同步刪除的工具方法,需要ES和資料庫資料id排序一致,原理是id迴圈比較,id相同就繼續,不同就跳過。

public boolean dropCommon() throws UnknownHostException{
        ESUtil es = new ESUtil();
        List<String> original = new Infomation().originalInfo();//獲取資料庫所有資料id,需要根據id排序
        List<Integer> error = new ArrayList<Integer>();
        int size = 0;
        while(size==0){
            int index = 0;
            int original_index = 0;
            List<Integer> all = es.searchByScrollId(es.searchByScroll());//獲取ES所有資料id,需要根據id排序
            for(int i=0;i<all.size();i++){
                index++;
                if(all.get(i)==Integer.parseInt(original.get(original_index))){
                    original_index++;
                }else{
                    error.add(all.get(i));//這裡就是冗餘資料id
                }
            }
            System.out.println(error.size());
            break;
        }
        return false;
    }