首页 > 解决方案 > 数据流 SDK 版本

问题描述

我通过从 Datalab 单元运行这样的代码来测试 Dataflow 时遇到了问题。

import apache_beam as beam

# Pipeline options:
options                         = beam.options.pipeline_options.PipelineOptions()
gcloud_options                  = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name         = 'test002'
gcloud_options.project          = 'proj'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location    = 'gs://tmp'
# gcloud_options.region           = 'europe-west2'

# Worker options:
worker_options                  = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb     = 30
worker_options.max_num_workers  = 10

# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'

# Pipeline:

PL = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'    
(
    PL | 'read'  >> beam.io.Read(beam.io.BigQuerySource(project='project', use_standard_sql=False, query=query))
       | 'write' >> beam.io.WriteToText('gs://test/test2.txt', num_shards=1)
)

PL.run()

print "Complete"

有各种成功的尝试,也有一些失败了。这很好理解,但我不明白的是我做了什么来将 SDK 版本从 2.9.0 更改为 2.0.0,如下所示。谁能指出我做了什么以及如何回到 SDK 版本 2.9.0?

数据流屏幕

标签: pythongoogle-cloud-platformgoogle-cloud-dataflowapache-beamgoogle-cloud-datalab

解决方案


您可以通过运行以下命令检查您将使用的 SDK 版本:

!pip freeze | grep beam

在您的情况下,这应该返回:

阿帕奇梁==2.0.0

并通过在顶部添加一个单元格来强制所需的版本(即 2.9.0):

!pip install apache-beam[gcp]==2.9.0

如果您已经提交了作业,您可能需要重新启动内核(重置会话)才能使更改生效。具有不同 SDK 的作业之间存在一天的差异,所以我的猜测是您或其他人更改了依赖项(假设这些依赖项是从同一个 Datalab 实例和笔记本运行的)。也许没有意识到这一点(即内核重新启动)。


推荐阅读