首页 > 解决方案 > 从 Dataflow Pipeline Options 获取独立的参数字符串

问题描述

我正在尝试从 Airflow 触发 Google Dataflow 作业,需要帮助从 Airflow 发送字符串作为参数,该参数可以在 Dataflow 中读取并用作独立字符串。

这是我的代码,DataflowTemplateOperator它发送名为的参数secretCode

        DataflowTemplateOperator(
            task_id=TASK_ID,
            job_name=JOB_NAME,
            template=TEMPLATE_PATH,
            parameters={
                "secretCode": "123456"
            },
            dag=dag
        )

我想secretCode从发送到以下内容中阅读PipelineOptions,但我不知道该怎么做。该代码与类的输入和输出没有任何关系。我只想将代码写入 BigQuery。StringParDoParDo

    val dataToTableRow: PCollection<TableRow> = myCustomDataStructure.apply(
        "transform my data to table row",
        ParDo.of(DataToTableRow())
    )

我想编写从PipelineOptionsBigQuery 返回的密码,如下面的代码所示,但我不知道如何到达那里:

   class DataToTableRow : DoFn<myCustomDataStructure, TableRow>() { 
      @ProcessElement
      fun processElement(@Element myData: myCustomDataStructure, outputReceiver: OutputReceiver<TableRow>) {
          outputReceiver.output(getTableRow(myData))
      }

      private fun getTableRow(myData: myCustomDataStructure): TableRow {
          return TableRow().set("ID", myData.id)
                           .set("SecretCode", secretCode)
      }
   }

我将不胜感激有关如何解决此问题的帮助。提前致谢。

标签: javakotlingoogle-cloud-dataflowapache-beam

解决方案


您需要创建自己的接口来扩展 PipelineOptions 并在此处设置参数。

  public interface SecretOptions extends PipelineOptions {
    String getSecretCode();
    void setSecretCode(String secretCode);
  }

然后,像这样在 Pipeline 上注册您的接口:

PipelineOptionsFactory.register(SecretOptions.class);
SecretOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SecretOptions.class);

然后,您将能够使用管道上的任何位置访问您的参数options.getSecretCode();

有关文档的更多信息


推荐阅读