首页 > 解决方案 > 如何在 Apache Beam 2.4 中替换 withFilenamePolicy?

问题描述

我正在尝试从 Kafka 源读取,按时间戳分区并使用 Apache Beam 2.4 写入 GCS。我想FilenamePolicy为输出文件应用自定义。

根据我在 Stackoverflow 和谷歌上的发现,过去可以通过使用

.apply(TextIO.write()
                    .to("gs://somebucket/")
                    .withFilenamePolicy(new PerWindowFiles(prefix))
                    .withWindowedWrites()
                    .withNumShards(1));

withFilenamePolicy选项不再可用。它在 Beam 2.4 中是如何完成的?

我已经尝试使用文档中示例中的writeDynamic()功能- 但我不明白为什么我的不被接受为输入:FileIOTextIO

在此处输入图像描述

标签: apache-beam

解决方案


withFilenamePolicy()在 2.2 中被移除

您现在可以使用更简单的语法编写示例

pipeline.apply(Create.of(...))
  .apply(TextIO.write()
    .to(new PerWindowFiles("gs://somebucket/"))
    .withTempDirectory(
        FileBasedSink.convertToFileResourceIfPossible("gs://somebucket/tmp"))
    .withWindowedWrites()
    .withNumShards(1));

注意,使用自定义 FileNamePolicy 您还需要明确指定withTempDirectory.

在您的第二个(屏幕截图)示例中,您使用的是默认值TextIO.sink(),即 a FileIO.Sink<String>to sink Events。您需要一个实例Sink<Event>(它也将实现任何自定义文件命名)或像这样包装Event::getPayloadContextful

.apply(FileIO.<String, Event>writeDynamic()
  .by(Event.getEventType)
  .via(Contextful.fn(Event::getPayload))

推荐阅读