1. 程式人生 > >實時同步MongoDB Oplog開發指南

實時同步MongoDB Oplog開發指南

過濾 opts -o 速度 初始 .html https get() 系統

轉載請註明joymufeng,歡迎訪問PlayScala社區(http://www.playscala.cn/)

Capped Collections

MongoDB有一種特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盤的寫入速度差不多,並且支持按照插入順序高效的查詢操作。Capped collections的大小是固定的,它的工作方式很像環形緩沖器(circular buffers), 當剩余空間不足時,會覆蓋最先插入的數據。

Capped collections的特點是高效插入和檢索,所以最好不要在Capped collections上添加額外的索引,否則會影響插入速度。Capped collections可以用於以下場景:

  • 存儲日誌: Capped collections的first-in-first-out特性剛好滿足日誌事件的存儲順序;
  • 緩存小量數據:因為緩存的特點是讀多寫少,所以可以適當使用索引提高讀取速度。

Capped collections的使用限制:

  • 如果更新數據,你需要為之創建索引以防止collection scan;
  • 更新數據時,文檔的大小不能改變。比如說name屬性為‘abc‘,則只能修改成3個字符的字符串,否則操作將會失敗;
  • 數據不允許刪除,如果非刪除不可,只能drop collection
  • 不支持sharding
  • 默認只支持按自然順序(即插入順序)返回結果

Capped collections可以使用$natural操作符按插入順序的正序或反序返回結果:

db[‘oplog.rs‘].find({}).sort({$natural: -1})

Oplog

Oplog是一種特殊的Capped collections,特殊之處在於它是系統級Collection,記錄了數據庫的所有操作,集群之間依靠Oplog進行數據同步。Oplog的全名是local.oplog.rs,位於local數據下。由於local數據不允許創建用戶,如果要訪問Oplog需要借助其它數據庫的用戶,並且賦予該用戶訪問local數據庫的權限,例如:

db.createUser({
   user: "play-community",
   pwd: "******",
   "roles" : [
    {
      "role" : "readWrite", 
      "db" : "play-community"
    }, 
    {
      "role" : "read", 
      "db" : "local"
    }
  ]
})

Oplog記錄的操作記錄是冪等的(idempotent),這意味著你可以多次執行這些操作而不會導致數據丟失或不一致。例如對於$inc操作,Oplog會自動將其轉換為$set操作,例如原始數據如下:

{ 
  "_id" : "0", 
  "count" : 1.0
}

執行如下$inc操作:

db.test.update({_id: "0"}, {$inc: {count: 1}})

Oplog記錄的日誌為:

{ 
  "ts" : Timestamp(1503110518, 1), 
  "t" : NumberLong(8), 
  "h" : NumberLong(-3967772133090765679), 
  "v" : NumberInt(2), 
  "op" : "u", 
  "ns" : "play-community.test", 
  "o2" : {
    "_id" : "0"
  }, 
  "o" : {
    "$set" : {
      "count" : 2.0
    }
  }
}

這種轉換可以保證Oplog的冪等性。另外Oplog為了保證插入性能,不允許額外創建索引。

Timestamps格式

MongoDB有一種特殊的時間格式Timestamps,僅用於內部使用,例如上面Oplog記錄:

Timestamp(1503110518, 1)

Timestamps長度為64位:

  • 前32位是time_t值,表示從epoch時間至今的秒數
  • 後32位是ordinal值,該值是一個順序增長的序數,表示某一秒內的第幾次操作

開始同步Oplog

在開始同步Oplog之前,我們需要註意以下幾點:

  • 由於Oplog不使用索引,所以初始查詢代價可能很大
  • 當Oplog數據量很大時,可以保存ts,系統重啟時利用該ts可以減少首次查詢開銷
  • oplogReplay標誌可以顯著加快包含ts條件過濾的查詢,但是只對oplog查詢有效
val tailingCursor =
 oplogCol
  .find(Json.obj("ns" -> Json.obj("$in" -> Set(s"${db}.common-doc", s"${db}.common-article")), "ts" -> Json.obj("$gte" -> lastTS)))
  .options(QueryOpts().tailable.oplogReplay.awaitData.noCursorTimeout)
  .cursor[BSONDocument]()

tailingCursor.fold(()){ (_, doc) =>
 try {
  val jsObj = doc.as[JsObject]
  jsObj("op").as[String] match {
   case "i" => // 插入
   case "u" => // 更新
   case "d" => // 刪除
  }

  // 保存ts值,以備後用
  if (tailCount.get() % 10 == 0) { }
 } catch {
  case t: Throwable =>
   Logger.error("Tail oplog Error: " + t.getMessage, t)
 }
}

另外提醒大家註意,ReactiveMongo-Streaming的Akka Stream實現有bug,如果首次查詢沒有數據返回,則會持續發送查詢請求,大約每秒中發送幾十次至幾百次請求,因為Oplog的查詢開銷很大,最終會導致MongoDB內存溢出。詳情參考Keep sending queries while the initial query result of a tailable cursor is empty.

參考

  • MongoDB Doc - Replica Set Oplog
  • MongoDB Doc - Capped Collections
  • MongoDB Doc - Tailable Cursors

實時同步MongoDB Oplog開發指南