首页 > 解决方案 > 为什么流式查询在 StreamingQueryManager.awaitAnyTermination 之后仍然启动并运行?

问题描述

我想在特定时间后终止火花映射。我正在sqlContext.streams.awaitAnyTermination(long timeoutMs)为此使用。但是在给定的超时之后映射并没有停止。

我尝试从 azure 事件中心读取数据并提供 2 分钟(120000 毫秒)作为 awaitAnyTermination 方法的超时时间。但映射并没有在 azure databricks 集群上停止。

下面是我的代码。我正在阅读 azure eventthub 并写入控制台和 120000 毫秒来 awaitAnyTermination。

import org.apache.spark.eventhubs._

// Event hub configurations
// Replace values below with yours
import org.apache.spark.eventhubs.ConnectionStringBuilder

val connStr = ConnectionStringBuilder()
      .setNamespaceName("iisqaeventhub")
      .setEventHubName("devsource")
      .setSasKeyName("RootManageSharedAccessKey")
      .setSasKey("saskey")
      .build

val customEventhubParameters = EventHubsConf(connStr).setMaxEventsPerTrigger(5).setStartingPosition(EventPosition.fromEndOfStream)

// reading from the Azure event hub
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

// write to console
val query = incomingStream.writeStream
      .outputMode("append")
      .format("console")
      .start()

// awaitAnyTermination for shutting down the query
sqlContext.streams.awaitAnyTermination(120000)

我期望映射应该在超时后结束。没有错误,但映射没有停止。

标签: scalaapache-sparkspark-structured-streaming

解决方案


tl;博士按设计工作。


官方文档

awaitAnyTermination(timeoutMs: Long): 布尔值

返回任何查询是否已终止(多个可能已终止)。

换句话说,timeoutMs除非有异常或stop.


推荐阅读