首页 > 解决方案 > 如何对 Spark Structured Streaming 进行单元测试?

问题描述

我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,并使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。

我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体新手。

标签: scalaapache-sparkspark-structured-streamingspark-streaming-kafka

解决方案


tl;dr用于MemoryStream为输出添加事件和内存接收器。

以下代码应该有助于开始:

import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
val events = MemoryStream[Event]
val sessions = events.toDS
assert(sessions.isStreaming, "sessions must be a streaming Dataset")

// use sessions event stream to apply required transformations
val transformedSessions = ...

val streamingQuery = transformedSessions
  .writeStream
  .format("memory")
  .queryName(queryName)
  .option("checkpointLocation", checkpointLocation)
  .outputMode(queryOutputMode)
  .start

// Add events to MemoryStream as if they came from Kafka
val batch = Seq(
  eventGen.generate(userId = 1, offset = 1.second),
  eventGen.generate(userId = 2, offset = 2.seconds))
val currentOffset = events.addData(batch)
streamingQuery.processAllAvailable()
events.commit(currentOffset.asInstanceOf[LongOffset])

// check the output
// The output is in queryName table
// The following code simply shows the result
spark
  .table(queryName)
  .show(truncate = false)

推荐阅读