python - Google Dataflow streaming - how to stop long-running steps
Question
I have a Python Dataflow pipeline that gets notifications from Pub/Sub, reads the corresponding files from a storage bucket, transforms them, and then uploads them to BigQuery.
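For context, the first hop in a pipeline like this is typically a Pub/Sub message carrying a GCS object-change notification. A minimal sketch of parsing one (the "bucket"/"name" fields follow the standard GCS notification payload; the bucket and object names below are made up):

```python
import json

def parse_gcs_notification(message_data: bytes) -> tuple[str, str]:
    # GCS object-change notifications deliver the object's metadata as a
    # JSON payload; "bucket" and "name" identify the file to read.
    payload = json.loads(message_data.decode("utf-8"))
    return payload["bucket"], payload["name"]

# A made-up notification payload as it would arrive from Pub/Sub:
msg = json.dumps({"bucket": "my-bucket", "name": "backfill/part-001.avro"}).encode()
print(parse_gcs_notification(msg))  # ('my-bucket', 'backfill/part-001.avro')
```

The downstream read/transform/load steps would then operate per file, which is why one slow file can pin a whole bundle.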
I have to do backfills, which greatly increase the volume through the pipeline, and it scales up the number of workers to handle this, as expected. Afterwards, when the volume is low enough for just one worker, it doesn't scale back down. I'm seeing a lot of errors about long-running steps like the one below:
Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without
outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(
RegisterAndProcessBundleOperation.java:332) at
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)
Is there a way to stop or abort these long-running tasks? E.g. can I set a time limit within which a step must complete?
I believe this is what's preventing my pipeline from scaling down, which makes it expensive to leave the pipeline running all the time, as I'd like to.
Solution
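As far as I know, the Dataflow Python SDK doesn't expose a per-step timeout, but a common workaround is to bound the per-element work inside the DoFn yourself, so a stuck element raises quickly and the bundle is retried instead of hanging for hours. A minimal sketch (the helper name and the 60-second default are assumptions, not Beam API):

```python
import concurrent.futures

def process_with_timeout(fn, element, timeout_s=60.0):
    # Run fn(element), but stop waiting after timeout_s seconds so the
    # bundle fails fast and can be retried, instead of hanging for hours.
    # Caveat: Python cannot kill the worker thread; a truly stuck call
    # still occupies its thread until it returns on its own.
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, element)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        raise RuntimeError(f"processing exceeded {timeout_s}s for {element!r}")
    finally:
        pool.shutdown(wait=False)  # don't block on the (possibly stuck) thread

print(process_with_timeout(lambda x: x * 2, 21, timeout_s=5))  # 42
```

You would call this from your DoFn's `process` method around the slow file read/transform. Note the caveat in the comments: because Python threads can't be forcibly killed, this frees the bundle but not the thread, so it helps with retries and visibility more than with reclaiming a wedged worker.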