sql - 如何编写查询以在 BigQuery 中插入 python 字典中的数组值?
问题描述
我有一个看起来像这样的python字典:
{
'id': 123,
'categories': [
{'category': 'fruit', 'values': ['apple', 'banana']},
{'category': 'animal', 'values': ['cat']},
{'category': 'plant', 'values': []}
]
}
我正在尝试使用python通过API将这些值插入到大查询中的表中,我只需要将上面的内容格式化为“INSERT table VALUES”查询。该表需要具有以下字段:id
, categories.category
, categories.values
。
我需要类别基本上是一个包含类别和每个类别对应值的数组。该表最终应该看起来像这样 - 除了我需要它是每个 id 的一行,相应的类别字段嵌套并具有正确的字段名称:
SELECT 123 as id, (["fruit"], ["apple", "banana"]) as category
UNION ALL (SELECT 123 as id, (["animal"], ["cat"]) as category)
UNION ALL (SELECT 123 as id, (["plant"], ["tree", "bush", "rose"]) as category)
我不确定如何格式化“INSERT”查询以获得所需的结果,有人可以帮忙吗?
解决方案
如果您想使用 Python 将字典加载到 BigQuery,您必须首先准备数据。我选择将 Python 字典转换为.json文件,然后使用 Python API 将其加载到 BigQuery。但是,根据文档,BigQuery 在加载 .json 嵌套数据方面存在一些限制,其中包括:
- 您的 .json 必须是换行符,这意味着每个对象必须在文件中的新行中
- BigQuery 不支持 Json 中的地图或字典。因此,为了做到这一点,您必须将整个数据包装在 [] 中,如您在此处所见。
为此,需要对文件进行一些修改,以便将创建的 .json 文件加载到 BiguQuery。我创建了两个脚本,其中:第一个将 Pyhton dict 转换为 JSON 文件,第二个将 JSON 文件格式化为换行分隔的 json,然后加载到 BigQuery 中。
将 python dict 转换为 .json 文件。请注意,您必须在 [] 之间包装整个数据:
import json
from google.cloud import bigquery
py_dict =[{
'id': 123,
'categories': [
{'category': 'fruit', 'values': ['apple', 'banana']},
{'category': 'animal', 'values': ['cat']},
{'category': 'plant', 'values': []}
]
}]
json_data = json.dumps(py_dict, sort_keys=True)
out_file = open('json_data.json','w+')
json.dump(py_dict,out_file)
其次,将 json 转换为新行分隔的 json 并加载到 BigQuery:
import json
from google.cloud import bigquery
with open("json_data.json", "r") as read_file:
data = json.load(read_file)
result = [json.dumps(record) for record in data]
with open('nd-proceesed.json', 'w') as obj:
for i in result:
obj.write(i+'\n')
client = bigquery.Client()
filename = '/path/to/file.csv'
dataset_id = 'sample'
table_id = 'json_mytable'
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.autodetect = True
with open("nd-proceesed.json", "rb") as source_file:
job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
job.result() # Waits for table load to complete.
print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))
然后,在 BigQuery UI 中,您可以按如下方式查询您的表:
SELECT id, categories
FROM `test-proj-261014.sample.json_mytable4` , unnest(categories) as categories
和输出:
推荐阅读
- python - 在 Jupyter 笔记本中出现以下错误。不确定问题是否与 collect() 或任何兼容性问题有关
- python-3.x - (psycopg2.OperationalError) FATAL: 数据库系统正在启动,docker + redis + celery
- javascript - 如何使用javascript查询数据库并将值放入数组
- postgresql - 带有 rds.force_ssl 的 AWS RDS Postgres 上的 Debezium 无法正常工作
- scala - 跳过无限流中的错误
- python - SimpleImputer 仍然在 Pandas Dataframe 中返回 NaN 值
- php - PHP 无法从选择选项 HTML 中获取值
- javascript - 是否有必要将“android_assets”文件夹添加到我的应用程序的主文件夹?
- java - Android / Java / Appium - 向下滑动(滚动)不起作用[移动应用程序]
- swift - SwiftUI - 查看更改时未注册输入数据