scala - overloaded method value createDirectStream with alternatives

Question

My Spark version is 1.6.2 and my Kafka version is 0.10.1.0. I want to send a custom object as the Kafka value type, push that object into a Kafka topic, and then read the data back with Spark Streaming using the direct approach. Here is my code:
import com.xxxxx.kafka.{KafkaJsonDeserializer, KafkaObjectDecoder, pharmacyData}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkReadKafka {
  val sparkConf = new SparkConf().setAppName("SparkReadKafka")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(1))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
      //"key.deserializer" -> classOf[StringDeserializer],
      //"value.deserializer" -> classOf[KafkaJsonDeserializer],
      "group.id" -> "consumer-group-2",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "1000",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"
    )

    val topic = "hw_insights"
    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))
  }
}
The error I get is similar to this (I had to remove certain parts for security reasons):

Error:(29, 47) overloaded method value createDirectStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[String], valueClass: Class[com.xxxxxxx.kafka.pharmacyData], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder], kafkaParams: java.util.Map[String,String], topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[String,com.xxxxxxx.kafka.pharmacyData]
  (ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: scala.collection.immutable.Map[String,String], topics: scala.collection.immutable.Set[String])(implicit evidence$19: scala.reflect.ClassTag[String], implicit evidence$20: scala.reflect.ClassTag[com.xxxxxxx.kafka.pharmacyData], implicit evidence$21: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$22: scala.reflect.ClassTag[com.xxxxxxx.kafka.KafkaObjectDecoder])org.apache.spark.streaming.dstream.InputDStream[(String, com.xxxxxxx.kafka.pharmacyData)]
cannot be applied to (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])
  val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))

Below is my custom decoder class:
import kafka.serializer.Decoder
import org.codehaus.jackson.map.ObjectMapper

class KafkaObjectDecoder extends Decoder[pharmacyData] {
  override def fromBytes(bytes: Array[Byte]): pharmacyData = {
    val mapper = new ObjectMapper()
    val pdata = mapper.readValue(bytes, classOf[pharmacyData])
    pdata
  }
}
Can someone help me figure out the problem? Thank you!

Solution

The error is saying that your arguments are incorrect: your call cannot be applied to
(org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])
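Note the `Map[String,Object]` in that tuple. A minimal, self-contained sketch (object name is mine) of why the compiler inferred that type: mixing `String` values with a `java.lang.Boolean` widens the map's value type to `Object`, while keeping every value a `String` yields the `Map[String, String]` that the Scala overload of `createDirectStream` in Spark 1.6 expects.

```scala
object ParamMapTypes {
  def main(args: Array[String]): Unit = {
    // One java.lang.Boolean value widens the inferred value type to Object,
    // so this is a Map[String, Object]:
    val mixed: Map[String, Object] = Map(
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Keeping every value a String infers Map[String, String],
    // which matches the Scala overload's kafkaParams parameter:
    val stringsOnly: Map[String, String] = Map(
      "auto.offset.reset" -> "smallest",
      "enable.auto.commit" -> "false"
    )

    println(mixed.size)                         // 2
    println(stringsOnly("enable.auto.commit"))  // false
  }
}
```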
and the closest method to what it thinks you want is
(jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass: Class[String],valueClass: Class[com.xxxxxxx.kafka.pharmacyData],keyDecoderClass: Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Set[String])
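Both overloads take kafkaParams with String values only. A sketch of the corrected call, reusing the asker's own classes (pharmacyData, KafkaObjectDecoder) and assuming the Spark 1.6 / spark-streaming-kafka (Kafka 0.8 API) artifacts are on the classpath and the broker is reachable; it is not runnable outside such an environment. Note that the old consumer configuration used by this integration expects auto.offset.reset values of smallest/largest rather than earliest/latest.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object sparkReadKafkaFixed {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("SparkReadKafka"))
    val ssc = new StreamingContext(sc, Seconds(1))

    // Every value is a String, so this infers Map[String, String] and
    // matches the Scala overload of createDirectStream:
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
      "group.id" -> "consumer-group-2",
      "auto.offset.reset" -> "smallest"
    )

    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](
      ssc, kafkaParams, Set("hw_insights"))

    // Caveat: Spark instantiates decoder classes reflectively, so
    // KafkaObjectDecoder may also need a (props: VerifiableProperties)
    // constructor to work at runtime.
    stream.foreachRDD(rdd => rdd.foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }
}
```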