首页 > 解决方案 > 如何为 Apache Flink 中的每个输入生成输出文件

问题描述

我正在使用 Flink 来处理我的流数据。

流来自其他一些中间件,例如 Kafka、Pravega 等。

说 Pravega 正在发送一些字流,hello world my name is....

我需要的是三个步骤:

  1. 将每个单词映射到我的自定义类对象MyJson
  2. 将对象映射MyJson到字符串。
  3. 将字符串写入文件:将一个字符串写入一个文件。

例如,对于流hello world my name is,我应该得到五个文件。

这是我的代码:

// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
        FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(adapter)
                .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
            .map(new MapFunction<String, MyJson>() {
                @Override
                public MyJson map(String s) throws Exception {
                    MyJson myJson = JSON.parseObject(s, MyJson.class);
                    return myJson;
                }
            });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
            .map(new MapFunction<MyJson, String>() {
                @Override
                public String map(MyJson myJson) throws Exception {
                    return myJson.toString();
                }
            });
// output
valueInJson.print();

此代码会将所有结果输出到 Flink 日志文件。

我的问题是如何将一个单词写入一个输出文件?

标签: streamingapache-flinkflink-streaming

解决方案


我认为最简单的方法是使用自定义接收器。

stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // generate a unique name for the new file and open it
        // write the word to the file
        // close the file
    }
}

请注意,此实现不一定提供仅一次的行为。您可能需要注意文件命名方案是唯一的和确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。


推荐阅读