首页 > 解决方案 > Python BigQuery 存储。并行读取多个流

问题描述

我有以下玩具代码:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"

parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)

df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df

我使用 BALANCED ShardingStrategy 启动了多个可以独立读取的流。

BigqueryStorage 文档说:

但是,如果您想扇出多个阅读器,您可以通过让阅读器处理每个单独的流来实现。

我启动了两个阅读器,一个用于会话中的每个流。之后,将两个数据帧(每个读取器创建一个)连接成一个。然而,与 LIQUID ShardingStrategy 相比,这种方法并没有提高任何速度。

我试图让两个读者并行阅读行。但是,我在库文档中找不到有关并行流读取的任何信息。

问题是:

1) 如果选择 BALANCED ShardingStrategy,BugQuery Storage 是否提供同时读取多个流的任何本机方法?

2)并行读取流的最佳方法是什么?我需要为此使用多处理或异步吗?

3)如果有人可以提供有关并行流reding的任何基本示例,我将不胜感激

标签: pythongoogle-cloud-platformgoogle-bigquery

解决方案


我进行了一些研究,发现您使用了 BigQuery Storage API 中的代码,您是对的,如果您正在消费多个流,则使用平衡策略,需要提及的是它仍处于 beta 版本。

发生这种情况的一些原因是,您可能只看到 1 个流,因为流分配算法的数据相对“小”,流的数量可能低于请求的数量,具体取决于 2 个因素:表和服务的限制。目前,确定什么是“合理”的算法细节尚未公开,一旦 API 达到普遍可用性阶段,这些细节可能会发生变化。

您也可以尝试上面推荐的多处理包。


推荐阅读