首页 > 解决方案 > 在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道

问题描述

我正在尝试在 Google Data Flow 上的 Spring Boot 项目中运行 Apache Beam 管道,但我一直遇到此错误Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions

我尝试运行的示例是官方文档https://beam.apache.org/get-started/wordcount-example/提供的基本字数。问题是这个例子为每个例子使用了不同的类,每个例子都有自己的主要功能,但我想做的是在一个spring boot项目中运行这个例子,它有一个实现CommandLineRunner的类。

Spring Boot 主类:

 @SpringBootApplication
  public class BeamApplication {
public static void main(String[] args) {
    SpringApplication.run(BeamApplication.class, args);
}}  

命令行运行器:

@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {

    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    runWordCount(options);
}

static void runWordCount(WordCountOptions options) throws InterruptedException {

    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
            .apply(new CountWords())
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
}}

字数选项:

public interface WordCountOptions extends PipelineOptions {

@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);

@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}

摘录词:

public class ExtractWordsFn extends DoFn<String, String> {
   public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

@ProcessElement
public void processElement(ProcessContext c) {
    for (String word : c.element().split(TOKENIZER_PATTERN)) {
        if (!word.isEmpty()) {
            c.output(word);
        }}}}

数词:

public  class CountWords extends    PTransform<PCollection<String>,PCollection<KV<String, Long>>> {

@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
            ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
            words.apply(Count.perElement());

    return wordCounts;
}}

当我使用 Direct 运行器时,项目按预期工作并在项目的根目录中生成文件,但是当我尝试通过传递这些参数来使用 Google Data Flow--runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output运行器时(使用 java -jar 或 Intellij 时)。我收到了帖子开头提到的错误。

我正在使用 Java 11,在查看Failed to construction instance from factory method DataflowRunner#fromOptions in beamSql, apache beam 之后, 我尝试将我的代码放入一个新的 Java 8 Spring boot 项目中,但错误仍然相同。

运行 Apache 梁文档(具有不同主干的类)提供的项目时,它在 Google 数据流上运行良好,我可以在 Google 存储桶中看到生成的输出。我的WordCountOptions界面和官方文档提供的一样。

问题可能是由 引起的CommandLineRunner吗?我虽然应用程序没有收到参数,但是当我调试这一行时,

WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class); 

该变量options具有正确的值,即--runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output

编辑:

我发现错误的原因是 gcloud 身份验证和访问 Google Cloud Bucket ( Anonymous caller does not have storage.buckets.list access to project 961543751) 的问题。我仔细检查了访问权限,它应该设置正确,因为它在 Beam 示例默认项目上运行良好。我撤销了所有访问权限并重新设置,但问题仍然存在。我看了这些 https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-ruby/issues/1588,我还在试图找出问题,但现在它似乎是一个依赖版本问题。

标签: javamavenspring-bootgoogle-cloud-dataflowapache-beam

解决方案


推荐阅读