apache-spark - 为什么在流源中没有获得任何新偏移量的情况下触发了新批次?
问题描述
我有多个 spark 结构化流式作业,我看到的通常行为是,只有当 Kafka 中有任何新的偏移量用作创建流式查询的源时,才会触发一个新的批处理。
但是,当我运行这个使用 演示任意有状态操作的示例时mapGroupsWithState
,我看到即使 Streaming 源中没有新数据也会触发一个新批处理。为什么会这样,可以避免吗?
Update-1 我修改了上面的示例代码并删除了与状态相关的操作,例如更新/删除它。函数简单地输出零。但是仍然每 10 秒触发一次批处理,netcat 服务器上没有任何新数据。
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._
object Stateful {
def main(args: Array[String]): Unit = {
val host = "localhost"
val port = "9999"
val spark = SparkSession
.builder
.appName("StructuredSessionization")
.master("local[2]")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port
val lines = spark.readStream
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load()
// Split the lines into words, treat words as sessionId of events
val events = lines
.as[(String, Timestamp)]
.flatMap { case (line, timestamp) =>
line.split(" ").map(word => Event(sessionId = word, timestamp))
}
val sessionUpdates = events
.groupByKey(event => event.sessionId)
.mapGroupsWithState[SessionInfo, Int](GroupStateTimeout.ProcessingTimeTimeout) {
case (sessionId: String, events: Iterator[Event], state: GroupState[SessionInfo]) =>
0
}
val query = sessionUpdates
.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()
query.awaitTermination()
}
}
case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(
numEvents: Int,
startTimestampMs: Long,
endTimestampMs: Long)
解决方案
出现空批次的原因是在 mapGroupsWithState 调用中使用了超时。
根据“Learning Spark 2.0”一书,它说:
“即使该 micro.batch 中没有该键的数据,下一个微批处理也会调用此超时键上的函数。[...] 由于超时是在微批处理期间处理的,因此时间它们的执行不精确,很大程度上取决于触发间隔 [...]。”
由于您已将超时设置为GroupStateTimeout.ProcessingTimeTimeout
与查询的触发时间一致,即 10 秒。另一种方法是根据事件时间(即GroupStateTimeout.EventTimeTimeout
)设置超时。
GroupState 上的ScalaDocs提供了更多详细信息:
当某个组发生超时时,将对该组调用没有值的函数,并将 GroupState.hasTimedOut() 设置为 true。
推荐阅读
- css - webpack:在 css-loader 中包含 node_modules 包,但在 css-modules 中排除
- java - 为什么java代码中的String.length不计算空间?
- qt - 双击后如何打开新窗口使其变为活动状态?
- php - 在yii2中单击按钮时如何更新数据库中的字段?
- python - 在python中一起使用两个for循环将二维列表转换为一维列表
- apache - 使用 solr DIH 时数据不匹配
- json.net - 如何使 JObject 和 JToken 全局不区分大小写
- c# - 在 MVC 中如何识别来自请求的结果/数据是缓存数据还是新鲜数据?
- javascript - 如何在 D3 图表中添加一条线
- php - php按钮onclick执行mysq - 没有显示?