spark-streaming - Why does my Spark streaming query fail with java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]?
Problem description
I have a streaming query that writes data from Azure Event Hubs to ADLS every 5 seconds, and the same stream feeds a windowed aggregation with a 1-hour window and a 5-minute watermark.
Code:
val rawStreamQuery = messages.writeStream.format("delta")
  .option("checkpointLocation", BASE_LOC + "checkpoint/" + RAW_SCHEMA_NAME + "/" + RAW_TASK_TABLE)
  .trigger(Trigger.ProcessingTime(RAW_STREAM_TRIGGER_INTERVAL))
  .table(RAW_SCHEMA_NAME + "." + RAW_TASK_TABLE)

rawStreamQuery.withWatermark(watermarkTimeStamp, STREAM_WATERMARK) // 5 minutes
  .groupBy(window(col(watermarkTimeStamp), STREAM_WINDOW).as("window")) // 1 hour
  .count()
  .select(
    lit(commonDataObj.getFeedName).as("feed_name"),
    lit(commonDataObj.getStage).as("stage_name"),
    col("count").as("record_count"),
    col("window").getField("start").as("start_ts"),
    col("window").getField("end").as("end_ts")
  )
I get the following error:
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Solution
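The stack trace shows the timeout is thrown inside CachedEventHubsReceiver.createReceiver: the executor waits up to 5 minutes for an AMQP receiver to an Event Hubs partition, and the future never completes. This usually points to connectivity or receiver contention rather than the watermark logic. Two things stand out. First, in the code as posted, rawStreamQuery is the result of .table(...), i.e. a started StreamingQuery, so withWatermark cannot be chained on it; the aggregation has to be built from the source DataFrame (messages) and started as a second query with its own checkpoint location. Second, if both queries read the same Event Hub with the same consumer group, they compete for receivers (Event Hubs caps concurrent receivers per consumer group per partition), which can produce exactly this receiver-creation timeout.

As a hedged sketch (not a confirmed fix for your environment), the azure-event-hubs-spark connector's EventHubsConf lets you use a dedicated consumer group per query and raise the receiver/operation timeouts; the option names below come from that library's API, and the placeholder strings and values are illustrative:

```scala
import java.time.Duration
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}

// Illustrative configuration; replace the placeholders with your own values.
val connectionString = ConnectionStringBuilder("<event-hubs-connection-string>")
  .setEventHubName("<event-hub-name>")
  .build

val ehConf = EventHubsConf(connectionString)
  .setConsumerGroup("<dedicated-consumer-group>")  // one consumer group per streaming query
  .setReceiverTimeout(Duration.ofMinutes(10))      // wait longer for receiver creation / receive calls
  .setOperationTimeout(Duration.ofMinutes(10))     // wait longer for management operations
  .setMaxEventsPerTrigger(10000)                   // keep each micro-batch small

val messages = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()
```

If the timeout persists even with a dedicated consumer group and longer timeouts, check network access from the executors to the Event Hubs namespace (firewall/private endpoint rules) and whether the namespace is being throttled.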