java - Apache Beam Text IO writer 没有将无限源写入文件
问题描述
以下代码在光束直接流道中运行没有任何问题。使用 sqs 消息,但消息不会写入目标位置。
Options options = PipelineOptionsFactory.fromArgs(CONFIG_STREAMING_SQS_GCS).withValidation().as(Options.class);
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("you-key", "your-secret");
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(basicAWSCredentials));
// AwsUtils.setupOptions(options); <- fetches the secret from GCP, but replaced with inline Auth
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read messages from Sqs", SqsIO.read().withQueueUrl(options.getInputQueueUrl()))
.apply("Get message contents", ParDo.of(new SqsMessageToJson()))
.apply("Print incoming", ParDo.of(new RowPrinter<>("Print incoming")))
.apply("Create Window",Window.into(FixedWindows.of(Duration.standardSeconds(10))))
.apply("Write to GCS", TextIO.write()
.withWindowedWrites()
.withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getDestinationBucketUrl()))
.to(new WindowedFilenamePolicy(options.getOutputFilenamePrefix(),
options.getShardTemplate(),
options.getOutputFilenameSuffix())
.withSubDirectoryPolicy(options.getSubDirectoryPolicy()))
.withNumShards(options.getNumShards()));
PipelineResult run = pipeline.run();
run.waitUntilFinish();
解决方案
看起来这是 Apache Beam 本身的问题。我们已经向谷歌打开了支持票来调查这个问题。
推荐阅读
- python - 如何将 self 参数传递给 python cProfile
- reactjs - 应用 find() 时获得 2 个渲染而不是 1 个渲染
- reactjs - React 构建文件不适用于共享主机
- r - r 中具有平均值和 sd 的频率表,每行有多个案例
- c# - WinForms txtTextbox_MouseEnter 触发表单上的所有文本框
- javascript - 如何删除字符串中任何语言的所有非字母字符?
- java - 评分栏未出现
- elasticsearch - 拒绝为具有文本/关键字数据类型的字段编制索引的数值 (ElaticSearch 6.2.3)
- google-cloud-firestore - firestore verifyId 令牌
- io - 压缩是线性操作吗?