python - 数据流 SDK 版本
问题描述
我通过从 Datalab 单元运行这样的代码来测试 Dataflow 时遇到了问题。
import apache_beam as beam
# Pipeline options:
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'test002'
gcloud_options.project = 'proj'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location = 'gs://tmp'
# gcloud_options.region = 'europe-west2'
# Worker options:
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 30
worker_options.max_num_workers = 10
# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'
# Pipeline:
PL = beam.Pipeline(options=options)
query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(
PL | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='project', use_standard_sql=False, query=query))
| 'write' >> beam.io.WriteToText('gs://test/test2.txt', num_shards=1)
)
PL.run()
print "Complete"
有各种成功的尝试,也有一些失败了。这很好理解,但我不明白的是我做了什么来将 SDK 版本从 2.9.0 更改为 2.0.0,如下所示。谁能指出我做了什么以及如何回到 SDK 版本 2.9.0?
解决方案
您可以通过运行以下命令检查您将使用的 SDK 版本:
!pip freeze | grep beam
在您的情况下,这应该返回:
阿帕奇梁==2.0.0
并通过在顶部添加一个单元格来强制所需的版本(即 2.9.0):
!pip install apache-beam[gcp]==2.9.0
如果您已经提交了作业,您可能需要重新启动内核(重置会话)才能使更改生效。具有不同 SDK 的作业之间存在一天的差异,所以我的猜测是您或其他人更改了依赖项(假设这些依赖项是从同一个 Datalab 实例和笔记本运行的)。也许没有意识到这一点(即内核重新启动)。
推荐阅读
- video - FFMPEG 视频合并。只有第一个视频显示
- pdf - PDF TJ 算子
- python - Scrapy 实际上在哪里执行 html 请求?
- reactjs - 在 React 上更新数组的状态
- typescript - 在 JestJS 中测试私有方法
- python - zip() 和 zip(*) 似乎给出了相似的结果
- elisp - (elisp dash.el) 帮助重写笨重的代码
- assembly - 在 CodeView 中单步执行时,EBX 的高位被清零
- opencv - 编译opencv3得到“libgdal.so.20 undefined reference”
- javascript - 检查元素/开发人员的工具有时不会产生结果