python - Python BigQuery 存储。并行读取多个流
问题描述
我有以下玩具代码:
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"
parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)
df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df
我使用 BALANCED ShardingStrategy 启动了多个可以独立读取的流。
BigqueryStorage 文档说:
但是,如果您想扇出多个阅读器,您可以通过让阅读器处理每个单独的流来实现。
我启动了两个阅读器,一个用于会话中的每个流。之后,将两个数据帧(每个读取器创建一个)连接成一个。然而,与 LIQUID ShardingStrategy 相比,这种方法并没有提高任何速度。
我试图让两个读者并行阅读行。但是,我在库文档中找不到有关并行流读取的任何信息。
问题是:
1) 如果选择 BALANCED ShardingStrategy,BugQuery Storage 是否提供同时读取多个流的任何本机方法?
2)并行读取流的最佳方法是什么?我需要为此使用多处理或异步吗?
3)如果有人可以提供有关并行流reding的任何基本示例,我将不胜感激
解决方案
我进行了一些研究,发现您使用了 BigQuery Storage API 中的代码,您是对的,如果您正在消费多个流,则使用平衡策略,需要提及的是它仍处于 beta 版本。
发生这种情况的一些原因是,您可能只看到 1 个流,因为流分配算法的数据相对“小”,流的数量可能低于请求的数量,具体取决于 2 个因素:表和服务的限制。目前,确定什么是“合理”的算法细节尚未公开,一旦 API 达到普遍可用性阶段,这些细节可能会发生变化。
您也可以尝试上面推荐的多处理包。
推荐阅读
- upscale - 如何在 SAP Commerce Upscale 上添加外部样式?
- java - 从上下文操作模式删除后RecyclerView复制项目
- c# - 无法使用 Azure 在 YAML 中运行自动化测试
- ssl - Poco 库中的手动 SSL 服务器初始化问题
- python - Selenium 如何获取文本元素
- python - 具有可变参数的装饰器
- python - bot.remove_command('help') AttributeError: 'Client' object has no attribute 'remove_command'
- flutter - 如何将新的食品数据插入特定的类别 ID
- selenium - Selenium - TakesScreenshot - java - 转换为 jpg 时遇到问题
- python - 拍摄 rgb 图像通道时出现合并错误