java - 在 flink 中获取传入的字符串流并将它们放入数组中
问题描述
我正在发送JSON
到一个 kafka 主题,并使用 flink 从该主题中读取 json,以便对流进行一些操作。我实现了流的 pojo 和一些操作,如 keyby 等。我使用 map 运算符将该 pojo 映射到另一个。这是代码:
DataStream<OutputPojo> finalobjectStream = integrityobjectStream
// mappo l'input con un output che ha anche il campo count
.map(new MapFunction<IntegrityPojo, OutputPojo>() {
public OutputPojo map(IntegrityPojo input) throws Exception {
OutputPojo output = new OutputPojo();
output.severity = input.severity;
output.file = input.file;
output.agent_name = input.agent_name;
output.comment = input.comment;
output.hostname = input.hostname;
output.logfile = input.logfile;
output.Timestamp = input.Timestamp;
output.count = 1;
return output;
}
})
// raggruppo per il campo comment
.keyBy(((KeySelector<OutputPojo, String>) integrity -> integrity.comment))
// definisco la finestra temporale
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
// addiziono il campo count
.sum("count")
// filtero se count è maggiore di un certo valore
.filter(new FilterFunction<OutputPojo>() {
@Override
public boolean filter(OutputPojo integrity) throws Exception {
return integrity.count > 3;
}
});
我需要 output.file 字段作为传入字符串 input.file 的数组。我怎样才能做到这一点?
解决方案
可以这样吗?
// map transformer in the new MapFunction
DataStream<List<String>> dataStream = inputStream.map(line-> {
String[] fields = line.split(",");
List<String> output = new ArrayList<>();
for(String s:fields){
output.add(s);
}
return output;
});
推荐阅读
- node.js - 如何使用nodejs将结果保存在findOne()Mongoose的变量中
- c# - .Net Core [Authorize] - 或代替 And 进行权限测试
- python - tf.device 的使用
- javascript - 我们什么时候必须在 ES6 箭头函数中使用 () 和 {}
- swift - 有没有办法可以将 GeoFireStore 查询与普通 Firestore 查询结合起来?
- angular - 使用 Angular 6 拦截器刷新令牌
- javascript - 如何使用 jQuery 为带有标题的图像链接创建搜索脚本
- javascript - 检测操作系统并根据该信息更改按钮的 URL
- php - 如何在 question.php 的第 11 行检查值是否返回 true?
- python - 如何将 4 个子张量交错分配给更大的张量?