1. 程式人生 > >關於elasticsearch與資料庫同步的探討

關於elasticsearch與資料庫同步的探討

前言:在最近的一個專案中,存在著es與Mysql同步的需求,當時就想著使用外掛來完成,但是在實際的操作中存在一些不可避免的問題。

    目前,僅存的外掛中,能夠滿足elasticsearch5.x版本同步更新的外掛只剩下logstash-input-jdbc(外掛的使用與原理:http://blog.csdn.net/yeyuma/article/details/50240595),而本人在實際操作的過程中發現了此款外掛存在以下缺點:

    0. 安裝logstash麻煩,需要配置一些列引數。

    1.更新時間頻率最低只能60s以上,不能達到秒級。

    2.同時也要修改資料庫。

    3.當資料庫不在同一主機時,更新時間不統一。


     當我把此套方案提交給經理的時候,被無情的拒絕了,無奈只能想想其他辦法,經過一段時間的思考,根據logstash外掛的原理,利用任務排程來替換外掛,從而可以避免原外掛的部分問題。

     方案內容:利用自增欄位來記錄最新的更新資料的原理,使用任務排程可以自定義更新頻率,實現elasticsearch與mysql同步更新。
 避免了安裝外掛的麻煩,同時也可以實現秒級更新,但是依然有修改資料庫這一缺點。

![建立一張表,來記錄更新的最新資料id](https://img-blog.csdn.net/20170621100908378?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcXFfMzY0ODkyNDE=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)

下面引用專案中的程式碼,作為補充:

       logger.info("同步開始 : "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()).toString());
        Record record = recordMapper.findById(1l);
        List<CollectData> collectDatas =  collectDataMapper.findByEsID(record.getRecord());
        List<Long>
nums = new ArrayList(); for (int i = 0; i < collectDatas.size(); i++) { //拿到自增最大的id nums.add(collectDatas.get(i).getEs_id()); CollectData data = collectDatas.get(i); Map map = new HashMap(); map.put("author",data.getAuthor()); map
.put("authorId",data.getAuthorId()); map.put("burl",data.getUrl()); map.put("click",data.getClick()); map.put("type",data.getType()); map.put("content",data.getContent()); try { URL url = new URL(data.getUrl()); map.put("domain",url.getHost()); }catch (Exception e){ e.printStackTrace(); } map.put("forward",data.getForward()); map.put("id",data.getId()); map.put("match_keys",data.getMatch_keys()); try { Date date = data.getPublishDate(); if (null!=date){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String time = sdf.format(date); map.put("publishDate",time); } }catch (Exception e){ e.printStackTrace(); } map.put("repeatTotal",data.getRepeatTotal()); map.put("score",data.getScore()); map.put("source",data.getSource()); //處理關鍵字 List<Long> ids= dataSubjectMapper.getAllIdByUUID(data.getId()); if (ids.size()>0){ map.put("subjectId",ids.toString()); } map.put("title",data.getTitle()); map.put("url",data.getUrl()); map.put("zanTotal",data.getZanTotal()); elasticSearchClient.inserForIndex(map,"btext"); } logger.info("同步資料條數 : "+nums.size()); if (nums.size()>0) { Long max = Collections.max(nums); logger.info("當前同步到的自增ID:" +max); Record rnew = new Record(); rnew.setId(1l); rnew.setRecord(max); recordMapper.update(rnew); }

注:省略了任務排程的程式碼,讀者自行補充