Error when passing parameters from a REST API to a Dataflow job - Cloud Functions

Problem description

Can anyone share Dataflow Python code that accepts parameters passed through a REST API? I am running into the problem below. My Dataflow code is as follows:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):

    parser = argparse.ArgumentParser()

    # Specifically we have the input file in CSV format to read and the output BQ table to write.
    # This is the final stage of the pipeline, where we define the destination

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://intient_output/measurementunit.csv')

    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to be written. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='mygcpdataengineerlab:intientpoc.measurementunit'
        )

    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)
    data_ingestion = DataIngestion()
    project = ''

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

The exception stack trace is below:

Error:
    response = request.execute()
  File "/env/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/env/local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/mygcpdataengineerlab/templates:launch?gcsPath=gs%3A%2F%2Fgcp_dataflow_csv_bq_code%2Ftemplates&alt=json returned "(9744cfd1809f74a): The workflow could not be created. Causes: (9744cfd1809fa2d): Found unexpected parameters: ['input' (perhaps you meant 'update'), 'output' (perhaps you meant 'job_port')]">
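The 400 error indicates that the staged template does not declare `input` and `output` as runtime parameters. A plain `argparse` argument is consumed when the template is *built*, so its value is baked in and nothing remains for the launch API to fill in. A minimal sketch of that build-time split (the flag values here are illustrative):

```python
import argparse

# Sketch: plain argparse arguments are resolved at template build time.
# parse_known_args() consumes --input/--output immediately; only the
# remaining flags are handed on to Beam as pipeline options.
parser = argparse.ArgumentParser()
parser.add_argument('--input', default='gs://intient_output/measurementunit.csv')
parser.add_argument('--output', default='mygcpdataengineerlab:intientpoc.measurementunit')

known_args, pipeline_args = parser.parse_known_args(
    ['--input', 'gs://bucket/file.csv', '--runner', 'DataflowRunner'])

print(known_args.input)   # fixed at build time, not at launch time
print(pipeline_args)      # ['--runner', 'DataflowRunner']
```

Because the values were already consumed during the build, a later `templates:launch` request that tries to supply `input`/`output` is rejected with "Found unexpected parameters".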

Tags: google-cloud-platform, google-cloud-functions, google-cloud-dataflow

Solution


I solved it by accepting the argument as a pipeline option:

class MyPipeOpt(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # add_value_provider_argument defers resolution to template run time,
        # so the launch API accepts this parameter.
        parser.add_value_provider_argument(
            '--input',
            help='Input file to read. This can be a local file or '
            'a file in a Google Storage Bucket.')

pipeline_options = PipelineOptions(argv)
my_options = pipeline_options.view_as(MyPipeOpt)

p = beam.Pipeline(options=pipeline_options)

Then use the value as my_options.input inside the Beam pipeline you build.
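With the template rebuilt on ValueProvider options (and `--output` declared the same way as `--input`), the Cloud Function can pass the values in the body of the `templates:launch` request. A minimal sketch of that request body; the job name is a placeholder, while the parameter values are taken from the question:

```python
# Sketch of the launch request body the Cloud Function would POST to
# https://dataflow.googleapis.com/v1b3/projects/mygcpdataengineerlab/templates:launch
# (jobName is a placeholder; parameter values come from the question above).
launch_body = {
    "jobName": "measurementunit-load",  # placeholder name
    "parameters": {
        # Accepted only because the template now declares --input/--output
        # as ValueProvider arguments; otherwise the service answers
        # 400 "Found unexpected parameters", as in the trace above.
        "input": "gs://intient_output/measurementunit.csv",
        "output": "mygcpdataengineerlab:intientpoc.measurementunit",
    },
}

print(launch_body["parameters"])
```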

Thanks,

