Overloaded method value createDirectStream with alternatives

Problem description

My Spark version is 1.6.2 and my Kafka version is 0.10.1.0. I want to send a custom object as the Kafka value type and push it into a Kafka topic, then read the data back with Spark Streaming using the direct approach. Here is my code:

import com.xxxxx.kafka.{KafkaJsonDeserializer, KafkaObjectDecoder, pharmacyData}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkReadKafka {
  val sparkConf = new SparkConf().setAppName("SparkReadKafka")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(1))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object] (
      "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
      //"key.deserializer" -> classOf[StringDeserializer],
      //"value.deserializer" -> classOf[KafkaJsonDeserializer],
      "group.id" -> "consumer-group-2",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "1000",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"
    )

    val topic = "hw_insights"

    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))
  }
}

The error I get is similar to this (I had to remove some parts for security reasons):

Error:(29, 47) overloaded method value createDirectStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[String], valueClass: Class[com.xxxxxxx.kafka.pharmacyData], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[String,com.xxxxxxx.kafka.pharmacyData] <and>
  (ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: scala.collection.immutable.Map[String,String], topics: scala.collection.immutable.Set[String])(implicit evidence$19: scala.reflect.ClassTag[String], evidence$20: scala.reflect.ClassTag[com.xxxxxxx.kafka.pharmacyData], evidence$21: scala.reflect.ClassTag[kafka.serializer.StringDecoder], evidence$22: scala.reflect.ClassTag[com.xxxxxxx.kafka.KafkaObjectDecoder])org.apache.spark.streaming.dstream.InputDStream[(String, com.xxxxxxx.kafka.pharmacyData)]
  cannot be applied to (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])
    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))

Below is my custom decoder class:

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.codehaus.jackson.map.ObjectMapper

// The old Kafka decoder API instantiates decoder classes reflectively through
// a constructor that takes VerifiableProperties, so one is declared here.
class KafkaObjectDecoder(props: VerifiableProperties = null) extends Decoder[pharmacyData] {
  override def fromBytes(bytes: Array[Byte]): pharmacyData = {
    // Deserialize the JSON bytes back into a pharmacyData instance
    val mapper = new ObjectMapper()
    mapper.readValue(bytes, classOf[pharmacyData])
  }
}

Can someone help me figure out the problem? Thanks!

Tags: scala, apache-spark, apache-kafka, spark-streaming, spark-streaming-kafka

Solution


The error is saying that your arguments do not match any overload of createDirectStream:

cannot be applied to (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])

The closest method it thinks you want is:

(jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,
 keyClass: Class[String],
 valueClass: Class[com.xxxxxxx.kafka.pharmacyData],
 keyDecoderClass: Class[kafka.serializer.StringDecoder],
 valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],
 kafkaParams: java.util.Map[String,String],
 topics: java.util.Set[String])
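Both alternatives take kafkaParams as a Map with String values, while the question builds a Map[String, Object] (the `(false: java.lang.Boolean)` entry forces the value type up to Object). A minimal sketch of the fix is to declare every setting as a String so the map infers as Map[String, String]; the broker address and group id are kept from the question, and note as an assumption that the old Kafka 0.8 consumer API used by Spark 1.6's direct stream expects "smallest"/"largest" rather than "earliest"/"latest" for auto.offset.reset:

```scala
// Every value is a plain String, so this infers as Map[String, String]
// and matches the Scala overload of createDirectStream.
val kafkaParams: Map[String, String] = Map(
  "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
  "group.id" -> "consumer-group-2",
  // Old (0.8-style) consumer config uses "smallest"/"largest",
  // not "earliest"/"latest".
  "auto.offset.reset" -> "smallest",
  "enable.auto.commit" -> "false" // a String, not a java.lang.Boolean
)
```

With kafkaParams typed this way, the original call `KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))` resolves against the Scala overload shown above.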

