apache-kafka - 如何使用kafka和faust检查是否在给定时间段内发送了新记录
问题描述
我正在使用包含融合平台(docker)的测试设置,并且正在处理包含以下信息的记录:传感器 ID、时间戳、值。使用 robinhood 的浮士德(类似于 Kafka Streams 但在 python 中)我正在尝试执行以下操作:
每当有传感器的新记录时,都应该有一个“计时器”,如果在给定时间内没有收到该传感器 ID 的新记录,则应该有一个错误,表明该传感器/机器可能出现故障。
我尝试过使用time.sleep()
,但发生的是它只会休眠 10 秒,然后处理下一条记录。
甚至可以用我正在使用的设置做这样的事情吗?
解决方案
您可以使用KSQL 的窗口翻滚:
创建传感器信息流;
CREATE STREAM sensorinformation \
(sensorid VARCHAR, \
sensortimestamp BIGINT, \
value VARCHAR) \
WITH (KAFKA_TOPIC='sensorinformationtopic', \
VALUE_FORMAT='DELIMITED', \
KEY='sensorid', \
TIMESTAMP='sensortimestamp');
最后创建一个表,其中包含在 10 秒的时间窗口内仅出现一次的故障传感器:
CREATE TABLE faulty_sensors AS \
SELECT sensorid, \
count(*) \
FROM sensorinformation \
WINDOW TUMBLING (SIZE 10 SECONDS) \
GROUP BY sensorid \
HAVING count(*) = 1;
推荐阅读
- python - 如何获取 matplotlib 中所有标记的列表?
- python - 使用机械汤输入后显示为输入选项 - Python
- azure-active-directory - 将所有者添加到 Graph 中的组时未收到电子邮件
- django - 修复错误:django.db.utils.OperationalError: no such table: auth_group
- docker - 无法在 Mac OS 主机上的两个 Docker 容器之间发送/接收 UDP 通信
- c# - 为什么 C# 表单应用程序中的 Firebase 不是实时的?
- java - 实现与firebase一起使用的倒数计时器
- batch-file - 批处理文件到当前目录的相对路径
- android - Android Studio(未安装),当在机器上安装 Android Studio 时运行颤振医生
- android - 覆盖、调整大小和重新定位图像