apache-flink - 检查我是否使用所有键正确接收流
问题描述
我有以下场景:假设有 20 个传感器正在向我发送流式馈送。我对流应用 keyBy (sensorID) 并执行一些操作,例如平均等。这是实现的,并且运行良好(使用 Flink Java API)。
最初一切顺利,所有传感器都在向我发送信息。一段时间后,可能有几个传感器开始出现异常,我开始从它们那里得到不规则的信息,例如,我从 18 个传感器接收到信息,但有 2 个长时间不向我发送信息。
我们可以假设我已经知道 sensorId 的固定列表(可能是硬编码/或在数据库中)。如何确定哪两个没有发送提要?我在哪里可以获得 keyId 的列表以与数据库中的列表进行比较?
如果我没有收到提要,我想发出警报(例如,2 分钟、5 分钟、10 分钟等,优先级越来越高)。
有没有人使用 flink-streaming / 模式实现了这样的场景?请有任何建议。
解决方案
我只是碰巧有一个这种模式的例子。它需要一些调整以适合您的用例,但应该可以帮助您入门。
public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {
private ValueState<Long> lastModifiedState;
static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes
@Override
public void open(Configuration parameters) throws Exception {
// register our state with the state backend
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
}
@Override
public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
// update our state and timer
Long current = lastModifiedState.value();
if (current != null) {
ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
}
current = max(current, event.timestamp());
lastModifiedState.update(current);
ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// emit alert
String deviceId = ctx.getCurrentKey();
out.collect(deviceId);
}
}
这假设一个主程序执行如下操作:
DataStream<String> result = stream
.assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
.keyBy(e -> e.deviceId)
.process(new TimeoutFunction());
正如@Dominik 所说,这只会对至少出现一次的键发出警报。您可以通过引入事件的辅助源来解决此问题,该事件源为应该存在的每个源创建一个人工事件,并将该流与主要源合并。
推荐阅读
- r - R中的双重拆分数据集
- ms-access - Microsoft Access:手动更改 3 个关系中作为主键的文本字段的大小
- asp.net-core - 如何使用 AuthenticationSchemeOptions.Events 或 AuthenticationSchemeOptionsEventsType?
- python - 如何在函数内部打印函数的__doc__字符串而不指定函数名
- mongodb - 如何使用 Golang 准备 mongodb 查询?
- json - 函数内部动态查询的 format() 有多安全?
- javascript - 如何同时发送两条消息?
- python - 如何将输出转换为python中的列表?
- c - 赋值从指针生成整数而没有 C 中的强制转换 [-Wint-conversion]
- node.js - 如何设置跨域 JWT?