首页 > 解决方案 > 如何将自定义 MyPipelineOptions 传递给 Google Dataflow DoFN?

问题描述

当我尝试将 MyPipelineOptions 参数添加到我的 Google Dataflow DoFN时,出现编译器错误:

java.lang.IllegalArgumentException:
com.xxx.MyProcessor,
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
parameter of type MyPipelineOptions at index 1:
MyPipelineOptions is not a valid context parameter.
Should be one of [BoundedWindow, RestrictionTracker<?, ?>]

如果我将 MyPipelineOptions 更改为 PipelineOptions,错误就消失了,但是如果我尝试在我的函数中强制转换回 MyPipelineOptions,我会得到一个 ClassCastException,所以我猜这不是正确的方法......知道我如何传递我的自定义选项类到元素处理器?

下面是代码结构:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

public interface MyPipelineOptions extends DataflowPipelineOptions {
  ...
}
public class MyProcessor extends DoFn<PubsubMessage, String> {
  @ProcessElement
  public void parseItem(@Element PubsubMessage message, MyPipelineOptions po, OutputReceiver<String> out) throws Exception {
    ...
  }
}

请注意,文档仅显示了非自定义 PipelineOptions 的示例:

PipelineOptions:当前管道的 PipelineOptions 始终可以通过将其添加为参数在流程方法中访问:

.of(new DoFn<String, String>() {
  public void processElement(
    @Element String word, PipelineOptions options) {

  }
 })

标签: google-cloud-dataflowapache-beam

解决方案


好的,发现问题。参数 PipelineOptions 是一个代理。为了正确获得它,我需要这样做:

public class MyProcessor extends DoFn<PubsubMessage, String> {
  @ProcessElement
  public void parseItem(
    @Element PubsubMessage message,
    PipelineOptions po,
    OutputReceiver<String> out) throws Exception {

      MyPipelineOptions opts = po.as(MyPipelineOptions.class);
      ...
    }
  }
}

推荐阅读