關於elasticsearch與資料庫同步的探討
阿新 • • 發佈:2019-01-07
前言:在最近的一個專案中,存在著es與Mysql同步的需求,當時就想著使用外掛來完成,但是在實際的操作中存在一些不可避免的問題。
目前,僅存的外掛中,能夠滿足elasticsearch5.x版本同步更新的外掛只剩下logstash-input-jdbc(外掛的使用與原理:http://blog.csdn.net/yeyuma/article/details/50240595),而本人在實際操作的過程中發現了此款外掛存在以下缺點: 0. 安裝logstash麻煩,需要配置一些列引數。 1.更新時間頻率最低只能60s以上,不能達到秒級。 2.同時也要修改資料庫。 3.當資料庫不在同一主機時,更新時間不統一。 當我把此套方案提交給經理的時候,被無情的拒絕了,無奈只能想想其他辦法,經過一段時間的思考,根據logstash外掛的原理,利用任務排程來替換外掛,從而可以避免原外掛的部分問題。 方案內容:利用自增欄位來記錄最新的更新資料的原理,使用任務排程可以自定義更新頻率,實現elasticsearch與mysql同步更新。 避免了安裝外掛的麻煩,同時也可以實現秒級更新,但是依然有修改資料庫這一缺點。 ![建立一張表,來記錄更新的最新資料id](https://img-blog.csdn.net/20170621100908378?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcXFfMzY0ODkyNDE=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
下面引用專案中的程式碼,作為補充:
logger.info("同步開始 : "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()).toString());
Record record = recordMapper.findById(1l);
List<CollectData> collectDatas = collectDataMapper.findByEsID(record.getRecord());
List<Long> nums = new ArrayList();
for (int i = 0; i < collectDatas.size(); i++) {
//拿到自增最大的id
nums.add(collectDatas.get(i).getEs_id());
CollectData data = collectDatas.get(i);
Map map = new HashMap();
map.put("author",data.getAuthor());
map .put("authorId",data.getAuthorId());
map.put("burl",data.getUrl());
map.put("click",data.getClick());
map.put("type",data.getType());
map.put("content",data.getContent());
try {
URL url = new URL(data.getUrl());
map.put("domain",url.getHost());
}catch (Exception e){
e.printStackTrace();
}
map.put("forward",data.getForward());
map.put("id",data.getId());
map.put("match_keys",data.getMatch_keys());
try {
Date date = data.getPublishDate();
if (null!=date){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String time = sdf.format(date);
map.put("publishDate",time);
}
}catch (Exception e){
e.printStackTrace();
}
map.put("repeatTotal",data.getRepeatTotal());
map.put("score",data.getScore());
map.put("source",data.getSource());
//處理關鍵字
List<Long> ids= dataSubjectMapper.getAllIdByUUID(data.getId());
if (ids.size()>0){
map.put("subjectId",ids.toString());
}
map.put("title",data.getTitle());
map.put("url",data.getUrl());
map.put("zanTotal",data.getZanTotal());
elasticSearchClient.inserForIndex(map,"btext");
}
logger.info("同步資料條數 : "+nums.size());
if (nums.size()>0) {
Long max = Collections.max(nums);
logger.info("當前同步到的自增ID:" +max);
Record rnew = new Record();
rnew.setId(1l);
rnew.setRecord(max);
recordMapper.update(rnew);
}
注:省略了任務排程的程式碼,讀者自行補充