Providing the "query" parameter to "beam.io.BigQuerySource" at runtime in Dataflow Python

Problem description

TL;DR: I want to run a different query with beam.io.BigQuerySource every month, using the Dataflow API and templates. If that is not possible, can I pass the query to beam.io.BigQuerySource at runtime while still using the Dataflow API and templates?

I have a Dataflow "batch" data pipeline that reads a BigQuery table, like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')

    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:

        # query_bq (defined below) builds the month-specific query; note that
        # it runs here, while the pipeline graph is being constructed.
        companies = (
            p
            | "Read from BigQuery" >> beam.io.Read(
                beam.io.BigQuerySource(query=query_bq(project_id, dataset_id),
                                       use_standard_sql=True))
        )

The query argument to beam.io.BigQuerySource is computed by a function like this:

from datetime import datetime


def query_bq(project, dataset):
    # First day of the current month, e.g. "2020_06_01".
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query
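For illustration (project and dataset ids assumed), calling this in June 2020 yields the query that later shows up hard-coded in the template:

# Assumed example values; run in June 2020 this prints:
#   SELECT * FROM `my-project.my_dataset.data_2020_06_01_json` LIMIT 10
print(query_bq("my-project", "my_dataset"))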

A few things to note here:

  1. I want to run this data pipeline once a day.
  2. The table id changes from month to month. For example, this month's table id is data_2020_06_01_json and next month's will be data_2020_07_01_json, all of which is computed by def query_bq(project, dataset) above.
  3. I want to trigger this batch pipeline automatically through the Dataflow API, using a Cloud Function, a Pub/Sub event and Cloud Scheduler.

Here is the Cloud Function, which is triggered by the Pub/Sub event that Cloud Scheduler publishes every day:

import ast
import base64
from datetime import datetime

from googleapiclient.discovery import build


def run_dataflow(event, context):
    if 'data' in event:
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)
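For reference, the same templates.launch body also accepts a 'parameters' map, which is how a runtime value such as the query can be handed to a templated job. A minimal sketch (the helper name launch_with_runtime_query and the query_bq parameter key are assumptions, matching the ValueProvider attempt described further down):

from datetime import datetime

def launch_with_runtime_query(dataflow, project, region, template, job, dataset):
    # Compute the month-specific query here, at launch time, instead of at
    # template-build time.
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = f'SELECT * FROM `{project}.{dataset}.data_{month}_json` LIMIT 10'
    return dataflow.projects().locations().templates().launch(
        projectId=project,
        gcsPath=template,
        location=region,
        body={
            'jobName': job,
            # Runtime parameters are a plain string map; each key must match a
            # ValueProvider argument defined in the pipeline options.
            'parameters': {'query_bq': query},
        },
    ).execute()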

Here is the command I use to launch this data pipeline on Dataflow; because of --template_location it stages the template to GCS rather than running a job immediately:

python main.py \
    --setup_file ./setup.py \
    --project xxx-xx-xxxx \
    --pro_id xxx-xx-xxxx \
    --dataset 'xx-xxx-xxx' \
    --machine_type=n1-standard-4 \
    --max_num_workers=5 \
    --num_workers=1 \
    --region europe-west2  \
    --service_account_email=xxx-xxx-xxx \
    --runner DataflowRunner \
    --staging_location gs://xx/xx \
    --temp_location gs://xx/temp \
    --subnetwork="xxxxxxxxxx" \
    --template_location gs://xxxxx/templates/xxxxx

The problem I am facing:

My query_bq function is called while the pipeline is compiled and the Dataflow template is created and uploaded to GCS; query_bq is not called at runtime. So whenever my Cloud Function launches a Dataflow job, it always reads from the data_2020_06_01_json table, and the table in the query will stay the same even as we move into July, August and so on. What I really want is for the query to change dynamically according to the query_bq function, so that in the future it reads data_2020_07_01_json, data_2020_08_01_json and so on.

I also looked at the generated template file, and it looks like the query gets hard-coded into the template at compile time. Here is a snippet:

 "name": "beamapp-xxxxx-0629014535-344920",
  "steps": [
    {
      "kind": "ParallelRead",
      "name": "s1",
      "properties": {
        "bigquery_export_format": "FORMAT_AVRO",
        "bigquery_flatten_results": true,
        "bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
        "bigquery_use_legacy_sql": false,
        "display_data": [
          {
            "key": "source",
            "label": "Read Source",
            "namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
            "shortValue": "BigQuerySource",
            "type": "STRING",
            "value": "apache_beam.io.gcp.bigquery.BigQuerySource"
          },
          {
            "key": "query",
            "label": "Query",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "STRING",
            "value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
          },
          {
            "key": "validation",
            "label": "Validation Enabled",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "BOOLEAN",
            "value": false
          }
        ],
        "format": "bigquery",
        "output_info": [
          {

Another option I tried

I also tried the ValueProvider approach described here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters

I added this to my code:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)

user_options = pipeline_options.view_as(UserOptions)
p | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query_bq,
                                                                              use_standard_sql=True))

When I run this, I get the following error:

WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)

So I'm guessing beam.io.BigQuerySource does not accept ValueProviders.

Tags: google-cloud-dataflow, apache-beam, apache-beam-io

Solution


You can't use ValueProviders with BigQuerySource, but in newer versions of Beam you can use beam.io.ReadFromBigQuery, which supports them well.

You would do something like this:

result = (p 
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))

You can pass ValueProviders to it, and it has a number of other utilities. Check out its documentation.
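Putting that together with the UserOptions class from the question, a minimal sketch of what the pipeline could look like (assuming a --query_bq runtime parameter; not the answer author's exact code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Becomes a runtime parameter when the pipeline is staged as a template.
        parser.add_value_provider_argument('--query_bq', type=str)


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    user_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        companies = (
            p
            # ReadFromBigQuery resolves the ValueProvider at run time, so each
            # launched job can read a different month's table.
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=user_options.query_bq,
                use_standard_sql=True)
        )

Combined with a launch-time 'parameters' map like the one sketched after the Cloud Function above, the query is no longer frozen into the template, so each daily run can read data_2020_07_01_json, data_2020_08_01_json and so on.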

