Syncing MongoDB data to Elasticsearch with Monstache and keeping it highly available

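Requirements & problem statement

  • We need to sync MongoDB data to Elasticsearch in real time, including subsequent data changes. After evaluating AWS DMS and Monstache, we tentatively chose Monstache to do the sync.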

What is Monstache?

Hands-on

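  • Monstache is started from a configuration file and offers a fairly rich set of configuration parameters.

    Monstache must be given a configuration file at startup; here it is named config.toml: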

    # connection settings
    # print detailed information including request traces
    # enable debug logging; keep this setting at the very top, otherwise the logs are not written to the log file
    verbose = true
    # connect to MongoDB using the following URL
    # MongoDB connection URL; MongoDB must be deployed as a cluster (for example a replica set)
    mongo-url = "mongodb://192.168.7.51:27021"
    #"mongodb://root:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717"
    # connect to the Elasticsearch REST API at the following node URLs
    # Elasticsearch connection URL(s)
    elasticsearch-urls = ["http://localhost:9200"]
    
    # frequently required settings
    # if you need to seed an index from a collection and not just listen and sync changes events
    # you can copy entire collections or views from MongoDB to Elasticsearch
    # MongoDB collections to watch, in the form database.collection
    direct-read-namespaces = ["mssiot_forum_merossbeta.f_posts"]
    
    # if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
    # change streams require at least MongoDB API 3.6+
    # if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
    # in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
    # to listen to an entire db use only the database name.  For a deployment use an empty string.
    #change-stream-namespaces = ["mydb.col"]
    
    # additional settings
    
    # if you don't want to listen for changes to all collections in MongoDB but only a few
    # e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
    # this setting does not initiate a copy, it is only a filter on the change event listener
    #namespace-regex = '^mssiot_forum_merossbeta\.f_posts$'
    # compress requests to Elasticsearch
    #gzip = true
    # generate indexing statistics
    #stats = true
    # index statistics into Elasticsearch
    #index-stats = true
    # use the following PEM file for connections to MongoDB
    #mongo-pem-file = "/path/to/mongoCert.pem"
    # disable PEM validation
    #mongo-validate-pem-file = false
    # use the following user name for Elasticsearch basic auth
    elasticsearch-user = "elastic"
    # use the following password for Elasticsearch basic auth
    #elasticsearch-password = "<your_es_password>"
    # use 8 go routines concurrently pushing documents to Elasticsearch
    # maximum number of concurrent goroutines monstache uses to sync to Elasticsearch; the default is 4
    elasticsearch-max-conns = 8
    # use the following PEM file to connections to Elasticsearch
    #elasticsearch-pem-file = "/path/to/elasticCert.pem"
    # validate connections to Elasticsearch
    #elastic-validate-pem-file = true
    # propagate dropped collections in MongoDB as index deletes in Elasticsearch
    # whether dropping a collection or database in MongoDB also deletes the matching Elasticsearch index
    dropped-collections = false
    # propagate dropped databases in MongoDB as index deletes in Elasticsearch
    dropped-databases = false
    # do not start processing at the beginning of the MongoDB oplog
    # if you set the replay to true you may see version conflict messages
    # in the log if you had synced previously. This just means that you are replaying old docs which are already
    # in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
    #replay = false
    # resume processing from a timestamp saved in a previous run
    # resume from the last saved point in time
    resume = true
    # do not validate that progress timestamps have been saved
    #resume-write-unsafe = false
    # override the name under which resume state is saved
    #resume-name = "default"
    # choose the resume strategy: 0 = timestamps (the default), 1 = tokens
    # tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+
    resume-strategy = 0
    # exclude documents whose namespace matches the following pattern
    #namespace-exclude-regex = '^mydb\.ignorecollection$'
    # turn on indexing of GridFS file content
    #index-files = true
    # turn on search result highlighting of GridFS content
    #file-highlighting = true
    # index GridFS files inserted into the following collections
    #file-namespaces = ["users.fs.files"]
    # enable clustering mode
    # name of the monstache cluster; essential for high-availability mode
    cluster-name = 'merossdev'
    # worker mode
    #workers = ["Tom", "Dick", "Harry"]
    # do not exit after full-sync, rather continue tailing the oplog
    #exit-after-direct-reads = false
    namespace-regex = '^mssiot_forum_merossbeta\.(f_posts|\$cmd)$'
    
    [[mapping]]
    # a mapping namespace takes the form database.collection
    namespace = "mssiot_forum_merossbeta.f_posts"
    index = "f_posts"
    
    # logging is essential in production; monstache writes to standard output by default, so point it at log files here (a lesson learned the hard way)
    #[logs]
    #info = "/var/monstache/log/info.log"
    #warn = "/var/monstache/log/warn.log"
    #error = "/var/monstache/log/error.log"
    #trace = "/var/monstache/log/trace.log"
    

    To start it, run monstache -cluster-name merossdev -f config.toml; monstache here is the precompiled binary.

    [Figure: monstache startup output]
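The namespace-regex value in the configuration above can be sanity-checked offline before starting monstache. The sketch below uses Python's re module as a stand-in for the Go regular expression engine that monstache uses; for a pattern this simple the two should agree, but that is an assumption. The candidate namespaces other than mssiot_forum_merossbeta.f_posts are hypothetical and exist only to exercise the pattern.

```python
import re

# Pattern copied verbatim from namespace-regex in config.toml.
NAMESPACE_REGEX = re.compile(r'^mssiot_forum_merossbeta\.(f_posts|\$cmd)$')


def is_watched(namespace: str) -> bool:
    """Return True if a 'database.collection' namespace passes the filter."""
    return NAMESPACE_REGEX.match(namespace) is not None


# Hypothetical namespaces, used only to exercise the pattern.
candidates = [
    "mssiot_forum_merossbeta.f_posts",   # the collection we want to sync
    "mssiot_forum_merossbeta.$cmd",      # command namespace (drops and similar events)
    "mssiot_forum_merossbeta.f_users",   # same database, different collection
    "other_db.f_posts",                  # same collection name, different database
]

for ns in candidates:
    print(f"{ns}: {'watched' if is_watched(ns) else 'ignored'}")
```

Only the first two namespaces pass the filter, which matches the intent of syncing a single collection.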

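  • Now insert a document into MongoDB and then query Elasticsearch:

    [Figure: the inserted document returned by an Elasticsearch query]

This confirms in practice that the insert is synced; other MongoDB document operations are likewise synced to Elasticsearch.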
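The check can also be scripted rather than done by hand. The following is a minimal sketch using only the Python standard library. It assumes Elasticsearch 7 or later (the /_doc/ get endpoint), that the index is named f_posts as in the [[mapping]] section, and that the Elasticsearch document id equals the string form of the MongoDB _id; the id used in the example is hypothetical. It sends no credentials, so a cluster that enforces basic auth would reject the request.

```python
import json
import urllib.parse
import urllib.request

ES_URL = "http://localhost:9200"  # elasticsearch-urls from config.toml
INDEX = "f_posts"                 # index named in the [[mapping]] section


def doc_url(base: str, index: str, doc_id: str) -> str:
    """Build the Elasticsearch get-document URL for one id."""
    quoted_index = urllib.parse.quote(index, safe="")
    quoted_id = urllib.parse.quote(doc_id, safe="")
    return f"{base.rstrip('/')}/{quoted_index}/_doc/{quoted_id}"


def fetch_doc(doc_id: str) -> dict:
    """GET one document; urllib raises HTTPError (404) if it has not been synced yet."""
    with urllib.request.urlopen(doc_url(ES_URL, INDEX, doc_id), timeout=5) as resp:
        return json.load(resp)


# Hypothetical id; replace it with the _id of the document written to MongoDB.
print(doc_url(ES_URL, INDEX, "0123456789abcdef01234567"))
```

Calling fetch_doc with the _id of the inserted document should return it once monstache has processed the change.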

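  • Monstache high availability comes in two modes: normal mode and multi-worker mode. Both require cluster-name to be set in the configuration file: cluster-name = "your-monstache-cluster-name"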

    1. Normal mode

    How it works, per the Monstache documentation: "When cluster-name is given monstache will enter a high availability mode. Processes with cluster name set to the same value will coordinate. Only one of the processes in a cluster will sync changes. The other processes will be in a paused state. If the process which is syncing changes goes down for some reason one of the processes in paused state will take control and start syncing. See the section high availability for more information."

    In other words, within one cluster only a single process syncs data while the others stay paused; if the syncing process dies, one of the paused processes is promoted to the listening state. Documentation: https://rwynn.github.io/monstache-site/config/

    Command to run: monstache -cluster-name merossdev -f config.toml

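    Running this command twice in a row in the terminal starts two monstache processes: one is listening and syncing, the other is paused.

    [Figure: process in the listening state]

    [Figure: process in the pausing state]

    Next, kill the listening process to check whether the paused process switches to the listening state.

    [Figure: the previously paused process has switched to the listening state]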
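The failover behaviour observed above can be pictured with a toy model. The sketch below is an in-memory simulation only: the class ToyCluster and the process names proc-1 and proc-2 are hypothetical, and it does not reproduce how monstache actually coordinates processes. It only encodes the rule that a single process per cluster-name syncs and that a paused process takes over when the syncing one dies.

```python
from typing import Dict, Optional


class ToyCluster:
    """In-memory stand-in for the state shared by processes with one cluster-name."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.active: Optional[str] = None  # process that is currently syncing
        self.alive: Dict[str, bool] = {}   # process id -> still running?

    def join(self, pid: str) -> str:
        """A process starts; it syncs only if no other process already does."""
        self.alive[pid] = True
        if self.active is None:
            self.active = pid
        return self.state(pid)

    def kill(self, pid: str) -> None:
        """A process dies; if it was syncing, a paused survivor takes over."""
        self.alive[pid] = False
        if self.active == pid:
            survivors = [p for p, up in self.alive.items() if up]
            self.active = survivors[0] if survivors else None

    def state(self, pid: str) -> str:
        if not self.alive.get(pid, False):
            return "dead"
        return "syncing" if self.active == pid else "paused"


cluster = ToyCluster("merossdev")
cluster.join("proc-1")
cluster.join("proc-2")
print(cluster.state("proc-1"), cluster.state("proc-2"))  # syncing paused

cluster.kill("proc-1")
print(cluster.state("proc-1"), cluster.state("proc-2"))  # dead syncing
```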

    2. Multi-worker mode

    How it works, per the Monstache documentation: "workers - You can run multiple monstache processes and distribute the work between them. First configure the names of all the workers in a shared config.toml file. You can run monstache in high availability mode by starting multiple processes with the same value for cluster-name. Each process will join a cluster which works together to ensure that a monstache process is always syncing to Elasticsearch."

    In other words, multiple workers cooperate: every worker under the same cluster name syncs data and none of them is paused. Among processes that share both the cluster name and the worker name, one is listening and the other is paused; when the listening process dies, its same-named counterpart is promoted from paused to listening. You cannot start a process with a worker name that is not in the workers list. Documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability

    Prerequisite: list the workers in the configuration file: workers = ["Tom", "Dick", "Harry"]

    Commands to run:

    monstache -cluster-name HA -worker Tom -f config.toml
    monstache -cluster-name HA -worker Dick -f config.toml
    monstache -cluster-name HA -worker Harry -f config.toml

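Verification: we write 10,000 documents into MongoDB at once. Monstache hashes document ids across the workers, so each document is handed to exactly one worker.

[Figure: MongoDB holds 10,000 documents waiting to be synced to Elasticsearch]

[Figure: with three workers started, each worker syncs a comparable share of the documents]

[Figure: an Elasticsearch query shows that all 10,000 documents have been synced]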
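To illustrate why each worker ends up with a comparable share, here is a minimal sketch of hashing document ids onto a fixed worker list. The scheme shown (SHA-256 followed by a modulo) is an assumption chosen for clarity, since the post does not describe the hash monstache really uses. The document ids are synthetic.

```python
import hashlib
from collections import Counter
from typing import List

WORKERS = ["Tom", "Dick", "Harry"]  # same names as the workers list in config.toml


def owner(doc_id: str, workers: List[str]) -> str:
    """Map a document id to exactly one worker, deterministically."""
    digest = hashlib.sha256(doc_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % len(workers)
    return workers[bucket]


# Synthetic ids standing in for the 10,000 documents written to MongoDB.
doc_ids = [f"doc-{i}" for i in range(10000)]
counts = Counter(owner(doc_id, WORKERS) for doc_id in doc_ids)

for name in WORKERS:
    print(name, counts[name])
```

Because the mapping depends only on the id and the worker list, every process computes the same owner for a document without talking to the others, so no document is handled twice.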

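Comparing normal mode and multi-worker mode

1. Normal mode

Advantage: relatively simple to deploy.

Disadvantage: slower processing, because only one process is working at any time. You can set the number of goroutines that push documents to Elasticsearch (elasticsearch-max-conns in the configuration above), which partly makes up for not running multiple workers.

2. Multi-worker mode

Advantage: near-real-time sync, because several workers run at the same time and each worker can itself use multiple goroutines, giving higher concurrency.

Disadvantage: relatively cumbersome to deploy.

Summary: the two modes turned out to differ little in sync time. Normal mode synced 10,000 documents in only 1.5 seconds (verified locally) and is simpler to deploy, so we ultimately chose normal mode.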

For deploying monstache on EKS, see: https://www.cnblogs.com/agopher/p/15704633.html

Documents referenced in this article:

Official documentation: https://rwynn.github.io/monstache-site/advanced/#high-availability

Hands-on guide: https://help.aliyun.com/document_detail/171650.html#title-8gf-qh2-3qj