首页 > 解决方案 > 在 Dataflow 中使用模板化参数构建动态 Datastore 查询

问题描述

我有从 Google Cloud Datastore 读取数据的 Apache Beam 管道。Pipeline 在 Google Cloud Dataflow 中以批处理模式运行,它是用 Python 编写的。

问题出在模板化参数上,我试图用它来创建带有动态时间戳过滤器的 Datastore 查询。

管道定义如下:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1new.types import Query

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--filter', type=int)

pipeline_options = PipelineOptions()

with beam.Pipeline(options=pipeline_options) as p:

    user_options = pipeline_options.view_as(UserOptions)

    data = (p
        | 'Read' >> ReadFromDatastore(build_query(user_options.filter.get()))
        | ...

build_query功能如下:

def build_query(filter):
    return Query(
        kind='Kind',
        filters=[('timestamp', '>', filter)],
        project='Project'
    )

运行它会导致错误RuntimeValueProvider(...).get() not called from a runtime context

我也试过ReadFromDatastore(build_query(user_options.filter))但是错误是ValueError: (u"Unknown protobuf attr type [while running 'Read/Read']", <class 'apache_beam.options.value_provider.RuntimeValueProvider'>)

如果从等式中删除模板化参数,一切都会正常工作,例如。像这样:ReadFromDatastore(build_query(1563276063))。所以问题在于在构建数据存储查询时使用模板化参数。

我的猜测是build_query应该以其他方式定义,但在花了一些时间处理文档和谷歌搜索之后,我仍然不知道如何。

任何我如何解决这个问题的建议都非常感谢!

编辑 1

实际上,在这种情况下,过滤器始终与当前时间戳相关,因此如果有其他使用动态值的方法,甚至可能不需要将其作为参数传递。尝试过,ReadFromDatastore(build_query(int(time())-90000))但连续两次运行包含完全相同的过滤器。

标签: pythongoogle-cloud-datastoregoogle-cloud-dataflowapache-beam

解决方案


价值提供者需要得到您正在使用的来源的支持。只有在那里,它才能在正确的时刻被打开。

在创建自己的源时,您显然可以完全控制它。使用预先存在的源时,我只看到两个选项:

  1. 在创建模板时提供值,这意味着不要为其使用模板参数
  2. 为预先存在的源创建 PR 以支持模板参数

推荐阅读