scala - 如何使用火花流从网站检索数据?
问题描述
我想使用 Spark 流从网站中检索流中的数据。我认为我必须使用自定义接收器。所以,我尝试了这个:
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingMain {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val spark = SparkSession.builder.config(conf).getOrCreate()
val lines = ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))
println("lines value" + lines.print())
val words: DStream[String] = lines.flatMap(_.split(","))
}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import java.io.InputStreamReader
import java.io.BufferedReader
import java.net.{URL, URLConnection}
import org.apache.spark.internal.Logging
class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
override def onStart() = {
new Thread("Url Receiver") {
override def run() = {
val urlConnection: URLConnection = new URL(urlStr).openConnection
urlConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:37.0) Gecko/20100101 Firefox/37.0")
println(urlConnection)
val bufferedReader: BufferedReader = new BufferedReader(
new InputStreamReader(urlConnection.getInputStream)
)
var msg = bufferedReader.readLine
while (msg != null) {
if (!msg.isEmpty) {
store(msg)
println("msg" + msg)
}
msg = bufferedReader.readLine
}
bufferedReader.close()
}
}.start()
}
override def onStop() = {
}
}
我在没有数据的控制台上有以下结果:
-------------------------------------------
Time: 1602750320000 ms
-------------------------------------------
我想从网站上检索流式传输的数据。一旦网站上有新数据,我想跟踪它。流应用程序应始终运行(如果我或用户决定停止应用程序,它应该停止)
谢谢您的帮助。
解决方案
推荐阅读
- php - 返回嵌套数组键名而不是索引
- ios - UiCollectionView 间距问题 - 快速
- java - 403 响应代码 - 输入正确?
- javascript - 如何在函数内的 onclick 函数中访问它?
- gradle - 在 Intellij/Gradle 中创建包含源代码和测试的平面目录结构?
- python - 将年份和月份名称转换为 pandas 数据框的日期时间列
- python - 如何使用 odoo-xml-rpc 一次在 ODOO 中创建多个值?
- java - 图像在移动时有奇怪的行为?
- java - 从另一个包访问类
- python - 为什么 print("aA".count("")) 打印值 3?