首页 > 解决方案 > 使用 Streaming File Sink 将排序计划数据写入文件

问题描述

我是 flink 程序员的新手。我想创建一个程序来从 kafka 获取数据并将数据写入文件中以供下游使用。

任何人都知道如何定义自定义滚动策略以实现以下点列表:

  1. 当部分文件状态为 Finished 时,我需要指定文件名,或者以其他方式实现文件命名模式,指定完成滚动策略。例如 FileName-[DateTime].txt
  2. 如何管理bucket..默认分配4个bucket。
  3. 当应用程序重新启动时,挂起的零件文件不会被重用,并且会创建新的零件文件。
  4. 还要尽量避免文件覆盖问题。如果实现了第一点,那么这种情况可能永远不会发生。

我正在使用下面的示例代码代码和 flink 流接收器链接。

final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))                
                .withOutputFileConfig(new OutputFileConfig("Eventlog-",".txt"))
                .withBucketAssigner(new BasePathBucketAssigner<>())                
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(MINUTES.toMillis(15))
                                .withInactivityInterval(MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 5)
                                .build())
                .build();

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#row-encoded-formats

请在 flink 中提出任何更好的选择以实现上述目的。

标签: apache-kafkaapache-flinkflink-streaming

解决方案


推荐阅读