學習MongoDB 十二: MongoDB聚合(Aggregation Pipeline基礎篇-下)(四)
一、簡介:
上一篇我們對 db.collection.aggregate(pipeline, options)介紹,我們接下來介紹pipeline 引數和options引數的基礎認識
【pipeline 引數】
pipeline 型別是Array 語法:db.collection.aggregate( [ { <stage> }, ... ] )
$project:可以對輸入文件進行新增新欄位或刪除現有的欄位,可以自定哪些欄位顯示與不顯示。
$match :根據條件用於過濾資料,只輸出符合條件的文件,如果放在pipeline
$redact :欄位所處的document結構的級別.
$limit :用來限制MongoDB聚合管道返回的文件數
$skip :在聚合管道中跳過指定數量的文件,並返回餘下的文件。
$unwind :將文件中的某一個數組型別欄位拆分成多條,每條包含陣列中的一個值。
$sample :隨機選擇從其輸入指定數量的文件。如果是大於或等於5%的collection的文件,$sample
語法: { $sample: { size: <positive integer> } }
$sort :將輸入文件排序後輸出。
$geoNear:用於地理位置資料分析。
$out :必須為pipeline最後一個階段管道,因為是將最後計算結果寫入到指定的collection中。
$indexStats :返回資料集合的每個索引的使用情況。
語法:{ $indexStats: { } }
$group : 將集合中的文件分組,可用於統計結果,$group首先將資料根據key進行分組。
我們可以通過Aggregation pipeline一些操作 跟sql用法一樣,我們能很清晰的怎麼去使用
pipeline sql
$project select
$match where
$match(先$group階段管道 然後在$match對統計的結果進行再一次的過濾) having
$sort order by
$limit limit
二、基礎的操作
【pipeline簡單例子】
資料:
db.items.insert( [
{
"quantity" : 2,
"price" : 5.0,
"pnumber" : "p003",
},{
"quantity" : 2,
"price" : 8.0,
"pnumber" : "p002"
},{
"quantity" : 1,
"price" : 4.0,
"pnumber" : "p002"
},{
"quantity" : 2,
"price" : 4.0,
"pnumber" : "p001"
},{
"quantity" : 4,
"price" : 10.0,
"pnumber" : "p003"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p001"
},{
"quantity" : 10,
"price" : 20.0,
"pnumber" : "p003"
},{
"quantity" : 5,
"price" : 10.0,
"pnumber" : "p002"
}
])
【 $project】
1、我們對上面的統計結果,只顯示count,不想_id ,可以通過$project來操作,相當SQL的select 顯示我們想要的欄位:
> db.items.aggregate([{$group:{_id:null,count:{$sum:1}}},{$project:{"_id":0,"count":1}}])
{ "count" : 8 }
【 $match】
1、我們想通過濾訂單中,想知道賣出的數量大於8的產品有哪些產品,相當於SQL:select sum(quantity) as total from items group by pnumber having total>8
> db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{$match:{total:{$gt:8}}}])
{ "_id" : "p001", "total" : 12 }
{ "_id" : "p003", "total" : 16 }
2、如果是放在$group之前就是當做where來使用,我們只統計pnumber =p001 產品賣出了多少個 select sum(quantity) as total from items where pnumber='p001'
> db.items.aggregate([{$match:{"pnumber":"p001"}},{$group:{_id:null,total:{$sum:"$quantity"}}}])
{ "_id" : null, "total" : 12 }
【$skip、 $limit、$sort】
db.items.aggregate([{ $skip: 2 },{ $limit: 4 }]) 與db.items.aggregate([{ $limit: 4 },{ $skip: 2 }]) 這樣結果是不一樣的
> db.items.aggregate([{ $skip: 2 },{ $limit: 4 }])
{ "_id" : ObjectId("574d937cfafef57ee4427ac4"), "quantity" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac5"), "quantity" : 2, "price" : 4, "pnumber" : "p001" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac6"), "quantity" : 4, "price" : 10, "pnumber" : "p003" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac7"), "quantity" : 10, "price" : 20, "pnumber" : "p001" }
> db.items.aggregate([{ $limit: 4 },{ $skip: 2 }])
{ "_id" : ObjectId("574d937cfafef57ee4427ac4"), "quantity" : 1, "price" : 4, "pnumber" : "p002" }
{ "_id" : ObjectId("574d937cfafef57ee4427ac5"), "quantity" : 2, "price" : 4, "pnumber" : "p001" }
$limit、$skip、$sort、$match可以使用在階段管道,如果使用在$group之前可以過濾掉一些資料,提高效能。
【$unwind】
將文件中的某一個數組型別欄位拆分成多條,每條包含陣列中的一個值。
> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}}])
{ "_id" : "p001", "quantitys" : [ 2, 10 ] }
{ "_id" : "p002", "quantitys" : [ 2, 1, 5 ] }
{ "_id" : "p003", "quantitys" : [ 2, 4, 10 ] }
> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}},{$unwind:"$quantitys"}])
{ "_id" : "p001", "quantitys" : 2 }
{ "_id" : "p001", "quantitys" : 10 }
{ "_id" : "p002", "quantitys" : 2 }
{ "_id" : "p002", "quantitys" : 1 }
{ "_id" : "p002", "quantitys" : 5 }
{ "_id" : "p003", "quantitys" : 2 }
{ "_id" : "p003", "quantitys" : 4 }
{ "_id" : "p003", "quantitys" : 10 }
【$out】
必須為pipeline最後一個階段管道,因為是將最後計算結果寫入到指定的collection中。
{ "_id" : "p001", "quantitys" : 2 }
{ "_id" : "p001", "quantitys" : 10 }
{ "_id" : "p002", "quantitys" : 2 }
{ "_id" : "p002", "quantitys" : 1 }
{ "_id" : "p002", "quantitys" : 5 }
{ "_id" : "p003", "quantitys" : 2 }
{ "_id" : "p003", "quantitys" : 4 }
{ "_id" : "p003", "quantitys" : 10 }
把結果放在指定的集合中result
> db.items.aggregate([{$group:{_id:"$pnumber",quantitys:{$push:"$quantity"}}},{$unwind:"$quantitys"},{$project:{"_id":0,"quantitys":1}},{$out:"result"}])
> db.result.find()
{ "_id" : ObjectId("57529143746e15e8aa207a29"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2a"), "quantitys" : 10 }
{ "_id" : ObjectId("57529143746e15e8aa207a2b"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2c"), "quantitys" : 1 }
{ "_id" : ObjectId("57529143746e15e8aa207a2d"), "quantitys" : 5 }
{ "_id" : ObjectId("57529143746e15e8aa207a2e"), "quantitys" : 2 }
{ "_id" : ObjectId("57529143746e15e8aa207a2f"), "quantitys" : 4 }
{ "_id" : ObjectId("57529143746e15e8aa207a30"), "quantitys" : 10 }
【$redact】
語法:{ $redact: <expression> }
$redact 跟$cond結合使用,並在$cond裡面使用了if 、then、else表示式,if-else缺一不可,$redact還有三個重要的引數:
1)$$DESCEND: 返回包含當前document級別的所有欄位,並且會繼續判欄位包含內嵌文件,內嵌文件的欄位也會去判斷是否符合條件。
2)$$PRUNE:返回不包含當前文件或者內嵌文件級別的所有欄位,不會繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件持有相同的訪問級別。
3)$$KEEP:返回包含當前文件或內嵌文件級別的所有欄位,不再繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件中持有不同的訪問級別。
1、 level=1則值為為$$DESCEND,否則為$$PRUNE,從頂部開始掃描下去,執行$$DESCEND包含當前document級別的所有fields。當前級別欄位的內嵌文件將會被繼續檢測。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: [{
level: 1,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 5,
addr1: "123 ABC Street",
city: "Some City"
}
},{
level: 3,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 1,
addr1: "123 ABC Street",
city: "Some City"
}
}]
})
db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 1] },
then: "$$DESCEND",
else: "$$PRUNE"
}
}
}
]
);
{
"_id" : 1,
"level" : 1,
"status" : "A",
"acct_id" : "xyz123",
"cc" : [
{ "level" : 1,
"type" : "yy",
"num" : 0,
"exp_date" : ISODate("2015-11-01T00:00:00Z")
}
]
}
2、$$PRUNE:不包含當前文件或者內嵌文件級別的所有欄位,不會繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件持有相同的訪問級別。連等級的欄位都不顯示,也不會去掃描等級欄位包含下級。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: {
level: 3,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level: 1,
addr1: "123 ABC Street",
city: "Some City"
}
}
}
)
db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 3] },
then: "$$PRUNE",
else: "$$DESCEND"
}
}
}
]
);
{ "_id" : 1, "level" : 1, "status" : "A", "acct_id" : "xyz123" }
3、$$KEEP:返回包含當前文件或內嵌文件級別的所有欄位,不再繼續檢測此級別的其他欄位,即使這些欄位的內嵌文件中持有不同的訪問級別。
db.redact.insert(
{
_id: 1,
level: 1,
status: "A",
acct_id: "xyz123",
cc: {
level: 2,
type: "yy",
num: 000000000000,
exp_date: ISODate("2015-11-01T00:00:00.000Z"),
billing_addr: {
level:3,
addr1: "123 ABC Street",
city: "Some City"
}
}
}
)
db.redact.aggregate(
[
{ $match: { status: "A" } },
{
$redact: {
$cond: {
if: { $eq: [ "$level", 1] },
then: "$$KEEP",
else: "$$PRUNE"
}
}
}
]
);
{ "_id" : 1, "level" : 1, "status" : "A", "acct_id" : "xyz123", "cc" : { "level" : 2, "type" : "yy", "num" : 0, "exp_date" : ISODate("2015-11-01T00:00:00Z"), "billing_addr" : { "level" : 3, "addr1" : "123 ABC Street", "city" : "Some City" } } }
【 options引數】
explain:返回指定aggregate各個階段管道的執行計劃資訊。
他操作返回一個遊標,包含aggregate執行計劃詳細資訊。
db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{$group:{_id:null,max:{$max:"$total"}}}],{explain:true})
{
"stages" : [
{
"$cursor" : {
"query" : {
},
"fields" : {
"pnumber" : 1,
"quantity" : 1,
"_id" : 0
},
"plan" : {
"cursor" : "BasicCursor",
"isMultiKey" : false,
"scanAndOrder" : false,
"allPlans" : [
{
"cursor" : "BasicCursor"
,
"isMultiKey" : false,
"scanAndOrder" : false
}
]
}
}
},
{
"$group" : {
"_id" : "$pnumber",
"total" : {
"$sum" : "$quantity"
}
}
},
{
"$group" : {
"_id" : {
"$const" : null
},
"max" : {
"$max" : "$total"
}
}
}
],
"ok" : 1
}
allowDiskUse:每個階段管道限制為100MB的記憶體,如果大於100MB的資料可以先寫入臨時檔案。設定為true時,aggregate操作可時可以先將資料寫入對應資料目錄的子目錄中
的唯一併以_tmp結尾的文件中。
cursor:指定遊標的初始批批大小。游標的欄位的值是一個與場batchSize檔案。
語法cursor: { batchSize: <int> }
var cursor=db.items.aggregate([{$group:{_id:"$pnumber",total:{$sum:"$quantity"}}},{ $limit: 2 }],{cursor: { batchSize: 1 }})
版本2.6之後:DB.collect.aggregate()方法返回一個指標,可以返回任何結果集的大小。沒有指標時返回所有的結果在一個單一的集合,並將結果集限制為16位元組大小的
mongodb shell 設定遊標大小cursor.batchSize(size) 一次返回多少條,遊標提供了很多方法:
cursor.hasNext()
cursor.next()
cursor.toArray()
cursor.forEach()
cursor.map()
cursor.objsLeftInBatch()
cursor.itcount()
cursor.pretty()
bypassDocumentValidation:只有當你指定了$out操作符,使db.collection.aggregate繞過文件驗證操作過程中。這讓您插入不符合驗證要求的文件。