google-cloud-dataflow - 如何将自定义 MyPipelineOptions 传递给 Google Dataflow DoFN?
问题描述
当我尝试将 MyPipelineOptions 参数添加到我的 Google Dataflow DoFN中时,出现编译器错误:
java.lang.IllegalArgumentException:
com.xxx.MyProcessor,
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
@ProcessElement parseItem(PubsubMessage, MyPipelineOptions, OutputReceiver),
parameter of type MyPipelineOptions at index 1:
MyPipelineOptions is not a valid context parameter.
Should be one of [BoundedWindow, RestrictionTracker<?, ?>]
如果我将 MyPipelineOptions 更改为 PipelineOptions,错误就消失了,但是如果我尝试在我的函数中强制转换回 MyPipelineOptions,我会得到一个 ClassCastException,所以我猜这不是正确的方法......知道我如何传递我的自定义选项类到元素处理器?
下面是代码结构:
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
public interface MyPipelineOptions extends DataflowPipelineOptions {
...
}
public class MyProcessor extends DoFn<PubsubMessage, String> {
@ProcessElement
public void parseItem(@Element PubsubMessage message, MyPipelineOptions po, OutputReceiver<String> out) throws Exception {
...
}
}
请注意,文档仅显示了非自定义 PipelineOptions 的示例:
PipelineOptions:当前管道的 PipelineOptions 始终可以通过将其添加为参数在流程方法中访问:
.of(new DoFn<String, String>() {
public void processElement(
@Element String word, PipelineOptions options) {
}
})
解决方案
好的,发现问题。参数 PipelineOptions 是一个代理。为了正确获得它,我需要这样做:
public class MyProcessor extends DoFn<PubsubMessage, String> {
@ProcessElement
public void parseItem(
@Element PubsubMessage message,
PipelineOptions po,
OutputReceiver<String> out) throws Exception {
MyPipelineOptions opts = po.as(MyPipelineOptions.class);
...
}
}
}
推荐阅读
- python - 尽管将 Python multiprocessing.Lock 作为目标函数参数传递,但在并行化时为无
- amazon-web-services - AWS DMS 更改处理 - 批处理行为
- python - (Django) (Factory Boy) (Errno 13) 权限被拒绝
- zsh - 在 mac git 分支上安装 oh-my-zsh 后不显示,但在 .oh-my-zsh 之后它可以工作
- asp.net-core - IdentityServer4 - 错误:未知客户端或未启用:oauthClient
- sql - 顶点 Oracle Sql
- python - 使用 BeautifulSoup 进行追溯
- reporting-services - 如何正确地将表达式从 SSRS 转换为 Power BI
- html - 如何使父子div具有相同的响应高度
- python - 如何将带有列表和整数的列表添加到csv中?