java - Flink 将所有流元素保存在 HashMap 中
问题描述
我正在尝试在 Flink 作业的窗口函数中使用 HashMap。是否可以将所有并行运算符的所有元素存储在一个运算符的 HashMap 中?
public class SeewoUserWindowFunction implements WindowFunction<ObjectNode, LabelInfo, String, TimeWindow> {
private static final Logger logger = LoggerFactory.getLogger(SeewoUserWindowFunction.class);
@Override
public void apply(String s, TimeWindow timeWindow, Iterable<ObjectNode> iterable, Collector<LabelInfo> collector) throws Exception {
try {
HashMap<String, LabelInfo> result = new HashMap<>();
iterable.forEach(e -> {
String key = e.get("value").get("$tid").toString() + "/" + e.get("value").get("$code").toString();
if (result.containsKey(key)) {
result.put(key, result.get(key).update(e, timeWindow.getEnd()));
} else {
result.put(key, LabelInfo.of(e, timeWindow.getEnd()));
}
});
result.values().stream().forEach(labelInfo -> collector.collect(labelInfo));
} catch (Exception exception) {
logger.error("parse exception!", exception);
}
}
}
解决方案
您可以使用org.apache.flink.streaming.api.datastream.DataStream#windowAll
方法将所有元素收集到一个全局窗口中。
请参阅此文档。
推荐阅读
- jenkins - 带有 ansicolor 的 otput 中的 Jenkins 错误 script.sh
- django - Django多个重复下拉列表
- c# - Dotnet 核心控制台应用程序 slack 机器人记录器
- gatsby - Gatsby 不会在 Markdown 中渲染图像,但其他一切都可以
- spring-boot - Assertj-Swagger PropertyType 不匹配
- node.js - 在重置密码时,如何避免来自 Feathersjs 的错误 400?
- asp.net-core - asp.net core 3.1 web api的解决方案是什么
- c# - 在 MVC 中检查 ViewBag 的布尔值
- paypal - 如何使用 PayPal Invoicing webhook 上线?
- reactjs - 如何使用 jest 测试反应组件的标题?