streaming - 如何为 Apache Flink 中的每个输入生成输出文件
问题描述
我正在使用 Flink 来处理我的流数据。
流来自其他一些中间件,例如 Kafka、Pravega 等。
说 Pravega 正在发送一些字流,hello world my name is...
.
我需要的是三个步骤:
- 将每个单词映射到我的自定义类对象
MyJson
。 - 将对象映射
MyJson
到字符串。 - 将字符串写入文件:将一个字符串写入一个文件。
例如,对于流hello world my name is
,我应该得到五个文件。
这是我的代码:
// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// map MyJson to String
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
// output
valueInJson.print();
此代码会将所有结果输出到 Flink 日志文件。
我的问题是如何将一个单词写入一个输出文件?
解决方案
我认为最简单的方法是使用自定义接收器。
stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {
@Override
public void invoke(String value, Context context) {
// generate a unique name for the new file and open it
// write the word to the file
// close the file
}
}
请注意,此实现不一定提供仅一次的行为。您可能需要注意文件命名方案是唯一的和确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。
推荐阅读
- sql-server - 多个嵌套内连接:并非所有记录都显示
- react-native - 我可以使用 React Native 将样式应用于用户输入中的文本子集吗?
- jquery - Select2 在预设标签元素上启用编辑/键入
- python - 如何从 keras/tensorflow 中的顺序模型中获取 logits?
- anychart - 为父组分隔符设置全宽行颜色
- sql - SQL Server 检查约束可为空
- java - 在创建新对象时尝试为 JNI 对象的构造函数提供参数时出错
- performance - Haskell 程序运行很慢
- lua - 试图理解自定义迭代器
- postgresql - PostgreSQL 中的数组和 IN 运算符