apache-beam - 如何在 Apache Beam 2.4 中替换 withFilenamePolicy?
问题描述
我正在尝试从 Kafka 源读取,按时间戳分区并使用 Apache Beam 2.4 写入 GCS。我想FilenamePolicy
为输出文件应用自定义。
根据我在 Stackoverflow 和谷歌上的发现,过去可以通过使用
.apply(TextIO.write()
.to("gs://somebucket/")
.withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(1));
该withFilenamePolicy
选项不再可用。它在 Beam 2.4 中是如何完成的?
我已经尝试使用文档中示例中的writeDynamic()
功能- 但我不明白为什么我的不被接受为输入:FileIO
TextIO
解决方案
withFilenamePolicy()
在 2.2 中被移除
您现在可以使用更简单的语法编写示例
pipeline.apply(Create.of(...))
.apply(TextIO.write()
.to(new PerWindowFiles("gs://somebucket/"))
.withTempDirectory(
FileBasedSink.convertToFileResourceIfPossible("gs://somebucket/tmp"))
.withWindowedWrites()
.withNumShards(1));
注意,使用自定义 FileNamePolicy 您还需要明确指定withTempDirectory
.
在您的第二个(屏幕截图)示例中,您使用的是默认值TextIO.sink()
,即 a FileIO.Sink<String>
to sink Event
s。您需要一个实例Sink<Event>
(它也将实现任何自定义文件命名)或像这样包装Event::getPayload
你Contextful
:
.apply(FileIO.<String, Event>writeDynamic()
.by(Event.getEventType)
.via(Contextful.fn(Event::getPayload))
推荐阅读
- java - 有没有办法使用“groupingBy”为嵌套结构中的多个元素收集地图?
- javascript - 谷歌饼图 - 图像作为标签
- azure - 使用 AzureML 通过数据存储上传数据时出现 403 错误
- python - TypeError:“_typedict”对象不可调用
- amazon-web-services - lambda 事件中的数据不是持久的
- postgresql - 将空别名转换为 PostgreSQL 中 2 个 SQL 联合的整数
- sql - 删除和创建后从数据库中恢复 SQL 表
- etw - ETW 中的对象句柄跟踪
- scala - scala中的(1到10乘3)toList和(1到10乘3).toList有什么区别(或intellij问题?)
- c# - 如何从 IHttpControllerSelector 返回状态码?