首页 > 解决方案 > 在 flink 中获取传入的字符串流并将它们放入数组中

问题描述

我正在发送JSON到一个 kafka 主题,并使用 flink 从该主题中读取 json,以便对流进行一些操作。我实现了流的 pojo 和一些操作,如 keyby 等。我使用 map 运算符将该 pojo 映射到另一个。这是代码:

DataStream<OutputPojo> finalobjectStream = integrityobjectStream
        // mappo l'input con un output che ha anche il campo count
        .map(new MapFunction<IntegrityPojo, OutputPojo>() {
            public OutputPojo map(IntegrityPojo input) throws Exception {
                OutputPojo output = new OutputPojo();
                output.severity = input.severity;
                output.file = input.file;
                output.agent_name = input.agent_name;
                output.comment = input.comment;
                output.hostname = input.hostname;
                output.logfile = input.logfile;
                output.Timestamp = input.Timestamp;
                output.count = 1;
                return output;
            }
        })
        // raggruppo per il campo comment
        .keyBy(((KeySelector<OutputPojo, String>) integrity -> integrity.comment))
        // definisco la finestra temporale
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
        // addiziono il campo count
        .sum("count")
        // filtero se count è maggiore di un certo valore
        .filter(new FilterFunction<OutputPojo>() {
            @Override
            public boolean filter(OutputPojo integrity) throws Exception {
                return integrity.count > 3;
            }
        });

我需要 output.file 字段作为传入字符串 input.file 的数组。我怎样才能做到这一点?

标签: javaapache-kafkaapache-flinkflink-streaming

解决方案


可以这样吗?

// map transformer in the new MapFunction
DataStream<List<String>> dataStream = inputStream.map(line-> {
    String[] fields = line.split(",");
    List<String> output = new ArrayList<>();
    for(String s:fields){
        output.add(s);
    }
    return output;
});         

推荐阅读