基於Morphia實現MongoDB按小時、按天聚合操作方法
MongoDB按照天數或小時聚合
需求
最近接到需求,需要對使用者賬戶下的裝置狀態,分別按照天以及小時進行聚合,以此為基礎繪製裝置狀態趨勢圖.
實現思路是啟動定時任務,對各使用者的裝置狀態資料分別按照小時以及天進行聚合,並存儲進資料庫中供使用者後續查詢.
涉及到的技術棧分別為:Spring Boot
,MongoDB,Morphia
.
資料模型
@Data @Builder @Entity(value = "rawDevStatus",noClassnameStored = true) // 裝置狀態索引 @Indexes({ // 設定資料超時時間(TTL,MongoDB根據TTL在後臺進行資料刪除操作) @Index(fields = @Field("time"),options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),@Index(fields = {@Field("userId"),@Field(value = "time",type = IndexType.DESC)}) }) public class RawDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; private String userId; private Instant time; @Embedded("points") List<Point> protocolPoints; @Data @AllArgsConstructor public static class Point { /** * 協議型別 */ private Protocol protocol; /** * 裝置總數 */ private Integer total; /** * 裝置線上數目 */ private Integer onlineNum; /** * 處於啟用狀態裝置數目 */ private Integer enableNum; } }
上述程式碼是裝置狀態實體類,其中裝置狀態資料是按照裝置所屬協議進行區分的.
@Data @Builder @Entity(value = "aggregationDevStatus",noClassnameStored = true) @Indexes({ @Index(fields = @Field("expireAt"),options = @IndexOptions(expireAfterSeconds = 0)),type = IndexType.DESC)}) }) public class AggregationDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; /** * 使用者ID */ private String userId; /** * 裝置總數 */ private Double total; /** * 裝置線上數目 */ private Double onlineNum; /** * 處於啟用狀態裝置數目 */ private Double enableNum; /** * 聚合型別(按照小時還是按照天聚合) */ @Property("aggDuration") private AggregationDuration aggregationDuration; private Instant time; /** * 動態設定文件過期時間 */ private Instant expireAt; }
上述程式碼是期待的聚合結果,其中構建兩個索引:(1)超時索引;(2)複合索引,程式會根據使用者名稱以及時間查詢裝置狀態聚合結果.
聚合操作符介紹
聚合操作類似於管道,管道中的每一步操作產生的中間結果作為下一步的輸入源,最終輸出聚合結果.
此次聚合主要涉及以下操作:
•$project:指定輸出文件中的欄位.
•$unwind:拆分資料中的陣列;
•match:選擇要處理的文件資料;
•group:根據key分組聚合結果.
原始聚合語句
db.getCollection('raw_dev_status').aggregate([ {$match: { time:{$gte: ISODate("2019-06-27T00:00:00Z")},} },{$unwind: "$points"},{$project: { userId:1,points:1,tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z",date: "$time" } } } },groupTime: {$dateFromString: { dateString: "$tmp",format: "%Y:%m:%dT%H:%M:%SZ",} } } },{$group: { _id:{user_id:'$userId',cal_time:'$groupTime'},devTotal:{'$avg':'$points.total'},onlineTotal:{'$avg':'$points.onlineNum'},enableTotal:{'$avg':'$points.enableNum'} } },])
上述程式碼是按小時聚合資料,以下來逐步介紹處理思路:
(1) $match
根據小時聚合資料,因為只需要獲取近24小時的聚合結果,所以對資料進行初步篩選.
(2) $unwind
raw_dev_status中的裝置狀態是按照協議區分的陣列,因此需要對其進行展開,以便下一步進行篩選;
(3) $project
{$project: { userId:1,date: "$time" } } } }
選擇需要輸出的資料,分別為:userId,points
以及tmp.
需要注意,為了按照時間聚合,對$time屬性進行操作,提取%Y:%m:%dT%H時資訊至$tmp作為下一步的聚合依據.
如果需要按天聚合,則format資料可修改為:%Y:%m:%dT00:00:00Z
即可滿足要求.
(4) $project
{$project: { userId:1,} } } }
因為上一步project操作中,tmp為字串資料,最終的聚合結果需要時間戳(主要懶,不想在程式中進行轉換操作).
因此,此處對$tmp進行操作,轉換為時間型別資料,即groupTime.
(5) $group
對聚合結果進行分類操作,並生成最終輸出結果.
{$group: { # 根據_id進行分組操作,依據是`user_id`以及`$groupTime` _id:{user_id:'$userId',# 求裝置總數平均值 devTotal:{'$avg':'$points.total'},# 求裝置線上數平均值 onlineTotal:{'$avg':'$points.onlineNum'},# ... enableTotal:{'$avg':'$points.enableNum'} } }
程式碼編寫
此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.
/** * 建立聚合條件 * * @param pastTime 過去時間段 * @param dateToString 格式化字串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z) * @return 聚合條件 */ private AggregationPipeline createAggregationPipeline(Instant pastTime,String dateToString,String stringToDate) { Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class); return datastore.createAggregation(RawDevStatus.class) .match(query.field("time").greaterThanOrEq(pastTime)) .unwind("points",new UnwindOptions().preserveNullAndEmptyArrays(false)) .match(query.field("points.protocol").equal("ALL")) .project(Projection.projection("userId"),Projection.projection("points"),Projection.projection("convertTime",Projection.expression("$dateToString",new BasicDBObject("format",dateToString) .append("date","$time")) ) ) .project(Projection.projection("userId"),Projection.expression("$dateFromString",stringToDate) .append("dateString","$convertTime")) ) ) .group( Group.id(Group.grouping("userId"),Group.grouping("convertTime")),Group.grouping("total",Group.average("points.total")),Group.grouping("onlineNum",Group.average("points.onlineNum")),Group.grouping("enableNum",Group.average("points.enableNum")) ); } /** * 獲取聚合結果 * * @param pipeline 聚合條件 * @return 聚合結果 */ private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) { List<AggregationMidDevStatus> statuses = new ArrayList<>(); Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate( AggregationMidDevStatus.class,AggregationOptions.builder().allowDiskUse(true).build()); while (resultIterator.hasNext()) { statuses.add(resultIterator.next()); } return statuses; } //...................................................................................... // 獲取聚合結果(省略若干程式碼) AggregationPipeline pipeline = createAggregationPipeline(pastTime,dateToString,stringToDate); List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline); if (CollectionUtils.isEmpty(midStatuses)) { log.warn("Can not get dev status aggregation result."); return; }
總結
以上所述是小編給大家介紹的基於Morphia實現MongoDB按小時、按天聚合操作方法,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回覆大家的。在此也非常感謝大家對我們網站的支援!
如果你覺得本文對你有幫助,歡迎轉載,煩請註明出處,謝謝!