python - Apache Beam 将 BigQuery 表和架构作为参数写入
问题描述
我正在为 Apache Beam 使用 Python SDK。数据表和模式的值在 PCollection 中。这是我从 PubSub 读到的消息:
{"DEVICE":"rms005_m1","DATESTAMP":"2020-05-29 20:54:26.733 UTC","SINUMERIK__x_position":69.54199981689453,"SINUMERIK__y_position":104.31400299072266,"SINUMERIK__z_position":139.0850067138672}
然后我想使用 json 消息中的值将其写入 BigQuery,其中 lambda 函数用于数据表,此函数用于模式:
def set_schema(data):
list = []
for name in data:
if name == 'STATUS' or name == 'DEVICE':
type = 'STRING'
elif name == 'DATESTAMP':
type = 'TIMESTAMP'
else:
type = 'FLOAT'
list.append(name + ':' + type)
schema = ",".join(list)
return schema
data = (p
| "Read from PubSub" >> beam.io.ReadFromPubSub(topic=topic)
| "Parse json" >> beam.Map(json_parse)
| "Write to BQ" >> beam.io.WriteToBigQuery(
table='project:dataset{datatable}__opdata'.format(datatable = lambda element: element["DEVICE"]),
schema=set_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
)
当我执行它时,我收到此错误:
ValueError: Expected a table reference (PROJECT:DATASET.TABLE or DATASET.TABLE) instead of project:dataset.<function <lambda> at 0x7fa0dc378710>__opdata
如何将 PCollection 的值用作 PTransform 中的变量?
解决方案
您必须将函数传递到表中。试试这个,而不是:
| "Write to BQ" >> beam.io.WriteToBigQuery(
table=lambda element: 'project:dataset{datatable}__opdata'.format(datatable = element["DEVICE"]),
schema=set_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
推荐阅读
- nginx - 为什么 nginx 在我的 Kubernetes Ingress 路由上返回 404?
- react-native - 如何使用 React Native 中的 Animated API 将命运之轮停在特定点?
- azure-devops - 当我尝试部署版本时 Azure DevOps Server 挂起 - 这是什么原因/补救措施?
- java - Spring Boot:如何根据当前环境或 spring 配置文件使用自定义 logback.xml
- node.js - 如何在 setTimeout 中调用异步函数
- node.js - 在 nodejs 中,如何先读取 CSV 标头但仍异步处理文件的其余部分
- java - 如何在数组中设置double类型的值
- objective-c - 将 UISegmentedControlNoSegment 指定为 UISegmentedControl 的 selectedSegmentIndex 对 iOS 13 没有影响
- css - 如何在 Angular 中正确处理 Scroll 上的导航栏颜色变化?
- c# - 为什么没有为可空参数和可选参数设置值