首页 > 解决方案 > 如何正确使用 Dataflow / Apache beam wait_until_finish 持续时间参数?

问题描述

我有一个批处理作业在数据流运行器上的 apache-beam[gcp]==2.19.0 版本下的 gcp 数据流中运行。我为该工作创建了一个自定义模板。该作业按预期运行,但我还想添加一个最大作业持续时间。我在 wait_until_finish() 方法中找到了持续时间(以毫秒为单位)参数,该参数应该可用。问题是:如何让模板化的批处理作业在运行时间超过持续时间时自动停止?我不需要保留任何数据,我只希望作业在运行时间过长时停止。我已经实现了运行功能如下:

def run():
    opts = PipelineOptions()
    user_options = opts.view_as(UserOptions)
    p = beam.Pipeline(options=opts)

    (p |
     "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query,
                                                        use_standard_sql=StaticValueProvider(bool, True))) |
     "Get data" >> beam.ParDo(doStuff()) |

     "Output data" >> beam.ParDo(outputData(param1=user_options.input1)) |

     "Write to BQ" >> beam.io.WriteToBigQuery(
                table=user_options.table_spec,
                schema=user_options.table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
     )

    result = p.run()

    result.wait_until_finish(duration=1800000)

标签: pythongoogle-cloud-dataflowapache-beamdataflow

解决方案


不,Dataflow 不会在特定时间段后提供自动取消功能。您仍然可以通过简单地放置 cancel() 来实现您的目标

    result.wait_until_finish(duration=1800000)
    if not result.is_in_terminal_state():   # if pipeline isn't finished, cancel
      result.cancel()

推荐阅读