apache-kafka - 使用 Streaming File Sink 将排序计划数据写入文件
问题描述
我是 flink 程序员的新手。我想创建一个程序来从 kafka 获取数据并将数据写入文件中以供下游使用。
任何人都知道如何定义自定义滚动策略以实现以下点列表:
- 当部分文件状态为 Finished 时,我需要指定文件名,或者以其他方式实现文件命名模式,指定完成滚动策略。例如 FileName-[DateTime].txt
- 如何管理bucket..默认分配4个bucket。
- 当应用程序重新启动时,挂起的零件文件不会被重用,并且会创建新的零件文件。
- 还要尽量避免文件覆盖问题。如果实现了第一点,那么这种情况可能永远不会发生。
我正在使用下面的示例代码代码和 flink 流接收器链接。
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withOutputFileConfig(new OutputFileConfig("Eventlog-",".txt"))
.withBucketAssigner(new BasePathBucketAssigner<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(MINUTES.toMillis(15))
.withInactivityInterval(MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 5)
.build())
.build();
请在 flink 中提出任何更好的选择以实现上述目的。
解决方案
推荐阅读
- reactjs - React Context API - 在上下文更改时重新渲染组件而不将渲染包装在消费者中
- c++ - 为什么要使用 extern 关键字在命名空间范围内声明变量?
- javascript - 如何使用 createjs 为图像应用渐变颜色?
- javascript - JQuery:切换新创建的 DIV -> 链接到 $(this) 而不是 $(this).closest()
- python - getattr 返回字符 '>' 而不是 mehtod
- wolfram-mathematica - 有没有办法可以修复我的数据/代码以适应这个 NonLinearModelFit?
- html - Chrome框阴影不一致
- java - ArrayList 中操作 ++/-- 的参数无效
- swift - +/- 购物车视图控制器中的产品数量,每个按钮都位于 TableViewCell 上,默认数量为数组中的整数
- python - 更改 matplotlib 中图例中的标签格式