MongoDB按照天数或小时聚合
需求
最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:Spring Boot
,MongoDB,Morphia
.
数据模型
@Data @Builder @Entity(value = "rawDevStatus", noClassnameStored = true) // 设备状态索引 @Indexes({ // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作) @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)), @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)}) }) public class RawDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; private String userId; private Instant time; @Embedded("points") List<Point> protocolPoints; @Data @AllArgsConstructor public static class Point { /** * 协议类型 */ private Protocol protocol; /** * 设备总数 */ private Integer total; /** * 设备在线数目 */ private Integer onlineNum; /** * 处于启用状态设备数目 */ private Integer enableNum; } }
上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.
@Data @Builder @Entity(value = "aggregationDevStatus", noClassnameStored = true) @Indexes({ @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)), @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)}) }) public class AggregationDevStatus { @Id @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; /** * 用户ID */ private String userId; /** * 设备总数 */ private Double total; /** * 设备在线数目 */ private Double onlineNum; /** * 处于启用状态设备数目 */ private Double enableNum; /** * 聚合类型(按照小时还是按照天聚合) */ @Property("aggDuration") private AggregationDuration aggregationDuration; private Instant time; /** * 动态设置文档过期时间 */ private Instant expireAt; }
上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.
聚合操作符介绍
聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.
此次聚合主要涉及以下操作:
"color: #ff0000">原始聚合语句
db.getCollection('raw_dev_status').aggregate([ {$match: { time:{$gte: ISODate("2019-06-27T00:00:00Z")}, } }, {$unwind: "$points"}, {$project: { userId:1,points:1, tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } } } }, {$project: { userId:1,points:1, groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } } } }, {$group: { _id:{user_id:'$userId', cal_time:'$groupTime'}, devTotal:{'$avg':'$points.total'}, onlineTotal:{'$avg':'$points.onlineNum'}, enableTotal:{'$avg':'$points.enableNum'} } }, ])
上述代码是按小时聚合数据,以下来逐步介绍处理思路:
(1) $match
根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.
(2) $unwind
raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;
(3) $project
{$project: { userId:1,points:1, tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } } } }
选择需要输出的数据,分别为:userId,points
以及tmp.
需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.
如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z
即可满足要求.
(4) $project
{$project: { userId:1,points:1, groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } } } }
因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.
(5) $group
对聚合结果进行分类操作,并生成最终输出结果.
{$group: { # 根据_id进行分组操作,依据是`user_id`以及`$groupTime` _id:{user_id:'$userId', cal_time:'$groupTime'}, # 求设备总数平均值 devTotal:{'$avg':'$points.total'}, # 求设备在线数平均值 onlineTotal:{'$avg':'$points.onlineNum'}, # ... enableTotal:{'$avg':'$points.enableNum'} } }
代码编写
此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.
/** * 创建聚合条件 * * @param pastTime 过去时间段 * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z) * @return 聚合条件 */ private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) { Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class); return datastore.createAggregation(RawDevStatus.class) .match(query.field("time").greaterThanOrEq(pastTime)) .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false)) .match(query.field("points.protocol").equal("ALL")) .project(Projection.projection("userId"), Projection.projection("points"), Projection.projection("convertTime", Projection.expression("$dateToString", new BasicDBObject("format", dateToString) .append("date", "$time")) ) ) .project(Projection.projection("userId"), Projection.projection("points"), Projection.projection("convertTime", Projection.expression("$dateFromString", new BasicDBObject("format", stringToDate) .append("dateString", "$convertTime")) ) ) .group( Group.id(Group.grouping("userId"), Group.grouping("convertTime")), Group.grouping("total", Group.average("points.total")), Group.grouping("onlineNum", Group.average("points.onlineNum")), Group.grouping("enableNum", Group.average("points.enableNum")) ); } /** * 获取聚合结果 * * @param pipeline 聚合条件 * @return 聚合结果 */ private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) { List<AggregationMidDevStatus> statuses = new ArrayList<>(); Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate( AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build()); while (resultIterator.hasNext()) { statuses.add(resultIterator.next()); } return statuses; } //...................................................................................... // 获取聚合结果(省略若干代码) AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate); List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline); if (CollectionUtils.isEmpty(midStatuses)) { log.warn("Can not get dev status aggregation result."); return; }
总结
以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 小骆驼-《草原狼2(蓝光CD)》[原抓WAV+CUE]
- 群星《欢迎来到我身边 电影原声专辑》[320K/MP3][105.02MB]
- 群星《欢迎来到我身边 电影原声专辑》[FLAC/分轨][480.9MB]
- 雷婷《梦里蓝天HQⅡ》 2023头版限量编号低速原抓[WAV+CUE][463M]
- 群星《2024好听新歌42》AI调整音效【WAV分轨】
- 王思雨-《思念陪着鸿雁飞》WAV
- 王思雨《喜马拉雅HQ》头版限量编号[WAV+CUE]
- 李健《无时无刻》[WAV+CUE][590M]
- 陈奕迅《酝酿》[WAV分轨][502M]
- 卓依婷《化蝶》2CD[WAV+CUE][1.1G]
- 群星《吉他王(黑胶CD)》[WAV+CUE]
- 齐秦《穿乐(穿越)》[WAV+CUE]
- 发烧珍品《数位CD音响测试-动向效果(九)》【WAV+CUE】
- 邝美云《邝美云精装歌集》[DSF][1.6G]
- 吕方《爱一回伤一回》[WAV+CUE][454M]