apache-kafka - 如何确保 Kafka 流正在聚合当天的数据
问题描述
我有一个流应用程序,可以计算传入交易的高/低/交易量。我有一个包含交易消息的输入主题。没有与这些消息关联的时间戳。我从输入流创建一个 KGroupedStream 并按股票代码作为键对其进行分组
然后我从 KGroupedStream 创建一个 KTable。我聚合数据并计算高/低/音量并创建一条名为 HiLowMessage 的新消息并将其流式传输到我的输出流。
由于输入主题总是有数据。如何确保仅对今天的数据进行聚合,而不包括昨天的数据?请注意,输入主题消息结构中没有时间戳。
解决方案
每个 Kafka 消息的元数据字段中都有一个时间戳(即,除了键和值之外)。此时间戳通常由将数据写入主题的上游生产者设置。默认情况下,此记录元数据时间戳在 Kafka Streams 中使用。因此,您可以使用 1-day hopping 进行窗口聚合TimeWindow
。
推荐阅读
- python-3.x - 开发用于下载和上传数据到 XNAT 的 Python Rest API
- database - 实现三种类型的用户 - Laravel
- python - TemplateDoesNotExist 用于 detail.html,但适用于 index.html(Django 教程第 3 部分)
- javascript - Discord.js PM 仅适用于 1 人
- python - Django-Q 和 request.POST 数据
- android - 带有 OnConflictStrategy.REPLACE 的 Android Room 自动时间戳
- oauth-2.0 - Azure AD 仅生成用于 Microsoft Graph 的无效访问令牌
- amazon-web-services - 用于内部 ALB 的 AWS Synthetics Canary
- azure-devops - 关闭新的 ADO 格式
- docker - 未找到 nlog.config- 文件 .Net core 3.1 和 Docker