Processing stuck in step Write mutations to Cloud Spanner/Write mutations to Spanner without outputting or completing in state process

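Problem description

Processing stuck in step Sink-Spanner/Write mutations to Cloud Spanner/Write mutations to Spanner for at least 05m00s without outputting or completing in state process

The pipeline below writes to Spanner, but it throws the error above.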

PCollectionTuple spanMutTuple = pColIfEnrichMsgs.apply("CreateMutation",
                ParDo.of(new SpannerMutation(options, ttSpanMutOutMsgs, erroMessage))
                     .withOutputTags(ttSpanMutOutMsgs, TupleTagList.of(erroMessage))
);

/*...*/
pColSpanMut.apply("Sink-Spanner", 
    SpannerIO.write()
             .withInstanceId(options.getOutputSpannerInstanceId())
             .withDatabaseId(options.getOutputSpannerDatabaseId())
             .withMaxNumMutations(options.getOutputSpannerMaxMutations().get())
             // Converts MiB to bytes; the long literal keeps the multiplication out of int arithmetic.
             .withBatchSizeBytes(options.getOutputSpannerBatchSizeBytes().get() * 1048576L)
             .withFailureMode(FailureMode.REPORT_FAILURES)
             .withProjectId(options.getOutputSpannerProjectId().get())
);

Expected: no warnings or errors like the one below, as reported in the Google Cloud Dataflow UI.

Processing stuck in step Sink-Spanner/Write mutations to Cloud Spanner/Write mutations to Spanner for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at com.google.api.core.AbstractApiFuture.get(AbstractApiFuture.java:56)
  at com.google.cloud.spanner.spi.v1.GapicSpannerRpc.get(GapicSpannerRpc.java:556)
  at com.google.cloud.spanner.spi.v1.GapicSpannerRpc.commit(GapicSpannerRpc.java:528)
  at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:822)
  at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:819)
  at com.google.cloud.spanner.SpannerImpl.runWithRetries(SpannerImpl.java:251)
  at com.google.cloud.spanner.SpannerImpl$SessionImpl.writeAtLeastOnce(SpannerImpl.java:818)
  at com.google.cloud.spanner.SessionPool$PooledSession.writeAtLeastOnce(SessionPool.java:329)
  at com.google.cloud.spanner.DatabaseClientImpl.writeAtLeastOnce(DatabaseClientImpl.java:59)
  at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteToSpannerFn.processElement(SpannerIO.java:1243)
  at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteToSpannerFn$DoFnInvoker.invokeProcessElement(Unknown Source)

Tags: google-cloud-dataflow, dataflow, google-cloud-spanner

Solution


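Thanks for the report. It should certainly do a better job of handling the error message, and I will follow up with an internal ticket. In the meantime, have you been unblocked? If so, was the cause possibly that the values passed to withBatchSizeBytes() and/or withMaxNumMutations() were too high?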
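One way to look into that hypothesis is to print what the configured batch size actually resolves to before it reaches SpannerIO. The following is a minimal standalone sketch, not part of Apache Beam or the Cloud Spanner client: the class BatchSizeCheck, its methods, and the 8 MiB ceiling are hypothetical and chosen only for illustration. It assumes the pipeline option holds an integer number of MiB, as the MiB-to-bytes conversion in the question suggests, and it also shows that the same conversion done in int arithmetic wraps around once the option reaches 2048 MiB.

```java
// Standalone sketch; BatchSizeCheck and its methods are hypothetical helpers,
// not part of Apache Beam or the Cloud Spanner client.
final class BatchSizeCheck {
    private static final long BYTES_PER_MIB = 1024L * 1024L; // 1048576

    // Converts MiB to bytes in long arithmetic, so large values do not wrap around.
    static long mibToBytes(int mib) {
        return mib * BYTES_PER_MIB;
    }

    // The same conversion in int arithmetic, as happens when an int value
    // is multiplied by the int literal 1048576.
    static int mibToBytesIntArithmetic(int mib) {
        return mib * 1048576;
    }

    // True when the value is positive and no larger than a ceiling chosen by the caller.
    static boolean withinCeiling(long value, long ceiling) {
        return value > 0 && value <= ceiling;
    }

    public static void main(String[] args) {
        // Assumption: the batch size option is given in MiB; defaults to 1 if no argument is passed.
        int configuredMib = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        long bytes = mibToBytes(configuredMib);
        System.out.println("batch size bytes (long math): " + bytes);
        System.out.println("batch size bytes (int math):  " + mibToBytesIntArithmetic(configuredMib));
        // The 8 MiB ceiling is an arbitrary placeholder, not a documented Spanner or Beam limit.
        System.out.println("within ceiling: " + withinCeiling(bytes, mibToBytes(8)));
    }
}
```

Running it with the value from the pipeline options (for example `java BatchSizeCheck 16`) shows whether the effective byte count is what was intended. If the number is much larger than expected, lowering it and the max-mutations setting and re-running the job is a cheap way to check whether the "Processing stuck" warning goes away.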

