scala - 如何对 Spark Structured Streaming 进行单元测试?
问题描述
我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,并使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。
我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体新手。
解决方案
tl;dr用于MemoryStream
为输出添加事件和内存接收器。
以下代码应该有助于开始:
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")
// use sessions event stream to apply required transformations
val transformedSessions = ...
val streamingQuery = transformedSessions
.writeStream
.format("memory")
.queryName(queryName)
.option("checkpointLocation", checkpointLocation)
.outputMode(queryOutputMode)
.start
// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
eventGen.generate(userId = 1, offset = 1.second),
eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])
// check the output
// The output is in queryName table
// The following code simply shows the result
spark
.table(queryName)
.show(truncate = false)
推荐阅读
- python - 使用多个 GPU 时如何保存模型?
- javascript - 按数据属性查找元素
- python - 字典值排序错误
- javascript - 平台浏览器已经设置好了。用 [object Object] 覆盖平台。在电子应用程序中使用 tfjs-node 时
- python - Django - 在 API 视图上设置 CSRF 令牌
- ruby-on-rails - Rails 中的嵌套属性,它们的键是列字段
- html - 在移动设备上嵌入超出页面宽度 - 在 Squarespace
- reactjs - React Native SVG - 设置 SVG 宽度和高度会导致图标被切断
- apache - $_GET 变量为空 .htaccess 重写
- swift - 在 tableView 中出列可重用的 Cell 崩溃