java - 从 Dataflow Pipeline Options 获取独立的参数字符串
问题描述
我正在尝试从 Airflow 触发 Google Dataflow 作业,需要帮助从 Airflow 发送字符串作为参数,该参数可以在 Dataflow 中读取并用作独立字符串。
这是我的代码,DataflowTemplateOperator
它发送名为的参数secretCode
:
DataflowTemplateOperator(
task_id=TASK_ID,
job_name=JOB_NAME,
template=TEMPLATE_PATH,
parameters={
"secretCode": "123456"
},
dag=dag
)
我想secretCode
从发送到以下内容中阅读PipelineOptions
,但我不知道该怎么做。该代码与类的输入和输出没有任何关系。我只想将代码写入 BigQuery。String
ParDo
ParDo
val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
"transform my data to table row",
ParDo.of(DataToTableRow())
)
我想编写从PipelineOptions
BigQuery 返回的密码,如下面的代码所示,但我不知道如何到达那里:
class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() {
@ProcessElement
fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
outputReceiver.output(getTableRow(myData))
}
private fun getTableRow(myData: myCustomDataStructure): TableRow {
return TableRow().set("ID", myData.id)
.set("SecretCode", secretCode)
}
}
我将不胜感激有关如何解决此问题的帮助。提前致谢。
解决方案
您需要创建自己的接口来扩展 PipelineOptions 并在此处设置参数。
public interface SecretOptions extends PipelineOptions {
String getSecretCode();
void setSecretCode(String secretCode);
}
然后,像这样在 Pipeline 上注册您的接口:
PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(SecretOptions.class);
然后,您将能够使用管道上的任何位置访问您的参数options.getSecretCode();
有关文档的更多信息
推荐阅读
- java - 多线程终止线程并允许访问其他线程
- css - 如何在 Bootstrap Vue 中更改箭头颜色?
- sql - Postgtres:考虑小时/分钟计算浮动天数
- r - 语法 %||% 的含义是什么?
- c++ - 抑制 LeakSanitizer 输出
- android - 检测已卸载的应用程序:如何向许多设备发送 Fcm 请求以了解其中一个已从 Firebase 未注册?
- amazon-web-services - 在 Node 中查找与已知公共 IP 地址关联的 EC2 资源 ID
- python - 出现错误:AttributeError: 'Node' object has no attribute 'output_masks' at flatten layer 用于展平嵌入输出
- c# - EFCore 跟踪记录状态:IsNew/IsLoaded
- node.js - NodeJs/Express Puppeteer 启动本地视图