Apache Beam TextIO writer not writing an unbounded source to files

Problem description

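The following code runs without any errors on Beam's direct runner (DirectRunner). It consumes the SQS messages, but the messages are never written to the destination.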


Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS).withValidation().as(Options.class);

BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("your-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); <- fetches the secret from GCP, but replaced with inline Auth
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
        .apply("Get message contents", ParDo.of(new SqsMessageToJson()))
        .apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
        .apply("Create Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply("Write to GCS", TextIO.write()
            .withWindowedWrites()
            .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
            .to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
                options.getShardTemplate(),
                options.getOutputFilenameSuffix())
                .withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
            .withNumShards(options.getNumShards()));
PipelineResult run = pipeline.run();
run.waitUntilFinish();
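The write above uses 10-second fixed windows together with withWindowedWrites(). Under Beam's default trigger, a window's pane, and therefore the files written for it, is only produced once the input watermark passes the end of that window. If the watermark does not advance, elements can be read and printed while nothing reaches the destination. The sketch below is a toy model in plain Java (no Beam dependency) that illustrates only that behaviour. The class FixedWindowModel and its methods are hypothetical and are not a Beam API; it is a simplified illustration, not Beam's implementation, and it does not establish the actual cause of the problem described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model (NOT Beam's implementation) of fixed event-time
 * windows whose output is released only when the watermark passes the window end.
 */
public class FixedWindowModel {
    private final long sizeMillis;
    // Buffered elements keyed by window start, sorted so windows fire in order.
    private final TreeMap<Long, List<String>> open = new TreeMap<>();
    // Windows that have fired, in firing order (stands in for "files written").
    private final List<String> emitted = new ArrayList<>();

    public FixedWindowModel(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    /** Start of the fixed window (offset 0) that contains the given timestamp. */
    public long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Buffer an element in the window its timestamp falls into. */
    public void add(String element, long timestampMillis) {
        open.computeIfAbsent(windowStart(timestampMillis), k -> new ArrayList<>()).add(element);
    }

    /** Advance the watermark; every window [start, end) with end <= watermark fires. */
    public void advanceWatermark(long watermarkMillis) {
        while (!open.isEmpty()) {
            Map.Entry<Long, List<String>> first = open.firstEntry();
            long end = first.getKey() + sizeMillis;
            if (end > watermarkMillis) {
                break; // this window (and all later ones) is still incomplete
            }
            emitted.add("[" + first.getKey() + "," + end + ") " + first.getValue());
            open.pollFirstEntry();
        }
    }

    public List<String> emitted() { return emitted; }

    public int pendingWindows() { return open.size(); }

    public static void main(String[] args) {
        FixedWindowModel model = new FixedWindowModel(10_000); // 10-second windows, as in the question
        model.add("msg-a", 1_000);
        model.add("msg-b", 12_000);
        model.advanceWatermark(5_000);          // watermark still inside the first window
        System.out.println(model.emitted());    // prints []
        model.advanceWatermark(10_000);         // first window [0,10000) is now complete
        System.out.println(model.emitted());    // prints [[0,10000) [msg-a]]
    }
}
```

In this model, elements keep accumulating in pendingWindows() for as long as the watermark stays put, which mirrors the symptom of messages being consumed while no output appears.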

Tags: java, apache-beam, apache-beam-io

Solution


This appears to be an issue in Apache Beam itself. We have opened a support ticket with Google to investigate it.

