google-cloud-platform - 当我将 csv 文件推送到存储桶但将空值插入 bigquery 表时,云函数被触发
问题描述
gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip
我在存储桶 gs://vikct001-test-bucket 中有一个简单的 csv 文件,并设置了一个到 pubsub 主题的通知,然后需要通过云功能进行处理并将其插入到 bigquery 表中。
我指的是我的案例研究的以下链接: https ://medium.com/@milosevic81/copy-data-from-pub-sub-to-bigquery-496e003228a1
使用以下命令创建通知/订阅并部署云功能。
gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip
这是我的 Python 代码:
from google.cloud import bigquery
import base64, json, sys, os
def pubsub_to_bigquery(event, context):
print("event:",event)
print("context:",context)
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
incoming_data = json.loads(pubsub_message)
print("incoming data:",incoming_data)
write_to_bigquery(os.environ['my_dataset'], os.environ['my_table'], incoming_data)
def write_to_bigquery(dataset, table, document):
bigquery_client = bigquery.Client()
dataset_ref = bigquery_client.dataset(dataset)
table_ref = dataset_ref.table(table)
table = bigquery_client.get_table(table_ref)
errors = bigquery_client.insert_rows(table, [document])
if errors != [] :
print(errors, file=sys.stderr)
else:
print("New rows have been added to big query table.")
当我将 csv 文件推送到存储桶但将空值插入 bigquery 表时,云函数会被触发。
以下是我从日志文件中复制的内容。
传入数据:{'kind':'storage#object','id':'vikct001-test-bucket/input_test_records.csv/1590839144279480','selfLink':' https://www.googleapis.com/storage/v1 /b/vikct001-test-bucket/o/input_test_records.csv ','name':'input_test_records.csv','bucket':'vikct001-test-bucket','generation':'1590839144279480','metageneration': '1', 'contentType': 'text/csv', 'timeCreated': '2020-05-30T11:45:44.279Z', '更新': '2020-05-30T11:45:44.279Z', 'storageClass ':'标准','timeStorageClassUpdated':'2020-05-30T11:45:44.279Z','大小':'99','md5Hash':'+gsELw4UmqWR/kUZyO2gSg==','mediaLink':' https://www.googleapis.com/download/storage/v1/b/vikct001-test-bucket/o/input_test_records.csv?generation=1590839144279480&alt=media ', 'crc32c': 'zKnBPQ==', 'etag ': 'CLjb29DB2+kCEAE='}
解决方案
推荐阅读
- node.js - Nodejs mongodb驱动部分更新
- javascript - 如何通过javascript动态更改按钮悬停颜色?
- django - Django:我希望在 div 中加载另一个网页
- linux - 如何使用 sed 删除文本?
- javascript - 使用JS将html编辑按钮添加到表格的每一行
- c++ - 如何使用 C++ 在多个线程中传递结构?
- javascript - P5.js - 从 JSON 加载并保存到 JSON
- clojure - 在 Clojure 中将序列懒惰地划分为不同大小的块
- math - 在 GNU Scientific Library 多根查找器中选择起点
- php - 通过匹配 PHP 中的对象字段将对象值组合到单个数组中