apache-kafka - 不正确的结果 Kstream-Kstream Join with asymmetric time window
问题描述
我有 2 个名为“警报”和“干预”的流,其中包含 JSON。如果连接了警报和干预,则它们将具有相同的键。我想联系他们以检测所有在 24 小时前未进行干预的警报。
但是这个程序不起作用,结果给了我所有的警报,就好像 24 小时前没有进行任何干预一样。我重新检查了我的数据集 5 次,有些警报在警报日期前不到 24 小时内完成了干预。
这张图片说明了情况:
在此处输入图片描述
所以我需要知道在警报之前是否有干预。
程序代码:
final KStream<String, JsonNode> alarm = ...;
final KStream<String, JsonNode> intervention = ...;
final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);
final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
return value != null;
}).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = null;
if (rightValue == null) {//No intervention before
try {
actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
+ "\"alarm\":" + leftValue.toString()
+ "}");
} catch (IOException ex) {
Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
}
return actualObj;
} else {
return null;
}
}, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));
final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
return value != null;
});
fraude.foreach((key, value) -> {
rl.println("Fraude=" + key + " => " + value);
System.out.println("Fraude=" + key + " => " + value);
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
streams.close();
rl.close();
el.close();
nfl.close();
}
}));
综上所述,我想检测红色矩形中的图案在此处输入图像描述
PS:我确保在报警记录之前发送干预记录
解决方案
M.Djx,
我认为现在 Kafka Streams 中的这个用例没有完美的解决方案,但我有一些想法可以让你更接近。我正准备在不久的将来提交一个 KIP 来解决这样的用例。
一点:与 KTable 不同,KStreams 不是变更日志,因此较新的事件不会用相同的键覆盖较旧的事件;它们只是共存于同一流中。我认为这就是为什么您foreach
使所有警报看起来都没有干预的原因;您会看到干预之前的中间加入事件。
例如:
LEFT RIGHT JOIN
a:1 a:(1,null)
a:X a:(1,X)
foreach
将在两个连接结果上调用,看起来好像缺少正确的值,而实际上只是有点晚了。
如果您在结果流上应用时间窗口,您将获得一个变更日志——较新的值将覆盖较旧的值。就像是:
joinedAI
.groupByKey()
.windowedBy(
TimeWindows
.of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
.until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
).reduce(
new Reducer<JsonNode>() {
@Override
public Long apply(final JsonNode value1, final JsonNode value2) {
return value2;
}
},
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
);
令人遗憾的是,这将生成一个具有正确语义的变更日志流,但您仍会看到中间值,因此您也不希望直接从该流触发任何操作(例如foreach
)。
您可以做的一件事是安排一项工作,每天一次,从昨天"alerts-without-interventions"
开始扫描窗口。您从窗口存储中获得的任何结果都将是该键的最新值。
我正在准备的 KIP 将提出一种方法,让您从窗口中过滤掉中间结果,这将让您将 foreach 附加到更改日志并让它仅在窗口的最终结果上触发。
或者,如果您的应用程序的数据不是太大,并且您不太担心边缘情况,您可以考虑使用 LinkedHashMap 或 Guava 缓存自己实现“窗口最终事件”语义。
我希望这有帮助。
推荐阅读
- nosql - 跨分区查询会破坏无限的 CosmosDB 水平可伸缩性吗?
- c# - 如何为下面的节点编写 xpath?
- java - @Transient 在 Spring 数据 jpa 中的自定义查询
- reactjs - 使用库反应组件的打字稿错误:TS2769
- python-3.8 - Python3 多处理池类卡在 join() 方法中
- vba - VBA PowerPoint - 如何选择特定幻灯片或按部分选择并导出到 MP4?
- angular - 将 Observable 中的值分配给 Angular 中的接口
- c++ - 在循环中打印出值,打印出的值不超过一定数量
- node.js - sequelize 是否在数据库中创建约束?
- c++ - 私有继承和虚函数