MongoDB分片儲存的叢集架構實現
如果需要儲存大量資料,或者系統的讀寫吞吐量很大的時候,單個server就很難滿足需求了。這個時候我們可以使用MongoDB的分片機制來解決這些問題。
分片的基本概念
分片(sharding)是一種水平擴充套件(horizontal scaling)的方式,把一個大的資料集分散到多個片伺服器上,所有的片伺服器將組成一個邏輯上的資料庫來儲存這個大的資料集。分片對使用者(應用層)是透明的,使用者不會知道資料很被存放到哪個片伺服器上。
這種方式有兩個好處:
- 分片之後,每個片伺服器處理的請求數量將遠遠小於不分片的情況,這樣就可以水平擴充套件整個分片叢集的儲存量和吞吐量
- 分片之後每個片伺服器需要儲存、維護的資料量將大大減少
分片以及分片叢集
下面的圖來自MongoDB文件,圖片大概展示了分片的概念。1TB的資料被分配到四個片伺服器上,四個片伺服器組成了邏輯Collection I。
在MongoDB中,我們可以搭建分片叢集(如下圖)來實現資料分片的功能。分片叢集中有三個基本部分組成:
- Mongos(路由伺服器):是MongoDB中的路由程序,它將路由所有來自客戶端的請求,分發到每個分片server;然後它將聚合各個分片server的結果,返回個客戶端
- config server(配置伺服器):該伺服器儲存了叢集的配置資訊:資料和片的對應關係。路由器將根據資料和分片的mapping,把資料操作分配到特定的分片上
- shard server(片伺服器):用來儲存子集合的資料容器。片伺服器可以是單個的mongod伺服器,也可以是一個副本集
片鍵
當設定分片時,需要從集合裡面選擇一個或幾個鍵,把選擇出來的鍵作為資料拆分的依據,這個鍵叫做片鍵或複合片鍵。
選好片鍵後,MongoDB將不允許插入沒有片鍵的文件,但是允許不同文件的片鍵型別不一樣。
塊(chunk)
在一個shard server內部,MongoDB還是會把資料分為chunks,每個chunk代表這個shard server內部一部分資料。chunk的產生,會有以下兩個用途:
-
Splitting:
- 當一個chunk的大小超過配置中的chunk size時,MongDB的後臺程序會把這個chunk切分成更小的chunk,從而避免chunk過大的情況
-
Balancing:
- 在MongoDB中,balancer是一個後臺程序,負責chunk的遷移,從而均衡各個shard server的負載
分片叢集的搭建
搭建
下面我們開始單鍵一個分片叢集,來通過一些實踐體驗一下分片叢集。
-
啟動分片中需要的MongDB例項,在這裡為了簡單,片伺服器使用單個MongoDB例項,而不是副本集
啟動配置伺服器
mongod.exe --dbpath="C:\mongodb\db\config" --port 11110
啟動路由伺服器
mongos.exe --port=11111 --configdb=127.0.0.1:11110
啟動片伺服器
mongod.exe --dbpath="c:\mongodb\db\shard0" --port=22220 mongod.exe --dbpath="c:\mongodb\db\shard1" --port=22221
-
新增片伺服器
通過MongoDB shell連線到mongos例項,並且通過以下命令新增
mongo.exe 127.0.0.1:11111 use admin db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) db.runCommand({"addshard":"127.0.0.1:22221",allowLocal:true})
-
啟動分片,並且設定片鍵
mongos> use admin switched to db admin mongos> db.runCommand({"enablesharding":"test"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"test.student","key":{"sid":1}}) { "collectionsharded" : "test.student", "ok" : 1 } mongos>
檢視分片叢集狀態
通過下面的命令可以檢視分片叢集的狀態,可以得到以下資訊:
- 分片叢集中所有的片伺服器
- 分片叢集中所有的資料庫,以及哪些資料庫已經啟動分片
- 已分片資料庫的主片和片鍵
mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0000" } test.student shard key: { "sid" : 1 } chunks: shard0000 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0000 Timestamp(1, 0) mongos>
刪除分片
在上面的輸出中,可以看到以下輸出語句。在所有的片伺服器中,都會有一個主片,這個片的刪除需要特殊的命令。同時,刪除分片可能需要一定的時間,因為MongoDB會把要刪除分片上的資料移動到其他的分片上。
{ "_id" : "test", "partitioned" : true, "primary" : "shard0000" }
首先,從簡單的開始,先刪除非主片shard0001,從輸出可以看到,shard0001處於draining狀態,表示資料正在轉移,當需要遷移的資料量很大的時候,可能就需要等待一些時間
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" }, { "_id" : "shard0001", "draining" : true, "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
再次使用刪除操作,可以看到shard0001已經被刪除
mongos> db.runCommand({"removeshard":"127.0.0.1:22221"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0001", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0000", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
下面重新新增剛才刪除的分片,這次嘗試刪除主分片,會遇到以下錯誤
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "draining started successfully", "state" : "started", "shard" : "shard0000", "note" : "you need to drop or movePrimary these databases", "dbsToMove" : [ "test" ], "ok" : 1 } mongos>
這時,使用moveprimary命令把shard0001設定為主分片
mongos> db.runCommand({"moveprimary":"test","to":"127.0.0.1:22221"}) { "primary " : "shard0001:127.0.0.1:22221", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0000", "draining" : true, "host" : "127.0.0.1:22220" } { "_id" : "shard0001", "host" : "127.0.0.1:22221" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0001" } test.student shard key: { " sid " : 1 } chunks: shard0001 1 { "name" : { "$minKey" : 1 } } -->> { " sid " : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 0) mongos>
然後就可以直接刪除shard0000了
mongos> db.runCommand({"removeshard":"127.0.0.1:22220"}) { "msg" : "removeshard completed successfully", "state" : "completed", "shard" : "shard0000", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" } ], "ok" : 1 } mongos>
最後重新新增剛才刪除的分片,將遇到以下錯誤,提示test資料庫已經存在
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "ok" : 0, "errmsg" : "can't add shard 127.0.0.1:22220 because a local database 'test' exists in another shard0001:127.0.0.1:22221" } mongos>
這時,就需要通過MongoDB shell連線到這個例項,刪除test資料庫。然後重新新增。
C:\mongodb\bin>mongo.exe --port=22220 MongoDB shell version: 2.4.6 connecting to: 127.0.0.1:22220/test > use test switched to db test > db.dropDatabase() { "dropped" : "test", "ok" : 1 } >
mongos> db.runCommand({"addshard":"127.0.0.1:22220",allowLocal:true}) { "shardAdded" : "shard0002", "ok" : 1 } mongos> db.runCommand({"listShards":1}) { "shards" : [ { "_id" : "shard0001", "host" : "127.0.0.1:22221" }, { "_id" : "shard0002", "host" : "127.0.0.1:22220" } ], "ok" : 1 } mongos>
在這個部分中可以看到,通過上面三個步驟,一個簡單的分片叢集就建立起來了。下面我們開始使用這個分片叢集。
分片叢集的管理
首先切換到config資料庫,看看都有哪些表
mongos> use config switched to db config mongos> show collections changelog chunks collections databases lockpings locks mongos settings shards system.indexes tags version mongos>
然後看看下面幾張表中的資料
- chunks表:通過這張表可以看到分片叢集中的資料塊
- databases表:表裡顯示了所有在分片叢集中儲存的資料庫,同時可以看到那些資料庫是啟動分片的,並且指示出主分片
- settings表:這張表包含分片叢集的配置項,其中塊大小(chunk size)就是通過這個檔案設定
- shards表:這張表包含了分片叢集中的所有片
mongos> db.chunks.find() { "_id" : "test.student-sid_MinKey", "lastmod" : Timestamp(1, 0), "lastmodEpoch" : ObjectId("548c02d79355a9edbf5f0193"), "ns" : "test.studen t", "min" : { "sid" : { "$minKey" : 1 } }, "max" : { "sid" : { "$maxKey" : 1 } }, "shard" : "shard0002" } mongos> db.databases.find() { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos> db.shards.find() { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } mongos>
預設的chunk size是64MB,為了方便看到下面平衡(Balancing)的過程,我們重設chunk size為1MB
mongos> use config switched to db config mongos> db.settings.save({ _id:"chunksize", value: 1}) mongos> db.settings.find() { "_id" : "chunksize", "value" : 1 } mongos>
分片叢集的使用
基本測試
基於前面建立的分片叢集,我們進行一些測試,首先插入10W條文件,然後查詢分片叢集的狀態。從下面的輸出中可以看到,10W條文件一共使用了10個chunk,2個chunk在shard0002上,8個chunk在shard0001上。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 2 shard0001 8 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0001 Timestamp(2, 2) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0001 Timestamp(2, 4) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0001 Timestamp(2, 6) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(2, 8) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
經過一段時間後,再次查詢分片叢集的狀態,可以看到,經過Balancing過程,所有的chunk基本要均勻的分佈到了兩個片伺服器中。
MongoDB這種平衡的能力能夠對各個片伺服器進行負載均衡,但是如果資料特別大話則可能會非常的慢,因為資料遷移了會很大。
mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0002 5 shard0001 5 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0002 Timestamp(2, 1) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(1, 3) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0001 Timestamp(5, 1) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0001 Timestamp(2, 10) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(2, 12) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } mongos>
當然,也可以通過下面命令檢視某個collection的資料在分片中的分佈情況
mongos> db.student.getShardDistribution() Shard shard0001 at 127.0.0.1:22221 data : 4.28MiB docs : 51087 chunks : 5 estimated data per chunk : 878KiB estimated docs per chunk : 10217 Shard shard0002 at 127.0.0.1:22220 data : 4.1MiB docs : 48913 chunks : 5 estimated data per chunk : 840KiB estimated docs per chunk : 9782 Totals data : 8.39MiB docs : 100000 chunks : 10 Shard shard0001 contains 51.08% data, 51.08% docs in cluster, avg obj size on shard : 88B Shard shard0002 contains 48.91% data, 48.91% docs in cluster, avg obj size on shard : 88B mongos>
對現有資料庫分片
現在假設我們有一個”127.0.0.1:22222″的MongDB例項,我們想對上面已有的資料進行分片。
首先插入測試資料
use school for(var i=0;i<100000;i++){ var randAge = parseInt(5*Math.random()) + 20; var gender = (randAge%2)?"Male":"Female"; db.student.insert({"sid":i, "name":"Will"+i, "gender": gender, "age": randAge}); }
然後,在現有的分片叢集中加入”127.0.0.1:22222″作為一個片伺服器,從下面的輸出可以看到:
- 分片叢集已經加入片伺服器”127.0.0.1:22222″
- 分片叢集中原有的需要分片的資料庫(test)的資料被Balancing到新的片伺服器上
- 新加入的片伺服器上的資料庫(school)沒有開啟分片
mongos> use admin switched to db admin mongos> db.runCommand({"addshard":"127.0.0.1:22222",allowLocal:true}) { "shardAdded" : "shard0003", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : false, "primary" : "shard0003" } mongos>
當我們使用以下命令開啟school的分片時,會得到一個錯誤,提示我們sid沒有索引。
根據MongDB文件,所有分片的集合在片鍵上都必須建索引,這是MongoDB自動執行的。所以,對已有的資料庫加入分片叢集中的時候,可能就需要手動建立索引了。
根據這一點我們也可以得到,片鍵最好是查詢中經常用到的欄位,因為如果選擇某個欄位作為片鍵但是基本不在這個欄位做查詢那麼等於浪費了一個索引,而增加一個索引總是會使得插入操作變慢。
mongos> db.runCommand({"enablesharding":"school"}) { "ok" : 1 } mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "proposedKey" : { "sid" : 1 }, "curIndexes" : [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ], "ok" : 0, "errmsg" : "please create an index that starts with the shard key before sharding." }
回到剛才的錯誤,我們現在手動建立一個sid的索引,然後重新設定片鍵,這次成功了,並且school中的資料已經被Balancing到各個片伺服器上。
mongos> use school switched to db school mongos> db.student.getIndexes() [ { "v" : 1, "key" : { "_id" : 1 }, "ns" : "school.student", "name" : "_id_" } ] mongos> db.student.ensureIndex({"sid":1}) mongos> use admin switched to db admin mongos> db.runCommand({"shardcollection":"school.student","key":{"sid":1}}) { "collectionsharded" : "school.student", "ok" : 1 } mongos> db.printShardingStatus() --- Sharding Status --- sharding version: { "_id" : 1, "version" : 3, "minCompatibleVersion" : 3, "currentVersion" : 4, "clusterId" : ObjectId("548bd82b9355a9edbf5efa69") } shards: { "_id" : "shard0001", "host" : "127.0.0.1:22221" } { "_id" : "shard0002", "host" : "127.0.0.1:22220" } { "_id" : "shard0003", "host" : "127.0.0.1:22222" } databases: { "_id" : "admin", "partitioned" : false, "primary" : "config" } { "_id" : "test", "partitioned" : true, "primary" : "shard0002" } test.student shard key: { "sid" : 1 } chunks: shard0003 3 shard0002 4 shard0001 3 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 0 } on : shard0003 Timestamp(7, 0) { "sid" : 0 } -->> { "sid" : 6673 } on : shard0002 Timestamp(7, 1) { "sid" : 6673 } -->> { "sid" : 20765 } on : shard0002 Timestamp(3, 0) { "sid" : 20765 } -->> { "sid" : 34839 } on : shard0002 Timestamp(4, 0) { "sid" : 34839 } -->> { "sid" : 48913 } on : shard0002 Timestamp(5, 0) { "sid" : 48913 } -->> { "sid" : 63011 } on : shard0003 Timestamp(6, 0) { "sid" : 63011 } -->> { "sid" : 74942 } on : shard0003 Timestamp(8, 0) { "sid" : 74942 } -->> { "sid" : 86877 } on : shard0001 Timestamp(8, 1) { "sid" : 86877 } -->> { "sid" : 98836 } on : shard0001 Timestamp(2, 14) { "sid" : 98836 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0001 Timestamp(2, 15) { "_id" : "amdin", "partitioned" : false, "primary" : "shard0002" } { "_id" : "school", "partitioned" : true, "primary" : "shard0003" } school.student shard key: { "sid" : 1 } chunks: shard0001 6 shard0002 5 shard0003 6 { "sid" : { "$minKey" : 1 } } -->> { "sid" : 5957 } on : shard0001 Timestamp(2, 0) { "sid" : 5957 } -->> { "sid" : 11915 } on : shard0002 Timestamp(3, 0) { "sid" : 11915 } -->> { "sid" : 17873 } on : shard0001 Timestamp(4, 0) { "sid" : 17873 } -->> { "sid" : 23831 } on : shard0002 Timestamp(5, 0) { "sid" : 23831 } -->> { "sid" : 29789 } on : shard0001 Timestamp(6, 0) { "sid" : 29789 } -->> { "sid" : 35747 } on : shard0002 Timestamp(7, 0) { "sid" : 35747 } -->> { "sid" : 41705 } on : shard0001 Timestamp(8, 0) { "sid" : 41705 } -->> { "sid" : 47663 } on : shard0002 Timestamp(9, 0) { "sid" : 47663 } -->> { "sid" : 53621 } on : shard0001 Timestamp(10, 0) { "sid" : 53621 } -->> { "sid" : 59579 } on : shard0002 Timestamp(11, 0) { "sid" : 59579 } -->> { "sid" : 65537 } on : shard0001 Timestamp(12, 0) { "sid" : 65537 } -->> { "sid" : 71495 } on : shard0003 Timestamp(12, 1) { "sid" : 71495 } -->> { "sid" : 77453 } on : shard0003 Timestamp(1, 12) { "sid" : 77453 } -->> { "sid" : 83411 } on : shard0003 Timestamp(1, 13) { "sid" : 83411 } -->> { "sid" : 89369 } on : shard0003 Timestamp(1, 14) { "sid" : 89369 } -->> { "sid" : 95327 } on : shard0003 Timestamp(1, 15) { "sid" : 95327 } -->> { "sid" : { "$maxKey" : 1 } } on : shard0003 Timestamp(1, 16) mongos>
總結
通過本篇文章瞭解了分片的基本概念,同時搭建了一個簡單的分片叢集,進行了一些基本測試。
對於什麼時候要使用分片這個問題,可以主要有下面幾個參考點:
- 資料量很大單個儲存伺服器無法滿需需求
- 系統中活動資料量的大小很快將要超過系統RAM的最大容量
- 系統有大量的讀寫請求,單個單個儲存伺服器不足以支援