google-cloud-platform - 如何在 Google Cloud Storage 中只处理一次新上传的对象?
问题描述
我想将文件接收到 Google Cloud Storage 存储桶中,并为每个文件只运行一次 Python 作业。我希望同时运行许多这样的 Python 作业,以便并行处理许多文件,但每个文件应该只处理一次。
我考虑了以下几点:
发布/订阅消息
为存储桶上的 OBJECT_FINALIZE 事件生成 Pub/Sub 消息。这里的问题是 Pub/Sub可能会多次传递消息,因此侦听同一订阅的 Python 作业池可能会为同一消息运行多个作业,所以我可以...
- 使用 Dataflow 对消息进行重复数据删除,但在我的非流式用例中,dataflow 似乎代价高昂,而且这个答案似乎表明它不是适合这项工作的工具。
或者
- 使用事务数据库(例如 Cloud SQL 上的 PostgreSQL)创建锁定机制。任何收到消息的作业都可以尝试获取与文件同名的锁,任何未能获取锁的作业都可以终止并且不确认消息,并且任何具有锁的作业可以继续处理并将锁标记为已完成以防止将来获取该锁。
我认为 2 会起作用,但它也感觉过度设计。
轮询
让作业轮询存储桶中的新文件,而不是使用 Pub/Sub。
这感觉就像它只是用一个仍然需要锁定机制的不太健壮的解决方案替换 Pub/Sub。
事件弧
使用Eventarc触发保存我的代码的 Cloud Run 容器。这似乎类似于 Pub/Sub,而且更简单,但我找不到 Eventarc 如何处理重试之类的事情的解释,或者它是否带有任何一次性保证。
单个控制器产生多个工人
创建一个中央控制器进程来处理文件事件的重复数据删除(通过 Pub/Sub、轮询或 Eventarc 接收),然后生成工作作业并将每个文件准确地分配给工作作业一次。
我认为这也可行,但会产生单点故障并可能造成吞吐量瓶颈。
解决方案
您走在正确的轨道上,是的,PubSub Push 消息可能会多次传递。
一种简单的管理技术是在开始处理文件时重命名文件。重命名是一个原子事务,所以如果它成功了,你就可以处理它。
PROC_PRF = "processing"
bucketName = # get it from the message
fileName = # Get it from the message)
# Renaming of the file below trriggers another google.storage.object.finalize event
if PROC_PRF in fileName:
print("Exiting due to rename event")
# Ack the message an exit
return
storage_client = storage.Client()
bucket = storage_client.bucket(bucketName)
blob = bucket.get_blob(fileName)
try:
newBlob = bucket.rename_blob(blob,new_name = fileName+'.'+PROC_PRF)
except:
raise RuntimeError("Error: File rename from " + fileName + " failed, is this a duplicate function call?")
# The rename worked - process the file & message
推荐阅读
- angular - 在 Stackblitz Angular 项目中使用 Object.values()
- scala - hbase-spark 加载数据引发 NullPointerException 错误(scala)
- openid - OneLogin - OpenID Connect - 无法使用自定义 URL 方案?离子/科尔多瓦
- c# - WPF xaml 动画代码的版本背后的代码
- javascript - 在移动浏览器上为博主修改 Javascript url 的 Base64 编码 (?m=1)?
- sql - Sequelize 不使用模型中的字段别名
- javascript - JavaScript 库 - 如何公开回调函数
- jekyll - Jekyll 自定义主题(本地运行)
- xaml - Xamarin XAML: Setting Attached Property in Code
- angular - 如何在 Angular 6 中处理 401 未经身份验证的错误