首页 > 解决方案 > 谷歌数据流传递数据存储键作为输入参数

问题描述

我正在尝试创建一个谷歌数据流模板来读取 JSON 文件并将其加载到谷歌数据存储中。下面是我的代码。

我可以成功加载数据,但是我想将数据存储键/种类作为输入参数从我的模板传递并使用相同的创建实体。有人可以帮我如何传递代码吗?

下面是在运行时获取输入的代码片段。我有 --datastore_key 作为其中之一。

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--json_input',
                dest='json_input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

        parser.add_value_provider_argument(
                '--project_id',
                dest='project_id',
                type=str,
                required=False,
                help='Input Project ID.')

        parser.add_value_provider_argument(
                '--datastore_key',
                dest='datastore_key',
                type=str,
                required=False,
                help='The Key name')

下面是我根据此处的说明将 datastore_key 分配给实体创建的片段。

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id):
       self.project_id = project_id

    def start_bundle(self):
        self.client = datastore.Client()

    def start_datastore(self, datastore_key):
        self.datastore_key = datastore_key

    def process(self, an_int):
        yield self.datastore_key.get() + an_int

    def process(self, element):
        try:
            key = self.client.key(datastore_key ,element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

我正在创建如下管道,

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))

如果我将它作为运行时参数传递,我没有创建数据存储密钥。如果我像这样硬编码它的工作

key = self.client.key('customer' ,element['customerNumber'])

我想要这样的东西

key = self.client.key(runtime_datastore_key ,runtime_datastore_id)

有人可以帮助我如何将数据存储键/种类作为运行时参数传递吗?

谢谢,GS

标签: python-3.xgoogle-cloud-datastoregoogle-cloud-dataflowvalue-provider

解决方案


看起来您没有将datastore_key值提供程序传递给CreateHbaseRow.


尝试使用:

class CreateHbaseRow(beam.DoFn): 
    def __init__(self, project_id, datastore_key):
       self.project_id = project_id
       self.datastore_key = datastore_key

    def start_bundle(self):
        self.client = datastore.Client()

    def process(self, element):
        try:
            key = self.client.key(datastore_key.get(), element['customerNumber'])
            entity = datastore.Entity(key=key)
            entity.update(element)  
            self.client.put(entity) 
        except:   
            logging.error("Failed with input: ", str(element))

请注意,我离开 project_id 是因为您似乎想要它,但我下面的代码没有使用它


您还希望确保将相关的值提供者从options实例传递到您的DoFn. 因此,您的管道创建代码变为:

p = beam.Pipeline(options=options)

lines_text  = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson()) 
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))

推荐阅读