Google Dataflow streaming - how to stop long running steps

Problem description

I have a Python Dataflow streaming pipeline that gets notifications from Pub/Sub, reads files from a storage bucket, transforms them, and then uploads them to BigQuery.
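
For context, the pipeline is shaped roughly like this. This is a minimal sketch, not my real code: the subscription, table, worker cap, and the two helper bodies are placeholders.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Streaming pipeline: Pub/Sub notification -> read + transform file -> BigQuery.
options = PipelineOptions(
    streaming=True,
    max_num_workers=20,  # placeholder cap; autoscaling works within it
)

def extract_gcs_path(message):
    """Placeholder: pull the gs:// path out of the Pub/Sub notification."""
    return message.decode("utf-8")

def transform_file(path):
    """Placeholder: read the file and return a list of transformed rows (the slow step)."""
    return []

with beam.Pipeline(options=options) as p:
    (p
     | "ReadNotifications" >> beam.io.ReadFromPubSub(
           subscription="projects/my-project/subscriptions/gcs-notifications")
     | "ExtractPath" >> beam.Map(extract_gcs_path)
     | "TransformFile" >> beam.FlatMap(transform_file)
     | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
           "my-project:my_dataset.my_table",
           create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))  # table assumed to exist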

I have to do backfills, which greatly increases the volume through the pipeline, and it scales up the number of workers to handle this, as expected. Afterwards, when the volume is low enough for just one worker, it doesn't autoscale back down. I see that I'm getting a lot of errors about long-running steps, like the one below:

Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
  at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
  at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
  at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Is there a way to stop or abort these long-running tasks? E.g., can I set a time limit within which a step must complete?
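
I couldn't find anything built in for this, so what I'm picturing is enforcing the limit inside the DoFn myself, along these lines (just a sketch; transform_file stands in for my real transform, and the limit is arbitrary):

import concurrent.futures
import logging

import apache_beam as beam

TIMEOUT_SECS = 600  # arbitrary illustrative limit

def transform_file(element):
    """Placeholder: does the real work and returns a list of output rows."""
    return []

class TimeBoundedTransform(beam.DoFn):
    """Runs the transform in a worker thread and gives up after TIMEOUT_SECS."""

    def process(self, element):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        future = pool.submit(transform_file, element)  # placeholder transform
        try:
            yield from future.result(timeout=TIMEOUT_SECS)
        except concurrent.futures.TimeoutError:
            # Log and drop (or route to a dead-letter sink) rather than
            # blocking the bundle indefinitely.
            logging.warning("Step timed out after %ss on %s", TIMEOUT_SECS, element)
        finally:
            # Python can't force-kill the stuck thread; wait=False just stops
            # it from holding up the bundle while it (hopefully) finishes.
            pool.shutdown(wait=False)

Creating a pool per element is wasteful but keeps the sketch simple; a shared executor created in DoFn.setup() would be a more realistic shape.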

I believe this is what's preventing my pipeline from scaling down, and it will make it expensive to keep the pipeline running all the time, as I'd like to.

Tags: python, bigdata, google-cloud-dataflow, apache-beam

Solution

