java - 在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道
问题描述
我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道,但我一直遇到此错误Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions
我尝试运行的示例是官方文档https://beam.apache.org/get-started/wordcount-example/提供的基本字数。问题是这个例子为每个例子使用了不同的类,每个例子都有自己的主要功能,但我想做的是在一个spring boot项目中运行这个例子,它有一个实现CommandLineRunner的类。
Spring Boot 主类:
@SpringBootApplication
public class BeamApplication {
public static void main(String[] args) {
SpringApplication.run(BeamApplication.class, args);
}}
命令行运行器:
@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
runWordCount(options);
}
static void runWordCount(WordCountOptions options) throws InterruptedException {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}}
字数选项:
public interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);
@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}
摘录词:
public class ExtractWordsFn extends DoFn<String, String> {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}}}}
数词:
public class CountWords extends PTransform<PCollection<String>,PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.perElement());
return wordCounts;
}}
当我使用 Direct 运行器时,项目按预期工作并在项目的根目录中生成文件,但是当我尝试通过传递这些参数来使用 Google Data Flow--runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
运行器时(使用 java -jar 或 Intellij 时)。我收到了帖子开头提到的错误。
我正在使用 Java 11,在查看Failed to construction instance from factory method DataflowRunner#fromOptions in beamSql, apache beam 之后, 我尝试将我的代码放入一个新的 Java 8 Spring boot 项目中,但错误仍然相同。
运行 Apache 梁文档(具有不同主干的类)提供的项目时,它在 Google 数据流上运行良好,我可以在 Google 存储桶中看到生成的输出。我的WordCountOptions
界面和官方文档提供的一样。
问题可能是由 引起的CommandLineRunner
吗?我虽然应用程序没有收到参数,但是当我调试这一行时,
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
该变量options
具有正确的值,即--runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
。
编辑:
我发现错误的原因是 gcloud 身份验证和访问 Google Cloud Bucket ( Anonymous caller does not have storage.buckets.list access to project 961543751
) 的问题。我仔细检查了访问权限,它应该设置正确,因为它在 Beam 示例默认项目上运行良好。我撤销了所有访问权限并重新设置,但问题仍然存在。我看了这些
https://github.com/googleapis/google-cloud-node/issues/2456
https://github.com/googleapis/google-cloud-ruby/issues/1588,我还在试图找出问题,但现在它似乎是一个依赖版本问题。
解决方案
推荐阅读
- node.js - 如何下载使用 telegraf 模块发送到我的电报机器人的文件或照片?
- qt - 为什么 QGridLayout setRowStretch 不兑现
- r - 在ggplot中将线分配给第二个y轴
- intellij-idea - Intellij Idea 查找项目中所有已弃用的用法
- angular6 - 导航前隐藏烤面包机通知
- facebook - facebook 观众网络总是质量检查失败
- yocto - 如何清洁、释放空间?
- c# - 更改详细信息视图的数据源c#
- python-3.x - 如何在 python 中打开预训练模型
- amazon-web-services - 为 HTTP 和 HTTPS 打开不同的站点