python-3.x - 谷歌数据流传递数据存储键作为输入参数
问题描述
我正在尝试创建一个谷歌数据流模板来读取 JSON 文件并将其加载到谷歌数据存储中。下面是我的代码。
我可以成功加载数据,但是我想将数据存储键/种类作为输入参数从我的模板传递并使用相同的创建实体。有人可以帮我如何传递代码吗?
下面是在运行时获取输入的代码片段。我有 --datastore_key 作为其中之一。
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--json_input',
dest='json_input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
parser.add_value_provider_argument(
'--project_id',
dest='project_id',
type=str,
required=False,
help='Input Project ID.')
parser.add_value_provider_argument(
'--datastore_key',
dest='datastore_key',
type=str,
required=False,
help='The Key name')
下面是我根据此处的说明将 datastore_key 分配给实体创建的片段。
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id):
self.project_id = project_id
def start_bundle(self):
self.client = datastore.Client()
def start_datastore(self, datastore_key):
self.datastore_key = datastore_key
def process(self, an_int):
yield self.datastore_key.get() + an_int
def process(self, element):
try:
key = self.client.key(datastore_key ,element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
我正在创建如下管道,
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(project_id))
如果我将它作为运行时参数传递,我没有创建数据存储密钥。如果我像这样硬编码它的工作
key = self.client.key('customer' ,element['customerNumber'])
我想要这样的东西
key = self.client.key(runtime_datastore_key ,runtime_datastore_id)
有人可以帮助我如何将数据存储键/种类作为运行时参数传递吗?
谢谢,GS
解决方案
看起来您没有将datastore_key
值提供程序传递给CreateHbaseRow
.
尝试使用:
class CreateHbaseRow(beam.DoFn):
def __init__(self, project_id, datastore_key):
self.project_id = project_id
self.datastore_key = datastore_key
def start_bundle(self):
self.client = datastore.Client()
def process(self, element):
try:
key = self.client.key(datastore_key.get(), element['customerNumber'])
entity = datastore.Entity(key=key)
entity.update(element)
self.client.put(entity)
except:
logging.error("Failed with input: ", str(element))
请注意,我离开 project_id 是因为您似乎想要它,但我下面的代码没有使用它。
您还希望确保将相关的值提供者从options
实例传递到您的DoFn
. 因此,您的管道创建代码变为:
p = beam.Pipeline(options=options)
lines_text = p | "Read Json From GCS" >> beam.io.ReadFromText(json_input)
lines_json = lines_text | "Convert To Json" >> beam.ParDo(ConvertToJson())
lines_json | "Create Entities From Json" >> beam.ParDo(CreateHbaseRow(options.project_id, options.datastore_key))
推荐阅读
- javascript - 循环遍历数组以进行 axios 调用
- r - 堆叠条形图的标题和图例在使用 ggplotly() 转换图后失去位置
- java - 如何使用 ThymLeaf 将对象的属性绑定到隐藏字段?
- c# - Convert.ChangeType 为可为空的 int 引发无效的强制转换异常
- apache - 添加重写规则后禁止访问
- python - 将文件夹中的所有 DAT 文件转换为 CSV 文件
- windows - 从命令提示符调用 SCF 项
- excel - 如何使用 VBA 在范围内设置背景颜色?
- r - 根据范围填充面板数据集中缺失的行
- typescript - MobX:动作注释的方法未定义