首页 > 解决方案 > 如何调用按需 bigquery 数据传输服务?

问题描述

我真的很喜欢 BigQuery 的数据传输服务。我有要加载到 BQ 中的确切架构中的平面文件。如果只设置 DTS 计划以获取匹配模式的 GCS 文件并将其加载到 BQ 中,那就太棒了。我喜欢在复制和电子邮件后删除源文件的内置选项,以防万一。但最大的遗憾是最小间隔是 60 分钟。这很疯狂。也许我可以延迟 10 分钟。

因此,如果我将 DTS 设置为按需提供,我如何从 API 调用它?我正在考虑创建一个每 10 分钟按需调用它的 cronjob。但我无法通过文档弄清楚如何调用它。

此外,将 GCS 文件(不需要 ETL)移动到与确切架构匹配的 bq 表中,我第二好的最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud Run 等吗?

如果我使用 Cloud Function,如何在调用时将 GCS 中的所有文件作为一个 bq 加载作业提交?

最后,有人知道DTS将来是否会降低限制到10分钟?

标签: google-bigquerydata-transfer

解决方案


因此,如果我将 DTS 设置为按需提供,我如何从 API 调用它?我正在考虑创建一个每 10 分钟按需调用它的 cronjob。但我无法通过文档弄清楚如何调用它。

StartManualTransferRunsRPC 库的一部分,但目前还没有等效的 REST API。如何使用它取决于您的环境。例如,您可以使用 Python 客户端库 ( docs )。

例如,我使用了以下代码(您需要运行pip install google-cloud-bigquery-datatransfer依赖项):

import time

from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp


client = bigquery_datatransfer_v1.DataTransferServiceClient()

PROJECT_ID = 'PROJECT_ID'
TRANSFER_CONFIG_ID = '5e6...7bc'  # alphanumeric ID you'll find in the UI 

parent = client.project_transfer_config_path(PROJECT_ID, TRANSFER_CONFIG_ID)

start_time = bigquery_datatransfer_v1.types.Timestamp(seconds=int(time.time() + 10))

response = client.start_manual_transfer_runs(parent, requested_run_time=start_time)
print(response)

请注意,您需要使用正确的传输配置 ID,并且requested_run_time必须是类型bigquery_datatransfer_v1.types.Timestamp(文档中没有示例)。我将开始时间设置为比当前执行时间提前 10 秒。

您应该得到如下响应:

runs {
  name: "projects/PROJECT_NUMBER/locations/us/transferConfigs/5e6...7bc/runs/5e5...c04"
  destination_dataset_id: "DATASET_NAME"
  schedule_time {
    seconds: 1579358571
    nanos: 922599371
  }
  ...
  data_source_id: "google_cloud_storage"
  state: PENDING
  params {
    ...
  }
  run_time {
    seconds: 1579358581
  }
  user_id: 28...65
}

并且传输按预期触发(不要介意错误):

在此处输入图像描述

此外,将 GCS 文件(不需要 ETL)移动到与确切架构匹配的 bq 表中,我第二好的最可靠和最便宜的方法是什么。我应该使用 Cloud Scheduler、Cloud Functions、DataFlow、Cloud Run 等吗?

有了这个,您可以设置一个 cron 作业以每十分钟执行一次您的功能。正如评论中所讨论的,最小间隔是 60 分钟,因此它不会拾取不到一小时的文件(docs)。

除此之外,这不是一个非常强大的解决方案,您的后续问题会在这里发挥作用。我认为这些可能过于宽泛,无法在单个 StackOverflow 问题中解决,但我想说,对于按需刷新,Cloud Scheduler + Cloud Functions/Cloud Run 可以很好地工作。

如果您需要 ETL,Dataflow 将是最好的,但它有一个 GCS 连接器可以观察文件模式(示例)。这样,您将跳过传输,设置观察间隔和加载作业触发频率以将文件写入 BigQuery。与以前的方法相反,VM 将在流式管道中持续运行,但可以进行 10 分钟的观察期。

如果您有复杂的工作流程/依赖项,Airflow 最近引入了操作员来开始手动运行。

如果我使用 Cloud Function,如何在调用时将 GCS 中的所有文件作为一个 bq 加载作业提交?

您可以在创建传输时使用通配符来匹配文件模式:

在此处输入图像描述

此外,这可以使用Cloud Storage 的 Pub/Sub 通知来逐个文件地完成,以触发 Cloud Function。

最后,有人知道DTS将来是否会降低限制到10分钟?

这里已经有一个功能请求。随意给它加标以显示您的兴趣并接收更新


推荐阅读