首页 > 解决方案 > 如何使用火花流从网站检索数据?

问题描述

我想使用 Spark 流从网站中检索流中的数据。我认为我必须使用自定义接收器。所以,我尝试了这个:

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMain {

 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(1))
 val spark = SparkSession.builder.config(conf).getOrCreate()
 val lines = ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))
 
 println("lines value" +  lines.print())
 
 val words: DStream[String] = lines.flatMap(_.split(","))
}


import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import java.io.InputStreamReader
import java.io.BufferedReader

import java.net.{URL, URLConnection}

import org.apache.spark.internal.Logging

 class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)   with Logging {

override def onStart() = {
    new Thread("Url Receiver") {
        override def run() = {
            val urlConnection: URLConnection = new URL(urlStr).openConnection
            urlConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:37.0) Gecko/20100101 Firefox/37.0")
            println(urlConnection)
            val bufferedReader: BufferedReader = new BufferedReader(
                new InputStreamReader(urlConnection.getInputStream)
            )
            var msg = bufferedReader.readLine
            while (msg != null) {
                if (!msg.isEmpty) {
                    store(msg)
                    println("msg" + msg)
                }
                msg = bufferedReader.readLine
            }
            bufferedReader.close()
        }
    }.start()
}

override def onStop() = {

   }
}

我在没有数据的控制台上有以下结果:

-------------------------------------------
Time: 1602750320000 ms
-------------------------------------------

我想从网站上检索流式传输的数据。一旦网站上有新数据,我想跟踪它。流应用程序应始终运行(如果我或用户决定停止应用程序,它应该停止)

谢谢您的帮助。

标签: scalaapache-sparkspark-streaming

解决方案


推荐阅读