Kafka to Spark Streaming with PySpark — error

Problem description

I am trying to read data from a Kafka topic, but I am unable to do so. I followed the tutorials linked below, but in the end I get an error. I have also added all the required jar files (location: usr/local/spark/jars). Please let me know what might be going wrong. I would also like to know how to do this in Scala.

https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#deploying-applications

https://medium.com/@kass09/spark-streaming-kafka-in-python-a-test-on-local-machine-edd47814746

(A screenshot of the error was attached here.)

Running this spark-submit command, I get the error:

bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 examples/src/main/python/streaming/direct_kafka_wordcount.py

I also ran into a Jupyter error, so I tried the following command to fix it, but the error stayed the same: `pip3 install --upgrade --force-reinstall --no-cache-dir jupyter`
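One likely cause of the failure, independent of any Jupyter issue, is that the `direct_kafka_wordcount.py` example expects a Kafka broker list and a topic name as command-line arguments, which the command above omits. A sketch of the invocation, assuming Kafka runs on `localhost:9092` and the topic is named `my_topic` (both are assumptions; substitute your own values, and make sure the `--packages` version matches your installed Spark version):

```shell
# Hedged sketch: broker address and topic name below are assumptions.
# The package version (here 2.1.1) should match the installed Spark version.
bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 \
  examples/src/main/python/streaming/direct_kafka_wordcount.py \
  localhost:9092 my_topic
```

If the script is run without these arguments, it exits immediately with a usage error, which can be mistaken for a dependency problem.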

Tags: apache-spark, pyspark, apache-kafka, jupyter-notebook

Solution


Spark and Kafka integration in Scala

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent


object sparkStreaming_Kafka {

  @transient lazy val log = org.apache.log4j.LogManager.getLogger("sparkStreaming_Kafka")

  def main(args: Array[String]): Unit = {

    log.debug("added the messages ****** ---->>>")

    val spark = SparkSession
      .builder()
      .appName("my_App" )
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) // batch interval of 2 seconds; adjust as required
    log.debug("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Array("my_topic"), getKafkaParams())
      ).map(record => record.value)

    stream.foreachRDD { rdd =>
      try {
        if (!rdd.isEmpty()) {
          rdd.foreach(x => postData(x))
        }
      } catch {
        case t: Throwable =>
          t.printStackTrace() // TODO: handle error
          log.error("Exception occurred while processing the data: " + t.getMessage, t)
      }
    }

    ssc.start()
    log.debug("started now -->> " + System.currentTimeMillis())
    ssc.awaitTermination()
  }

  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)) 
  }


  def postData(event: String): Unit = {
    log.info("before KinesisSink.process call ==>>"+event)

    print(event)  // use the event as per the requirement

  }
}
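Note that this code uses the kafka-0-10 integration (the new Kafka consumer API), not the kafka-0-8 package from the question, so the matching connector must be supplied at submit time. A sketch of the spark-submit invocation, assuming the object above is packaged into a jar built for Scala 2.11 and Spark 2.1.x (the jar path and version numbers are assumptions; match them to your build and installed Spark):

```shell
# Hedged sketch: jar path and versions below are assumptions.
bin/spark-submit \
  --class sparkStreaming_Kafka \
  --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.1 \
  target/scala-2.11/sparkstreaming-kafka_2.11-1.0.jar
```

The `--packages` flag pulls the kafka-0-10 connector and its dependencies from Maven Central, so the jars do not need to be copied into the Spark jars directory by hand.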
