Syncing MongoDB data to Elasticsearch with Monstache, with high availability
Requirements & problem statement
- We need to sync MongoDB data to Elasticsearch in real time (including data changes). After evaluating AWS DMS and Monstache, we tentatively chose the Monstache plugin for data synchronization.
What is Monstache?
Monstache is a plugin written in Go that performs real-time data synchronization and subscription based on the MongoDB oplog, supporting data sync between MongoDB and Elasticsearch. MongoDB must be deployed as a replica set.
Practice
Monstache is driven by a configuration file with a rich set of options. Start Monstache by pointing it at a config file, conventionally named config.toml, such as the following:
```toml
# connection settings

# print detailed information including request traces
# enable debug logging; this must be near the top of the file,
# otherwise logs will not reach the log files
verbose = true

# connect to MongoDB using the following URL
# MongoDB connection address; a replica set is required
mongo-url = "mongodb://192.168.7.51:27021"
#"mongodb://root:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"

# connect to the Elasticsearch REST API at the following node URLs
elasticsearch-urls = ["http://localhost:9200"]

# frequently required settings

# if you need to seed an index from a collection and not just listen and sync change events
# you can copy entire collections or views from MongoDB to Elasticsearch
# namespaces to watch, in the form <database>.<collection>
direct-read-namespaces = ["mssiot_forum_merossbeta.f_posts"]

# if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
# change streams require at least MongoDB API 3.6+
# if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
# in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
# to listen to an entire db use only the database name. For a deployment use an empty string.
#change-stream-namespaces = ["mydb.col"]

# additional settings

# if you don't want to listen for changes to all collections in MongoDB but only a few
# e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
# this setting does not initiate a copy, it is only a filter on the change event listener
#namespace-regex = '^mssiot_forum_merossbeta\.f_posts$'

# compress requests to Elasticsearch
#gzip = true

# generate indexing statistics
#stats = true
# index statistics into Elasticsearch
#index-stats = true

# use the following PEM file for connections to MongoDB
#mongo-pem-file = "/path/to/mongoCert.pem"
# disable PEM validation
#mongo-validate-pem-file = false

# use the following user name for Elasticsearch basic auth
elasticsearch-user = "elastic"
# use the following password for Elasticsearch basic auth
#elasticsearch-password = "<your_es_password>"

# use 8 go routines concurrently pushing documents to Elasticsearch
# maximum number of concurrent connections Monstache opens to Elasticsearch (default 4)
elasticsearch-max-conns = 8

# use the following PEM file for connections to Elasticsearch
#elasticsearch-pem-file = "/path/to/elasticCert.pem"
# validate connections to Elasticsearch
#elastic-validate-pem-file = true

# propagate dropped collections in MongoDB as index deletes in Elasticsearch
dropped-collections = false
# propagate dropped databases in MongoDB as index deletes in Elasticsearch
dropped-databases = false

# do not start processing at the beginning of the MongoDB oplog
# if you set replay to true you may see version conflict messages
# in the log if you had synced previously. This just means that you are replaying old docs which are already
# in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
#replay = false

# resume processing from a timestamp saved in a previous run
resume = true
# do not validate that progress timestamps have been saved
#resume-write-unsafe = false
# override the name under which resume state is saved
#resume-name = "default"
# use a custom resume strategy (tokens) instead of the default strategy (timestamps)
# tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+
resume-strategy = 0

# exclude documents whose namespace matches the following pattern
#namespace-exclude-regex = '^mydb\.ignorecollection$'

# turn on indexing of GridFS file content
#index-files = true
# turn on search result highlighting of GridFS content
#file-highlighting = true
# index GridFS files inserted into the following collections
#file-namespaces = ["users.fs.files"]

# enable clustering mode
# the Monstache cluster name; essential for high-availability mode
cluster-name = 'merossdev'

# worker mode
#workers = ["Tom", "Dick", "Harry"]

# do not exit after full-sync, rather continue tailing the oplog
#exit-after-direct-reads = false

namespace-regex = '^mssiot_forum_merossbeta\.(f_posts|\$cmd)$'

# map a MongoDB namespace (<database>.<collection>) to a custom index name
[[mapping]]
namespace = "mssiot_forum_merossbeta.f_posts"
index = "f_posts"

# logging to files is essential in production; Monstache logs to stdout by
# default, so point it at log files here (learned the hard way!)
#[logs]
#info = "/var/monstache/log/info.log"
#warn = "/var/monstache/log/warn.log"
#error = "/var/monstache/log/error.log"
#trace = "/var/monstache/log/trace.log"
```
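The namespace-regex filter in the config is easy to sanity-check on its own, since it is an ordinary regular expression applied to the event namespace (database.collection). A minimal Python sketch, using the exact pattern from this config:

```python
import re

# The change-event filter from the config: only events whose namespace
# (database.collection) matches this pattern are processed.
NAMESPACE_REGEX = re.compile(r'^mssiot_forum_merossbeta\.(f_posts|\$cmd)$')

def is_watched(namespace: str) -> bool:
    """Return True if a change event for this namespace would be kept."""
    return NAMESPACE_REGEX.match(namespace) is not None

print(is_watched("mssiot_forum_merossbeta.f_posts"))   # True: the watched collection
print(is_watched("mssiot_forum_merossbeta.$cmd"))      # True: command namespace (e.g. drops)
print(is_watched("mssiot_forum_merossbeta.comments"))  # False: filtered out
print(is_watched("otherdb.f_posts"))                   # False: wrong database
```

Including `\$cmd` in the alternation is what lets collection-level commands (such as drops) in this database pass the filter.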
Start it with monstache -cluster-name merossdev -f config.toml; monstache is a precompiled binary, as shown in the screenshot below.
Now write a document into MongoDB and then query Elasticsearch, as shown in the screenshot below.
In practice, other document operations in MongoDB (updates, deletes, and so on) are synchronized to Elasticsearch in the same way.
Monstache high availability: normal mode and multi-worker mode. In both cases cluster-name must be set in the configuration file: cluster-name = "your custom Monstache cluster name".
1. Normal mode
Principle (from the official docs): When cluster-name is given monstache will enter a high availability mode. Processes with cluster name set to the same value will coordinate. Only one of the processes in a cluster will sync changes. The other processes will be in a paused state. If the process which is syncing changes goes down for some reason, one of the processes in paused state will take control and start syncing.
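The coordination described above can be illustrated with a small simulation. This is only a sketch, not Monstache's actual implementation (the real plugin coordinates through state stored in MongoDB): processes sharing a cluster name compete for a single lease; the holder syncs, the rest stay paused, and when the holder dies a paused process takes over.

```python
class Cluster:
    """Toy stand-in for the shared cluster state Monstache keeps in MongoDB."""
    def __init__(self, name):
        self.name = name
        self.leader = None  # the one process currently allowed to sync

class Process:
    def __init__(self, pid, cluster):
        self.pid, self.cluster, self.alive = pid, cluster, True

    @property
    def state(self):
        return "syncing" if self.cluster.leader is self else "paused"

    def try_acquire(self):
        # Only one process in the cluster may hold the lease at a time.
        if self.cluster.leader is None and self.alive:
            self.cluster.leader = self

    def crash(self):
        self.alive = False
        if self.cluster.leader is self:
            self.cluster.leader = None  # the lease is released

cluster = Cluster("merossdev")
p1, p2 = Process(1, cluster), Process(2, cluster)
for p in (p1, p2):
    p.try_acquire()
print(p1.state, p2.state)   # prints "syncing paused"

p1.crash()                  # the active process goes down...
p2.try_acquire()            # ...and a paused process takes control
print(p2.state)             # prints "syncing"
```

The key property, which the verification steps below demonstrate against the real binary, is that at most one process per cluster name is ever in the syncing state.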
Run monstache -cluster-name merossdev -f config.toml twice in a terminal to start two Monstache processes: one process actively syncs while the other is paused, as shown in the screenshots below.
The screenshot above shows the active (syncing) process.
The screenshot above shows the paused process.
Now kill the active process to verify that the paused process takes over and starts syncing, as shown below.
The screenshot above confirms that the formerly paused process has switched to the active state.
2. Multi-worker mode
Principle (from the official docs): workers — You can run multiple monstache processes and distribute the work between them. First configure the names of all the workers in a shared config.toml file. You can run monstache in high availability mode by starting multiple processes with the same value for cluster-name. Each process will join a cluster which works together to ensure that a monstache process is always syncing to Elasticsearch. In other words, multiple workers cooperate: all workers under the same cluster name sync data concurrently, and none of them is paused. If two processes share both the cluster name and the worker name, one of them syncs while the other is paused; when the active one dies, the paused process with the same name is promoted to syncing. You cannot start a process with a worker name that is not in the workers list. See the documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability
Prerequisite: declare the workers in the configuration file: workers = ["Tom", "Dick", "Harry"]
Run the following commands, one per worker:

```shell
monstache -cluster-name HA -worker Tom -f config.toml
monstache -cluster-name HA -worker Dick -f config.toml
monstache -cluster-name HA -worker Harry -f config.toml
```
Verification: write 10,000 documents into MongoDB at once. Monstache hashes each document id across the set of workers and hands the document to exactly one of them, as shown below.
MongoDB now holds 10,000 documents, waiting to be synced to Elasticsearch.
After starting the three workers, each worker syncs a comparable share of the data.
Querying Elasticsearch confirms that all 10,000 documents have been synced.
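The distribution step above can be sketched as follows. This is an illustrative approximation rather than Monstache's exact algorithm: each document id is hashed deterministically and mapped to one worker, so the 10,000 documents land roughly evenly across Tom, Dick and Harry, and the same id always goes to the same worker.

```python
import hashlib

WORKERS = ["Tom", "Dick", "Harry"]  # must match `workers` in config.toml

def assign_worker(doc_id: str) -> str:
    """Deterministically map a document id to one worker.
    (Illustrative only; Monstache's actual hash function may differ.)"""
    digest = hashlib.md5(doc_id.encode()).hexdigest()
    return WORKERS[int(digest, 16) % len(WORKERS)]

# Simulate the 10,000-document test: count how many ids each worker receives.
counts = {w: 0 for w in WORKERS}
for i in range(10000):
    counts[assign_worker(f"doc-{i}")] += 1

print(counts)  # each worker receives roughly a third of the documents
assert assign_worker("doc-42") == assign_worker("doc-42")  # assignment is stable
```

Determinism is the important property: because the assignment depends only on the document id, no two workers ever process the same change, and no coordination per document is needed.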
Comparing normal mode and multi-worker mode
1. Normal mode
Advantage: relatively simple to deploy.
Disadvantage: slower processing, because only one worker does the syncing; you can, however, configure more goroutines to push documents (elasticsearch-max-conns), which partly compensates for not having multiple workers.
2. Multi-worker mode
Advantage: near-real-time sync, because multiple workers run in parallel and each worker can additionally use multiple goroutines to push documents, giving higher concurrency.
Disadvantage: more complicated to deploy.
Conclusion: since the two modes differ little in sync time in practice (normal mode synced 10,000 documents in about 1.5 seconds in our local test) and normal mode is simpler to deploy, we ultimately chose normal mode.
For deploying Monstache on EKS, see: https://www.cnblogs.com/agopher/p/15704633.html
References:
Official documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability
Hands-on guide (Alibaba Cloud): https://help.aliyun.com/document_detail/171650.html#title-8gf-qh2-3qj