python - 在 Google Dataflow 中使用 FireStore
问题描述
我想在带有 python 的 Dataflow 模板中使用 FireStore。
我做了这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
这是使用它的适当方式吗?
额外的信息
def firestore_update_multiple(row):
from google.cloud import firestore
db = firestore.Client()
doc_ref = db.collection(u'data').document(u'one')
doc_ref.update({
u'arrayExample': u'DataflowRunner',
u'booleanExample': True
})
解决方案
总体思路是正确的,但您应该考虑减少分配 Firestore 连接的频率,并批量处理您的调用。这是应该执行此操作的 ParDo 示例:
class FirestoreUpdateDoFn(beam.DoFn):
def __init__(self, max_batch_size=500):
self.element_batch = []
self.max_batch_size = max_batch_size
def start_bundle(self):
self.db = firestore.Client()
self.batch = db.batch()
self.some_ref = db.collection(...)
def process(self, row):
self.element_batch.append(row)
if len(self.element_batch) >= self.max_batch_size:
self._flush_updates()
def finish_bundle(self):
self._flush_updates()
self.db.close()
def _flush_updates(self):
for elm in self.element_batch:
self.batch.update(...)
batch.commit()
这应该可以让您减少对 Firestore 的往返调用,并使您的管道更快。然后你会做这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub)
.with_output_types(bytes)
| 'String to dictionary' >> beam.ParDo(FirestoreUpdateDoFn())
)
查看:
- 批量写入的 Firestore 文档
如果您愿意,可以查看的代码
PubSubUnboundedSink
,它与您尝试做的事情相同:在流式传输上运行时有效地写入外部服务
推荐阅读
- ionic4 - 用 div 填充离子含量
- javascript - 使用 javascript 解析嵌套的 XML 矩阵
- python - 如何在不使用 for 循环的情况下对这两个 numpy 操作进行矢量化?
- macos - 如何将函数名称转换为地址并在 LLDB 中添加偏移量?
- python-3.x - ValueError:传递的 save_path 不是有效的检查点:C:\Users\User\model.tflearn
- python - 与 BeautifulSoup.find 混淆?
- python - 如何在 Tensorflow 2.0 中制作参差不齐的批次?
- ruby - Ruby 在这里用 gsub 做什么?
- matlab - 降低目标对比度而不丢失图像细节
- r - ggradar:在中同时使用 group 加 facet_wrap