python - 如何正确使用 Dataflow / Apache beam wait_until_finish 持续时间参数?
问题描述
我有一个批处理作业在数据流运行器上的 apache-beam[gcp]==2.19.0 版本下的 gcp 数据流中运行。我为该工作创建了一个自定义模板。该作业按预期运行,但我还想添加一个最大作业持续时间。我在 wait_until_finish() 方法中找到了持续时间(以毫秒为单位)参数,该参数应该可用。问题是:如何让模板化的批处理作业在运行时间超过持续时间时自动停止?我不需要保留任何数据,我只希望作业在运行时间过长时停止。我已经实现了运行功能如下:
def run():
opts = PipelineOptions()
user_options = opts.view_as(UserOptions)
p = beam.Pipeline(options=opts)
(p |
"Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,
use_standard_sql=StaticValueProvider(bool, True))) |
"Get data" >> beam.ParDo(doStuff()) |
"Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |
"Write to BQ" >> beam.io.WriteToBigQuery(
table=user_options.table_spec,
schema=user_options.table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
result = p.run()
result.wait_until_finish(duration=1800000)
解决方案
不,Dataflow 不会在特定时间段后提供自动取消功能。您仍然可以通过简单地放置 cancel() 来实现您的目标
result.wait_until_finish(duration=1800000)
if not result.is_in_terminal_state(): # if pipeline isn't finished, cancel
result.cancel()
推荐阅读
- google-cloud-platform - 谷歌云 | 将 Cloud Composer 连接到 Cloud SQL
- python - 你如何在 cmd 中检查你的 django 版本
- rust - 在 Rust 中使用 Serde 从 json 中提取可能是可选的嵌套值
- dropzone.js - DropZone.js - 以编程方式实例化时,不会创建 html 元素
- java - Java 简化绘制对象路径
- angular - 创建分组属性
- google-chrome - 当 Chrome 用户使用 NPAPI 插件时会发生什么?
- javascript - 平滑滚动不会使其锚定
- python - K最近邻
- javascript - JavaScript 动画延迟