首页 > 解决方案 > 试图在数据流模板中实现数据存储命名空间的运行时值

问题描述

我正在尝试使用数据流模板中的运行时 valueprovider 从不同命名空间的数据存储中获取数据。

设置运行时 ValueProvider

class CatalaugeOption(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
             parser.add_value_provider_argument('--partner', default='', dest='partner', help='current date directory')

        my_options = options.view_as(CatalaugeOption)
        parser.add_argument('--kind', dest='kind', default='Product', help='Datastore Kind')
        known_args, pipeline_args = parser.parse_known_args(argv)

从数据存储中读取数据

with beam.Pipeline(argv=pipeline_args) as p:
        req_options = p.options.get_all_options()
        project = req_options['project']
        namespace = my_options.partner
        kind = known_args.kind
        query = query_pb2.Query()
        query.kind.add().name = kind
        protobufs = p | 'Read From Datastore' >> ReadFromDatastore(
            project, query, namespace=namespace)
         p.run()
        return

值错误:无效的 DisplayDataItem。Value RuntimeValueProvider(option: partner, type: str, default_value: '') 是不受支持的类型。

标签: google-cloud-datastoregoogle-cloud-dataflow

解决方案


请注意,此功能已合并到 Apache Beam 主分支中,应该是即将发布的 2.19.0 版本的一部分。

https://github.com/apache/beam/pull/10683 https://issues.apache.org/jira/browse/BEAM-7810


推荐阅读