google-cloud-platform - 如何在云功能中串行处理文件?
问题描述
我写了一个基于云存储触发器的云功能。我有 10-15 个文件以 5 秒的间隔登陆云存储桶,它将数据加载到 bigquery 表中(截断并加载)。
虽然存储桶中有 10 个文件,但我希望云功能以顺序方式处理它们,即一次 1 个文件,因为所有文件都访问同一个表进行操作。
当前,云功能一次触发多个文件,并且由于多个文件试图访问同一个表,因此在 BIgquery 操作中失败。
有没有办法在云功能中配置这个?
提前致谢!
解决方案
您可以通过使用 pubsub 和 Cloud Function 上的最大实例参数来实现此目的。
- 首先,使用Google Cloud Storage 的通知功能,将事件下沉到 PubSub 主题中。
- 现在,每当存储桶上发生事件时,您都会收到一条消息。如果您只想过滤文件创建(对象最终确定),您可以对订阅应用过滤器。我为此写了一篇文章
- 然后,创建一个最大实例设置为 1的 HTTP 函数(如果要应用过滤器,则需要 http 函数)。像这样,只能同时执行 1 个函数。所以,没有并发!
- 最后,在主题上创建一个 PubSub 订阅,无论是否带有过滤器,以在 HTTP 中调用您的函数。
编辑
感谢您的代码,我明白会发生什么。事实上,BigQuery 是一个声明式系统。当您执行请求或加载作业时,会创建一个作业并在后台运行。
在 python 中,你可以明确地等待工作结束,但是,对于 pandas,我没有找到如何!
我刚刚找到了一个Google Cloud 页面来解释如何从 pandas 迁移到 BigQuery 客户端库。如您所见,最后有一条线
# Wait for the load job to complete.
job.result()
而不是等待工作结束。
您在功能中做得很好,_insert_into_bigquery_dwh
但在暂存功能中并非如此_insert_into_bigquery_staging
。这可能会导致 2 个问题:
- dwh 函数适用于旧数据,因为在您触发此作业时暂存尚未完成
- 如果暂存需要 10 秒并在“后台”运行(您不会在代码中明确等待结束)并且 dwh 需要 1 秒,则在 dwh 函数结束时处理下一个文件,甚至如果登台继续在后台运行。这导致了你的问题。
推荐阅读
- javascript - Vue以编程方式添加事件修饰符不起作用
- python - 如何修复 TypeError:+ 不支持的操作数类型:'WindowsPath' 和 'str'
- javascript - Console.logging a function that sums three parameters and then console logs them out
- c - How I can solve this problem during debugging?(unhandled exception at 0xFEFEFEFE)
- database - How to reference new collection in a Model, after the initial design was designed to only refer to one?
- flutter - Flutter: exported apk showing a problem with drawer
- go - How do I manage to run godog in Gitlab?
- rust - Rust for loop with BigUint
- javascript - 如何避免像 & 这样的转义字符,从 JSON 中获取响应
- rust - Rust wasm-bindgen 结构与字符串