mongo-connector原理及改造
這段時間因為專案的某個需求需要改造mongo-connector,改造開源產品首先要讀懂別人的程式碼,於是總結記錄一下自己的分析過程。這裡假設你對Solr有一定的瞭解。
mongo-connctor是一款用於同步MongoDB資料到其他系統元件,比如它能同步資料到Solr、ElasticSearch或者其他MongoDB叢集中去。它的實現原理是依據MongoDB的Replica Set複製模式,通過分析oplog日誌檔案達到最終的同步目的。安裝配置啟動過程可參考官方文件。這篇文章具體來分析一下mongo-connetor的執行過程是怎樣的。
oplog
首先需要了解oplog的格式,它的格式在不同版本的mongodb上有所區別,大致是:
PRIMARY> db.version() 2.2.2 PRIMARY> db.oplog.rs.findOne() { "ts" : Timestamp(1364186197000, 58), "h" : NumberLong("-7878220425718087654"), "v" : 2, "op" : "u", "ns" : "exaitem_gmsbatchtask.jdgmsbatchtask", "o2" : { "_id" : "83f09a98-6a41-497b-a988-99ba5399d296" }, "o" : { "_id" : "83f09a98-6a41-497b-a988-99ba5399d296", "status" : 2, "content" : "", "type" : 17, "business" : "832722", "optype" : 2, "addDate" : ISODate("2013-03-25T04:36:38.511Z"), "modifyDate" : ISODate("2013-03-25T04:36:39.131Z"), "source" : 5 }
- ts 由4個位元組的timestamp 和 4位元組的自增計數器表示。
- op:
"i": insert "u": update "d": delete "c": db cmd "db":聲明當前資料庫 (其中ns 被設定成為=>資料庫名稱+ '.') "n": no op,即空操作,其會定期執行以確保時效性 。
- ns:操作的namespace
- o:操作對應的document,即當前操作的內容(比如更新操作時要更新的的欄位和值,那些沒有被更新的欄位及對應的值也會在這裡面)
- o2:執行更新操作的條件,只有update才有該屬性。}
mongo-connector
mongo-connetor的核心目錄結構是:
├── mongo_connector │ ├── __init__.py │ ├── compat.py │ ├── config.txt │ ├── connector.py │ ├── constants.py │ ├── doc_managers │ │ ├── __init__.py │ │ ├── config.txt │ │ ├── doc_manager_simulator.py │ │ ├── elastic_doc_manager.py │ │ ├── formatters.py │ │ ├── mongo_doc_manager.py │ │ ├── schema.xml │ │ ├── solr_doc_manager.py │ │ └── solr_doc_manager.pyc │ ├── errors.py │ ├── locking_dict.py │ ├── oplog_manager.py │ └── util.py
程式的入口就是connector.py,main方法通過從命令列中接收引數資訊,引數資訊可以用mongo-conntor --help
檢視,根據引數資訊構建connector物件,connetor繼承Thread,具體的執行流程如下:
基本的執行流程理解之後,我有這個一個需求,同mongodb同步到solr中的document中的部分field是手動加上去的,比如:使用者的粉絲數量在mongodb中沒有儲存,而是放在redis中,此時有會通過其他方式把粉絲數同步到solr中去,同步之後,就會遇到一個問題,如果mongodb中的那條記錄有更新操作,比如:該紀錄修改了usename欄位,但是他用的是mongodb中的賦值操作,而不是修改器$set
,此時mongo-conntor會把這條記錄的所有欄位更新過去,相當於把對應solr中的那個document刪除,重新索引,這是粉絲數就沒有了。具體的細節可以通過檢視solr_doc_manager.py檔案來驗證。也可以看看我從官方fork的一份程式碼中檢視,這份程式碼做了些注視。
通過在apply_update方法中新增邏輯:
#solr中有的欄位但mongodb中沒有的欄位,繼續保留在solr中 for key in doc: if key not in update_spec: update_spec[key] = doc[key]
就可以解決該問題了。
關注公眾號「Python之禪」(id:vttalk)獲取最新文章