apache-spark - Kafka 到 Spark 流使用 Pyspark 错误
问题描述
我正在尝试从 kafka 主题中获取数据,但我无法做到这一点。我已经尝试了共享链接的教程,但最后我得到了错误。我也添加了所有必需的 jar 文件(位置:-usr/local/spark/jars)。请让我知道可能出了什么问题。另外我想知道如何用 scala 编程来做到这一点。
https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#deploying-applications
https://medium.com/@kass09/spark-streaming-kafka-in-python-a-test-on-local-machine-edd47814746
尝试这个火花流命令我得到了错误。
“ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 示例/src/main/python/streaming/direct_kafka_wordcount.py”
我遇到了一个 jupyter 错误,所以我尝试使用以下命令来解决它,但错误仍然相同“pip3 install --upgrade --force-reinstall --no-cache-dir jupyter”
解决方案
Scala 中的 Spark 和 Kafka 集成
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object sparkStreaming_Kafka {
@transient lazy val log = org.apache.log4j.LogManager.getLogger("sparkStreaming_Kafka")
def main(args: Array[String]): Unit = {
log.debug("added the messages ****** ---->>>")
val spark = SparkSession
.builder()
.appName("my_App" )
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
log.debug("Before starting the Stream -->>")
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
(Array.apply("my_topic"), getKafkaParams())).map(record => record.value)
stream.foreachRDD { rdd =>
try {
if (!rdd.isEmpty()) {
rdd.foreach(x => postData(x))
}
} catch {
case t: Throwable =>
t.printStackTrace() // TODO: handle error
log.error("Exception occured while processing the data exception is {}", t.getCause)
}
}
ssc.start()
log.debug("started now-->> " + compat.Platform.currentTime)
ssc.awaitTermination()
}
def getKafkaParams(): Map[String, Object] = {
Map[String, Object]("bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (true: java.lang.Boolean))
}
def postData(event: String): Unit = {
log.info("before KinesisSink.process call ==>>"+event)
print(event) // use the event as per the requirement
}
}
推荐阅读
- azure - Kubernetes 中的 Azure DevOps 构建代理
- javascript - Joi 嵌套时
- kubernetes - 为每个 statefulset pod 使用特定节点的正确方法是什么?
- node.js - 如何在 Angular 中使用 NodeJS/express/multer 访问存储为 mongodb 中的缓冲区的图像
- azure - 使用 ARM-TTK 测试 ARM 模板
- javascript - DataTable 更新到 1.11 销毁返回错误
- pandas - Groupby agg保持空白值
- github - 该应用程序需要以下 .net 框架版本之一。如何为github解决这个问题?
- java - 如何使用 io.reactivex.rxjava3.core
- php - Wordpress如何在转义html标签时使用换行符