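apache-spark - How do I store Spark streaming data in HDFS on Hortonworks?
Problem description
I am using Spark to stream data from a Kafka topic. Below is the code I have tried. For now it only prints the streamed data to the console; I want to store this data in HDFS as text files.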
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel

object StreamingDataNew {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10)) // 10-second batches

    val kafkaConf = Map(
      "metadata.broker.list" -> "localhost:9092",
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "kafka-streaming-example",
      "zookeeper.connection.timeout.ms" -> "200000"
    )

    // Receiver-based stream of (key, value) pairs.
    val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc,
      kafkaConf,
      Map("topic-one" -> 1), // subscribe to the topic with 1 consumer thread
      StorageLevel.MEMORY_ONLY
    )

    // Prints the DStream object itself, not the streamed records.
    println("printing" + lines.toString())

    // Split each message value into words and print a sample of every batch.
    val words = lines.flatMap { case (x, y) => y.split(" ") }
    words.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
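I found that a DStream can be written out with 'saveAsTextFiles'. Could someone spell out the steps needed to connect to Hortonworks and store the data in HDFS using the Scala code above?
Solution
I found the answer; this code works for me.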
package com.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MessageStreaming {
  def main(args: Array[String]): Unit = {
    println("Message streaming")

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
    val context = new SparkContext(conf)
    val ssc = new StreamingContext(context, Seconds(10)) // 10-second batches

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka.kafka-cluster.com:9092",
      "group.id" -> "kafka-streaming-example",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "latest",
      // Not a Kafka consumer setting; kept from the original configuration.
      "zookeeper.connection.timeout.ms" -> "200000"
    )
    val topics = Array("cdc-classic")

    // Direct stream of ConsumerRecord[String, String].
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    // Drop records whose value is null before writing them out.
    val content = stream.filter(_.value() != null)

    val sqlContext = new SQLContext(context)
    import sqlContext.implicits._ // enables rdd.toDF

    content.map(_.value).foreachRDD { rdd =>
      rdd.foreach(println) // printed by the executors (the local console in local mode)
      if (!rdd.isEmpty()) {
        // Append each non-empty batch as JSON under the HDFS directory.
        rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append)
          .json("hdfs://dev1a/user/hg5tv0/hadoop/MessagesFromKafka")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
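The question asks about 'saveAsTextFiles', while the code above writes JSON through a DataFrame. For plain text output, the following is a minimal sketch rather than a definitive implementation. The object name HdfsTextSink, its helper names, and the hdfs://namenode-host:8020/... prefix are hypothetical and not part of the original answer; on a Hortonworks cluster the real NameNode address is usually the fs.defaultFS value in core-site.xml.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object HdfsTextSink {
  // Hypothetical location: substitute your cluster's NameNode host, port and directory.
  val defaultPrefix = "hdfs://namenode-host:8020/user/spark/kafka-words"

  // Directory name used for one batch: "<prefix>-<batch time in ms>".
  def batchPath(prefix: String, batchTimeMs: Long): String = s"$prefix-$batchTimeMs"

  // Simplest option: one output directory per batch interval, empty batches included.
  def saveEveryBatch(words: DStream[String], prefix: String): Unit =
    words.saveAsTextFiles(prefix)

  // Variant that skips empty batches so HDFS is not filled with empty directories.
  def saveNonEmptyBatches(words: DStream[String], prefix: String): Unit =
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      if (!rdd.isEmpty()) rdd.saveAsTextFile(batchPath(prefix, time.milliseconds))
    }
}
```

In the code from the question, replacing words.print() with HdfsTextSink.saveNonEmptyBatches(words, HdfsTextSink.defaultPrefix) would write each non-empty batch to its own directory of part files. Both variants create a new directory per batch, so a job that runs for a long time produces many small files; the DataFrame approach above with SaveMode.Append keeps everything under a single directory instead.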