首页 > 解决方案 > 如何使用kafka和faust检查是否在给定时间段内发送了新记录

问题描述

我正在使用包含融合平台(docker)的测试设置,并且正在处理包含以下信息的记录:传感器 ID、时间戳、值。使用 robinhood 的浮士德(类似于 Kafka Streams 但在 python 中)我正在尝试执行以下操作:

每当有传感器的新记录时,都应该有一个“计时器”,如果在给定时间内没有收到该传感器 ID 的新记录,则应该有一个错误,表明该传感器/机器可能出现故障。

我尝试过使用time.sleep(),但发生的是它只会休眠 10 秒,然后处理下一条记录。

甚至可以用我正在使用的设置做这样的事情吗?

标签: apache-kafkaapache-kafka-streamsstream-processingfaust

解决方案


您可以使用KSQL 的窗口翻滚

创建传感器信息流;

CREATE STREAM sensorinformation \
  (sensorid VARCHAR, \
   sensortimestamp BIGINT, \
   value VARCHAR) \
 WITH (KAFKA_TOPIC='sensorinformationtopic', \
       VALUE_FORMAT='DELIMITED', \
       KEY='sensorid', \
       TIMESTAMP='sensortimestamp');

最后创建一个表,其中包含在 10 秒的时间窗口内仅出现一次的故障传感器:

CREATE TABLE faulty_sensors AS \
  SELECT sensorid, \
         count(*) \
  FROM sensorinformation \
  WINDOW TUMBLING (SIZE 10 SECONDS) \
  GROUP BY sensorid \
  HAVING count(*) = 1;

推荐阅读