scala - 使用 Spark Streaming API 测试 Twitter
问题描述
我是 Spark 的 Streaming 框架的新手,正在尝试处理 twitter 流。我正在编写相同的测试用例,并且了解我可以使用 Spark StreamingSuiteBase,这将帮助我将输入作为流在我的函数中进行测试。但是我编写了一个函数,它将 DStream[Status] 作为输入,处理后将 DStream[String] 作为输出。我在 StreamingSuiteBase 中使用的 api 是 testOperation。
test("Filter only words Starting with #") {
val inputTweet = List(List("this is #firstHash"), List("this is #secondHash"), List("this is #thirdHash"))
val expected = List(List("#firstHash"), List("#secondHash"), List("#thirdHash"))
testOperation(inputTweet, TransformTweets.getText _, expected, ordered = false)
这是发送输入的函数..
def getText(englishTweets: DStream[Status]): DStream[String] = {
println(englishTweets.toString)
val hashTags = englishTweets.flatMap(x => x.getText.split(" ").filter(_.startsWith("#")))
hashTags
}
但是由于 DStream[Status] 和 DStream[String],我收到错误“类型不匹配”。如何模拟 Stream[Status]。
解决方案
createStatus
因此,我通过从“ ”API获取 Twitter 状态解决了这个问题TwitterObjectFactory
。没有必要嘲笑TwitterStatus
。即使您设法模拟它,也存在序列化问题。所以,这是最好的解决方案:
val rawJson = Source.fromURL(getClass.getResource("/tweetStatus.json")).getLines.mkString
val tweetStatus = TwitterObjectFactory.createStatus(rawJson)
希望这对某人有帮助!
推荐阅读
- python - Python:从列表中动态生成属性
- python - 获取白色像素的坐标以形成曲线
- javascript - 在对象数组中添加/删除设置/存储在状态中的列时,功能组件数据表不重新呈现
- java - 在 Spring Boot 中在 REST API 上配置 Websocket
- gradle - Gradle 创建包含子项目源的 javadoc
- hololens - 在 HoloLens 上禁用后处理相机/图像转换?
- docker - Golang os.Getenv(key) 返回整个 env 文件,而不仅仅是键的值
- python-3.x - 全局变量(尽管已定义)错误未在 python 中定义
- microsoft-edge - WebView2 运行时的离线安装程序/固定版本安装程序
- android - 更改移动语言时损坏的 XML 布局