首页 > 解决方案 > 使用 Spark Streaming API 测试 Twitter

问题描述

我是 Spark 的 Streaming 框架的新手,正在尝试处理 twitter 流。我正在编写相同的测试用例,并且了解我可以使用 Spark StreamingSuiteBase,这将帮助我将输入作为流在我的函数中进行测试。但是我编写了一个函数,它将 DStream[Status] 作为输入,处理后将 DStream[String] 作为输出。我在 StreamingSuiteBase 中使用的 api 是 testOperation。

test("Filter only words Starting with #")  {
  val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
  val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))

  testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)

这是发送输入的函数..

 def getText(englishTweets: DStream[Status]): DStream[String] = {
    println(englishTweets.toString)

    val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))

    hashTags
  }

但是由于 DStream[Status] 和 DStream[String],我收到错误“类型不匹配”。如何模拟 Stream[Status]。

标签: scalaapache-sparktwitterspark-streaming

解决方案


createStatus因此,我通过从“ ”API获取 Twitter 状态解决了这个问题TwitterObjectFactory。没有必要嘲笑TwitterStatus。即使您设法模拟它,也存在序列化问题。所以,这是最好的解决方案:

val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines.mkString
val tweetStatus = TwitterObjectFactory.createStatus(rawJson)

希望这对某人有帮助!


推荐阅读