首页 > 解决方案 > Flink 将所有流元素保存在 HashMap 中

问题描述

我正在尝试在 Flink 作业的窗口函数中使用 HashMap。是否可以将所有并行运算符的所有元素存储在一个运算符的 HashMap 中?

public class SeewoUserWindowFunction implements WindowFunction<ObjectNode, LabelInfo, String, TimeWindow> {

    private static final Logger logger = LoggerFactory.getLogger(SeewoUserWindowFunction.class);
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<ObjectNode> iterable, Collector<LabelInfo> collector) throws Exception {
        try {
            HashMap<String, LabelInfo> result = new HashMap<>();
            iterable.forEach(e -> {
                    String key = e.get("value").get("$tid").toString() + "/" + e.get("value").get("$code").toString();
                    if (result.containsKey(key)) {
                        result.put(key, result.get(key).update(e, timeWindow.getEnd()));
                    } else {
                        result.put(key, LabelInfo.of(e, timeWindow.getEnd()));
                    }
            });
            result.values().stream().forEach(labelInfo -> collector.collect(labelInfo));
        } catch (Exception exception) {
            logger.error("parse exception!", exception);
        }
    }
}

标签: javaparallel-processingapache-flink

解决方案


您可以使用org.apache.flink.streaming.api.datastream.DataStream#windowAll方法将所有元素收集到一个全局窗口中。
请参阅此文档


推荐阅读