首页 > 解决方案 > 在 Google Dataflow 中使用 FireStore

问题描述

我想在带有 python 的 Dataflow 模板中使用 FireStore。

我做了这样的事情:

with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)

这是使用它的适当方式吗?


额外的信息

def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')

    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })

标签: pythongoogle-cloud-platformgoogle-cloud-firestoregoogle-cloud-dataflow

解决方案


总体思路是正确的,但您应该考虑减少分配 Firestore 连接的频率,并批量处理您的调用。这是应该执行此操作的 ParDo 示例:

class FirestoreUpdateDoFn(beam.DoFn):

  def __init__(self, max_batch_size=500):
    self.element_batch = []
    self.max_batch_size = max_batch_size

  def start_bundle(self):
    self.db = firestore.Client()
    self.batch = db.batch()
    self.some_ref = db.collection(...)

  def process(self, row):
    self.element_batch.append(row)
    if len(self.element_batch) >= self.max_batch_size:
      self._flush_updates()

  def finish_bundle(self):
    self._flush_updates()
    self.db.close()

  def _flush_updates(self):
    for elm in self.element_batch:
      self.batch.update(...)
    batch.commit()

这应该可以让您减少对 Firestore 的往返调用,并使您的管道更快。然后你会做这样的事情:

with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub)
                              .with_output_types(bytes)
     | 'String to dictionary' >> beam.ParDo(FirestoreUpdateDoFn())
    )

查看:


推荐阅读