首页 > 解决方案 > 当我将 csv 文件推送到存储桶但将空值插入 bigquery 表时,云函数被触发

问题描述

gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip

我在存储桶 gs://vikct001-test-bucket 中有一个简单的 csv 文件,并设置了一个到 pubsub 主题的通知,然后需要通过云功能进行处理并将其插入到 bigquery 表中。

我指的是我的案例研究的以下链接: https ://medium.com/@milosevic81/copy-data-from-pub-sub-to-bigquery-496e003228a1

使用以下命令创建通知/订阅并部署云功能。

gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip

这是我的 Python 代码:

from google.cloud import bigquery
import base64, json, sys, os


def pubsub_to_bigquery(event, context):

   print("event:",event)
   print("context:",context)
   pubsub_message = base64.b64decode(event['data']).decode('utf-8')
   incoming_data = json.loads(pubsub_message)
   print("incoming data:",incoming_data)
   write_to_bigquery(os.environ['my_dataset'], os.environ['my_table'], incoming_data)

def write_to_bigquery(dataset, table, document):
   bigquery_client = bigquery.Client()
   dataset_ref = bigquery_client.dataset(dataset)
   table_ref = dataset_ref.table(table)
   table = bigquery_client.get_table(table_ref)
   errors = bigquery_client.insert_rows(table, [document])
   if errors != [] :
      print(errors, file=sys.stderr)
   else:
      print("New rows have been added to big query table.")

  当我将 csv 文件推送到存储桶但将空值插入 bigquery 表时,云函数会被触发。

以下是我从日志文件中复制的内容。

传入数据:{'kind':'storage#object','id':'vikct001-test-bucket/input_test_records.csv/1590839144279480','selfLink':' https://www.googleapis.com/storage/v1 /b/vikct001-test-bucket/o/input_test_records.csv ','name':'input_test_records.csv','bucket':'vikct001-test-bucket','generation':'1590839144279480','metageneration': '1', 'contentType': 'text/csv', 'timeCreated': '2020-05-30T11:45:44.279Z', '更新': '2020-05-30T11:45:44.279Z', 'storageClass ':'标准','timeStorageClassUpdated':'2020-05-30T11:45:44.279Z','大小':'99','md5Hash':'+gsELw4UmqWR/kUZyO2gSg==','mediaLink':' https://www.googleapis.com/download/storage/v1/b/vikct001-test-bucket/o/input_test_records.csv?generation=1590839144279480&alt=media ', 'crc32c': 'zKnBPQ==', 'etag ': 'CLjb29DB2+kCEAE='}

标签: google-cloud-platformgoogle-bigquerygoogle-cloud-functionsgoogle-cloud-pubsub

解决方案


推荐阅读