google-cloud-platform - 参数从 rest api 传递到数据流作业时出错 - 云函数
问题描述
任何人都可以分享数据流python代码来接受参数吗?通过rest API传递的参数我面临同样的问题。我的df代码如下:-
def run(argv=None):
parser = argparse.ArgumentParser()
# Specifically we have the input file in CSV format to read and the output BQ table to write.
# This is the final stage of the pipeline, where we define the destination
parser.add_argument(
'--input',
dest='input',
required=False,
help='Input file to read. This can be a local file or '
'a file in a Google Storage Bucket.',
# This example file contains a total of only 10 lines.
# Useful for developing on a small set of data.
default='gs://intient_output/measurementunit.csv')
parser.add_argument(
'--output',
dest='output',
required=False,
help='Output file to be written. This can be a local file or '
'a file in a Google Storage Bucket.',
default='mygcpdataengineerlab:intientpoc.measurementunit'
)
# Parse arguments from the command line.
known_args, pipeline_args = parser.parse_known_args(argv)
data_ingestion = DataIngestion()
project = ''
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
下面的异常堆栈跟踪
Error- response = request.execute()
File "/env/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "/env/local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/mygcpdataengineerlab/templates:launch?gcsPath=gs%3A%2F%2Fgcp_dataflow_csv_bq_code%2Ftemplates&alt=json returned "(9744cfd1809f74a): The workflow could not be created. Causes: (9744cfd1809fa2d): Found unexpected parameters: ['input' (perhaps you meant 'update'), 'output' (perhaps you meant 'job_port')]">
解决方案
我通过接受 arg 作为管道选项来解决它。class MyPipeOpt(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--input', help='输入要读取的文件。这可以是本地文件或谷歌存储桶中的文件。' )
pipeline_options = PipelineOptions(argv) my_options = pipeline_options.view_as(MyPipeOpt)
p = beam.Pipeline(options=pipeline_options) 并将变量用作创建的梁管道中的 my_options.input。
谢谢,
推荐阅读
- powermockito - PowerMockito mockStatic 给出 MissingMethodInvocationException
- spring - Spring 中的 MongoDb Upsert 删除设置为 null 的属性
- amazon-dynamodb - 将从 DynamoDB 获取的项目转换为 JSON 以在 REST API 的请求正文中使用
- c++ - 主函数c++中的argc和argv的值是从哪里来的
- .net - VB.net 中的固定概率
- java - jsp servlet 无法使用 intelling ide 和 tomcat 505 内部错误服务器显示
- exchange-server - Exchange Server 2016:“/ecp”应用程序中的服务器错误
- amazon-web-services - AWS 将 http 重定向到 https 此请求已被阻止;内容必须通过 https 提供
- google-apps-script - 应用程序脚本和 Google 表格 - 如何按代码对 5 个不同的值进行分组?
- vb.net - 通过代码为按钮赋予不同的名称并稍后引用它们