Kafka: Consuming Kafka with Spark

RzCong 2018-03-23 14:14

With the newer Kafka API (spark-streaming-kafka-0-10), a direct stream is created as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Broker list and topic are read from an external properties file
val brokers = properties.getProperty("kafka.host.list")
val topics = Set(properties.getProperty("kafka.application.topic"))

// Consumer configuration passed straight to the underlying Kafka consumer
val kafkaParams = Map[String, String](
  "bootstrap.servers"       -> brokers,
  "group.id"                -> "ntaflowgroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer"        -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"       -> "latest",
  "enable.auto.commit"      -> "true"
)

// Create the direct stream; each record arrives as a ConsumerRecord[String, String]
val ntaflowCache: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
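
The returned DStream can then be transformed and acted on like any other. A minimal usage sketch is shown below; it assumes `ssc` is the same StreamingContext used above, and simply printing a few record values per batch is only for illustration:

// Minimal usage sketch (assumption: ssc is the StreamingContext used above)
ntaflowCache
  .map(record => record.value())   // ConsumerRecord -> message payload
  .foreachRDD { rdd =>
    rdd.take(10).foreach(println)  // sample a few records per batch on the driver
  }

ssc.start()             // start receiving and processing data
ssc.awaitTermination()  // block until the streaming job is stopped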
