publish-subscribe - 用于计数 pub sub pub/sub 中的流数据的异步或同步拉取?
问题描述
我想计算过去一小时内的消息数量(最后一小时指的是消息数据中的时间戳字段)。
我目前有一个代码可以同步计算消息(我正在使用 Google Cloud Pub/Sub Synchronous pull),但我注意到这需要很长时间。
我的代码将按预定义的次数(我将其设置为 100 次以上)重复轮询订阅,以便我确信在最后一小时内没有更多消息出现乱序。
这不是一个可接受的设计,因为这意味着用户必须等待 5 到 10 分钟才能让服务在他们想要指标时对消息进行计数!
Pub Sub设计中是否有解决此类问题的最佳实践?
这似乎是一个要解决的简单问题(计算上一个 X 时间范围内的事件数),所以我认为可能存在。
异步设计会有帮助吗?异步设计如何工作?我不太确定异步和 Pythonfuture
概念(我正在使用 GCP Pub/Sub 的 Python 客户端库)。
解决方案
我会尝试以不同的方式捕捉信息。我的解决方案基于日志记录和 BigQuery。这个想法是写一个日志,例如message received with timestamp xxxxx
,过滤这个日志模式并将结果接收到 BigQuery 中。
然后,当用户询问时,您只需请求 BigQuery 并在所需的时间段内计算消息。您还可以更改时间框架,拥有历史,...
为了写这个日志,2个解决方案
- 更便宜但不是真正推荐的,消耗消息的进程将其与它一起处理。但是,您依赖于外部服务。这个服务有两个职责:它的工作和这个日志(用于度量)。不是固体。也许它可以是发布者的角色,带有这样的日志:
message published at XXXX
. 然而,这意味着所有发布者或所有订阅者都在 GCP 上。 - 更好的是插入一个函数,更便宜(128Mb 的内存)来简单地处理消息和写入日志。
推荐阅读
- google-apps-script - 通过 Google 表格中的单元格内容控制文本大小
- javascript - 2个javascript语句的范围问题
- python - Pandas groupby、bin 和 average
- pyspark - 将字符串(带时间戳)转换为 pyspark 中的时间戳
- python - 如何获取 set_index 第一个值
- flutter - 我想添加一个带有颤动的页面收藏夹按钮,即使在本地上下文中也可以保存
- flutter - ChangeNotifier 包装类围绕模型的每个实例
- mongodb - MongoDB:嵌套 $group 和 $push 但不是获取数组而是获取文档
- mongoose - 如何搜索地图类型
- apache-spark - 如何在 PySpark 应用程序中设置纱线应用程序 ID