python - Dataflow error when supplying a Pub/Sub topic as a parameter
Problem description
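I'm creating a Dataflow template in Python, and the template needs to accept three user-defined parameters when a new Dataflow job is launched.
The problem is in beam.io.gcp.pubsub.WriteToPubSub(), where I try to supply the topic name from a ValueProvider. According to Google's documentation, a ValueProvider is required for parameters when creating a template: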
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
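The source beam.io.ReadFromPubSub() accepts a value provider for the subscription without problems, and so does the transform beam.io.gcp.bigquery.WriteToBigQuery().
Sharing my code will obviously help :)
First, the usual imports: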
from __future__ import absolute_import
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import StaticValueProvider
import json
import time
from datetime import datetime
import dateutil.parser
import sys
Next is the class I defined for the input parameters supplied to the template:
class userOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--subscription',
            default='projects/MYPROJECT/subscriptions/subscription',
            help='PubSub subscription to listen on')
        parser.add_value_provider_argument(
            '--bqtable',
            default='dataset.table',
            help='Big Query Table Name in the format project:dataset.table')
        parser.add_value_provider_argument(
            '--topic',
            default='projects/MYPROJECT/topics/topic',
            help='PubSub topic to write failed messages to')
The pipeline itself is defined as follows (note that I've omitted the map functions):
def run():
    user_options = PipelineOptions().view_as(userOptions)
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=pipeline_options) as p:
        records = (
            p | 'Read from PubSub'
            >> beam.io.ReadFromPubSub(
                subscription=str(user_options.subscription),
                id_label='Message_ID',
                with_attributes=True)
            | 'Format Message' >>
            beam.Map(format_message_element)
            | 'Transform null records to empty list' >>
            beam.Map(transform_null_records)
            | 'Transform Dates' >>
            beam.Map(format_dates)
            | 'Write to Big Query' >>
            beam.io.gcp.bigquery.WriteToBigQuery(
                table=user_options.bqtable,
                create_disposition='CREATE_IF_NEEDED',
                write_disposition='WRITE_APPEND',
                insert_retry_strategy='RETRY_NEVER'
            )
            | 'Write Failures to Pub Sub' >>
            beam.io.gcp.pubsub.WriteToPubSub(user_options.topic)
        )
Now, when I try to generate the template with this PowerShell command:
python profiles-pipeline.py --project xxxx-xxxxxx-xxxx `
--subscription projects/xxxx-xxxxxx-xxxx/subscriptions/sub-xxxx-xxxxxx-xxxx-dataflow `
--bqtable xxxx-xxxxxx-xxxx:dataset.table `
--topic projects/xxxx-xxxxxx-xxxx/topics/top-xxxx-xxxxxx-xxxx-failures `
--runner DataflowRunner `
--temp_location gs://xxxx-xxxxxx-xxxx/temp/ `
--staging_location gs://xxxx-xxxxxx-xxxx/staging/ `
--template_location gs://xxxx-xxxxxx-xxxx/template
I get this error:
  File "pipeline.py", line 193, in <module>
    run()
  File "pipeline.py", line 183, in run
    beam.io.gcp.pubsub.WriteToPubSub(user_options.topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 292, in __init__
    topic, id_label, with_attributes, timestamp_attribute)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 430, in __init__
    self.project, self.topic_name = parse_topic(topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 325, in parse_topic
    match = re.match(TOPIC_REGEXP, full_topic)
  File "c:\program files\python37\lib\re.py", line 173, in match
    return _compile(pattern, flags).match(string)
TypeError: expected string or bytes-like object
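The last frames show what goes wrong: `parse_topic` hands the option object itself to `re.match`, which only accepts `str` or bytes. A minimal reproduction without Beam follows. `FakeStaticValueProvider` is a hypothetical stand-in that assumes Beam's static value provider behaves this way (wraps a value and returns it from `__str__`); if so, that would also explain why `str(user_options.subscription)` works for the subscription when the value is passed on the command line. The topic pattern is an assumed simplification, not copied from Beam.

```python
import re

# Assumed, simplified topic pattern of the form projects/<project>/topics/<topic>.
TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)'


class FakeStaticValueProvider:
    """Hypothetical stand-in for a value provider: wraps a value, exposes get()."""

    def __init__(self, value):
        self.value = value

    def get(self):
        return self.value

    def __str__(self):
        # Converting the wrapper to str yields the wrapped value.
        return str(self.value)


def parse_topic(full_topic):
    """Simplified parser: split 'projects/<p>/topics/<t>' into (p, t)."""
    match = re.match(TOPIC_REGEXP, full_topic)
    if not match:
        raise ValueError('bad topic: %r' % (full_topic,))
    return match.group(1), match.group(2)


topic = FakeStaticValueProvider('projects/my-project/topics/failures')

try:
    # Passing the wrapper object, as WriteToPubSub(user_options.topic) does.
    parse_topic(topic)
except TypeError as err:
    print('TypeError:', err)

# Passing the unwrapped string parses fine.
print(parse_topic(str(topic)))
```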
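I hit this same error earlier with beam.io.WriteToBigQuery(), but it went away once I switched to beam.io.gcp.bigquery.WriteToBigQuery(), because that transform accepts a ValueProvider for the table name. For Pub/Sub, however, I can't find a write transform that works.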
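The traceback hints at why the two sinks behave differently: WriteToPubSub parses the topic inside its constructor (the `__init__` → `parse_topic` frames), so it needs a real string while the pipeline is being built. A sink that accepts a ValueProvider instead has to keep the provider and call `get()` only once the job runs. The sketch below illustrates that deferred pattern with two hypothetical classes, `RuntimeOption` and `LazyTopicSink`; they are not Beam APIs, and whether WriteToBigQuery works exactly like this internally is an assumption.

```python
import re

TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)'


class RuntimeOption:
    """Hypothetical runtime option: has no value until the job launches."""

    def __init__(self):
        self._value = None

    def set(self, value):
        # In a real template the runner would supply this at launch time.
        self._value = value

    def get(self):
        if self._value is None:
            raise RuntimeError('value not available at construction time')
        return self._value


class LazyTopicSink:
    """Hypothetical sink that accepts a str or a RuntimeOption and parses lazily."""

    def __init__(self, topic):
        # No parsing here, so construction never needs the actual value.
        self._topic = topic

    def resolve(self):
        # Called at "run time": unwrap the option if needed, then parse.
        if isinstance(self._topic, RuntimeOption):
            topic = self._topic.get()
        else:
            topic = self._topic
        match = re.match(TOPIC_REGEXP, topic)
        if not match:
            raise ValueError('bad topic: %r' % (topic,))
        return match.group(1), match.group(2)


option = RuntimeOption()
sink = LazyTopicSink(option)  # fine: nothing has been parsed yet
option.set('projects/my-project/topics/failures')
print(sink.resolve())
```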
Any help is greatly appreciated.
Solution
| 'Encode bytestring' >> beam.Map(encode_byte_string)  # I assume you've already implemented this part
| 'Write to pubsub' >> beam.io.WriteToPubSub(output_topic)
This works for me.
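Here is a minimal sketch that fills in the two names the answer leaves open, under stated assumptions. `encode_byte_string` is hypothetical and assumes each element is a JSON-serializable dict; WriteToPubSub publishes `bytes` elements when `with_attributes` is False. `output_topic` is assumed to be a plain string parsed with `argparse` before the pipeline is built, as in Beam's wordcount-style `parse_known_args` pattern. This means the topic is fixed when the template is created, not supplied at launch. Only the argument handling runs here; the Beam lines are left as comments.

```python
import argparse
import json
import re

TOPIC_REGEXP = 'projects/([^/]+)/topics/(.+)'


def encode_byte_string(element):
    """Hypothetical helper: serialize a dict to UTF-8 JSON bytes for Pub/Sub."""
    return json.dumps(element).encode('utf-8')


def split_args(argv):
    """Pull --output_topic out as a plain str; leave the rest for PipelineOptions."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_topic', required=True,
                        help='projects/<project>/topics/<topic>')
    known_args, pipeline_args = parser.parse_known_args(argv)
    if not re.match(TOPIC_REGEXP, known_args.output_topic):
        parser.error('--output_topic must look like projects/<project>/topics/<topic>')
    return known_args.output_topic, pipeline_args


# In the real pipeline (not executed here):
#   output_topic, pipeline_args = split_args(sys.argv[1:])
#   pipeline_options = PipelineOptions(pipeline_args)
#   ... | 'Encode bytestring' >> beam.Map(encode_byte_string)
#       | 'Write to pubsub' >> beam.io.WriteToPubSub(output_topic)

topic, rest = split_args(['--output_topic', 'projects/my-project/topics/failures',
                          '--runner=DataflowRunner'])
print(topic)
print(rest)
print(encode_byte_string({'id': 1}))
```

The trade-off is that the failure topic becomes part of the staged template. Changing it means regenerating the template rather than passing a new value when the job starts.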