實時同步MongoDB Oplog開發指南
轉載請註明joymufeng,歡迎訪問PlayScala社區(http://www.playscala.cn/)
Capped Collections
MongoDB有一種特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盤的寫入速度差不多,並且支持按照插入順序高效的查詢操作。Capped collections的大小是固定的,它的工作方式很像環形緩沖器(circular buffers), 當剩余空間不足時,會覆蓋最先插入的數據。
Capped collections的特點是高效插入和檢索,所以最好不要在Capped collections上添加額外的索引,否則會影響插入速度。Capped collections可以用於以下場景:
- 存儲日誌: Capped collections的first-in-first-out特性剛好滿足日誌事件的存儲順序;
- 緩存小量數據:因為緩存的特點是讀多寫少,所以可以適當使用索引提高讀取速度。
Capped collections的使用限制:
- 如果更新數據,你需要為之創建索引以防止collection scan;
- 更新數據時,文檔的大小不能改變。比如說name屬性為‘abc‘,則只能修改成3個字符的字符串,否則操作將會失敗;
- 數據不允許刪除,如果非刪除不可,只能drop collection
- 不支持sharding
- 默認只支持按自然順序(即插入順序)返回結果
Capped collections可以使用$natural操作符按插入順序的正序或反序返回結果:
db[‘oplog.rs‘].find({}).sort({$natural: -1})
Oplog
Oplog是一種特殊的Capped collections,特殊之處在於它是系統級Collection,記錄了數據庫的所有操作,集群之間依靠Oplog進行數據同步。Oplog的全名是local.oplog.rs,位於local數據下。由於local數據不允許創建用戶,如果要訪問Oplog需要借助其它數據庫的用戶,並且賦予該用戶訪問local數據庫的權限,例如:
db.createUser({
user: "play-community",
pwd: "******",
"roles" : [
{
"role" : "readWrite",
"db" : "play-community"
},
{
"role" : "read",
"db" : "local"
}
]
})
Oplog記錄的操作記錄是冪等的(idempotent),這意味著你可以多次執行這些操作而不會導致數據丟失或不一致。例如對於$inc操作,Oplog會自動將其轉換為$set操作,例如原始數據如下:
{
"_id" : "0",
"count" : 1.0
}
執行如下$inc操作:
db.test.update({_id: "0"}, {$inc: {count: 1}})
Oplog記錄的日誌為:
{
"ts" : Timestamp(1503110518, 1),
"t" : NumberLong(8),
"h" : NumberLong(-3967772133090765679),
"v" : NumberInt(2),
"op" : "u",
"ns" : "play-community.test",
"o2" : {
"_id" : "0"
},
"o" : {
"$set" : {
"count" : 2.0
}
}
}
這種轉換可以保證Oplog的冪等性。另外Oplog為了保證插入性能,不允許額外創建索引。
Timestamps格式
MongoDB有一種特殊的時間格式Timestamps,僅用於內部使用,例如上面Oplog記錄:
Timestamp(1503110518, 1)
Timestamps長度為64位:
- 前32位是time_t值,表示從epoch時間至今的秒數
- 後32位是ordinal值,該值是一個順序增長的序數,表示某一秒內的第幾次操作
開始同步Oplog
在開始同步Oplog之前,我們需要註意以下幾點:
- 由於Oplog不使用索引,所以初始查詢代價可能很大
- 當Oplog數據量很大時,可以保存ts,系統重啟時利用該ts可以減少首次查詢開銷
- oplogReplay標誌可以顯著加快包含ts條件過濾的查詢,但是只對oplog查詢有效
val tailingCursor =
oplogCol
.find(Json.obj("ns" -> Json.obj("$in" -> Set(s"${db}.common-doc", s"${db}.common-article")), "ts" -> Json.obj("$gte" -> lastTS)))
.options(QueryOpts().tailable.oplogReplay.awaitData.noCursorTimeout)
.cursor[BSONDocument]()
tailingCursor.fold(()){ (_, doc) =>
try {
val jsObj = doc.as[JsObject]
jsObj("op").as[String] match {
case "i" => // 插入
case "u" => // 更新
case "d" => // 刪除
}
// 保存ts值,以備後用
if (tailCount.get() % 10 == 0) { }
} catch {
case t: Throwable =>
Logger.error("Tail oplog Error: " + t.getMessage, t)
}
}
另外提醒大家註意,ReactiveMongo-Streaming的Akka Stream實現有bug,如果首次查詢沒有數據返回,則會持續發送查詢請求,大約每秒中發送幾十次至幾百次請求,因為Oplog的查詢開銷很大,最終會導致MongoDB內存溢出。詳情參考Keep sending queries while the initial query result of a tailable cursor is empty.
參考
- MongoDB Doc - Replica Set Oplog
- MongoDB Doc - Capped Collections
- MongoDB Doc - Tailable Cursors
實時同步MongoDB Oplog開發指南