Spark Streaming direct Kafka consumption

shuaidong 2021-10-12 09:44

pom file:

Versions: Kafka 2.2.1 with CDH 6.3.0 and Spark 2.4.0.

 

<dependencies>
    <!-- Legacy Scala 2.10 / Kafka 0.9 artifact shipped with CDH; kept from the original project,
         but typically not required when using the 0-10 direct stream API below. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0-kafka-2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>

    <!-- spark-core must use the same Scala binary version (_2.11) and Spark version
         as the spark-streaming artifacts above; mixing _2.11 and _2.12 breaks at runtime. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.9</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.6</version>
    </dependency>
</dependencies>

Main code:

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._

class KafkaChannal() {

  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext,
                                                   topics: Set[String],
                                                   kafkaParams: Map[String, Object],
                                                   autoUpdateoffset: Boolean,
                                                   fromBeginning: Boolean): InputDStream[ConsumerRecord[K, V]] = {
    // fromBeginning = true: consume from the earliest offsets; false: resume from where the group left off
    // autoUpdateoffset = false: disable automatic offset commits (commit manually instead)
    // (see the usage sketch after this class)
    val props = new mutable.HashMap[String, Object]()
    props ++= kafkaParams
    val groupId = kafkaParams.getOrElse("group.id", "default")
    props.put("group.id", groupId)
    props.put("enable.auto.commit", autoUpdateoffset.asInstanceOf[Object])
    var consumerStrategy: ConsumerStrategy[K, V] = null
    if (fromBeginning) {
      // Use a standalone consumer to look up the beginning offset of every partition of the topics
      val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
      val partitions = topics.flatMap(topic => {
        consumer.partitionsFor(topic).asScala.map(info => {
          new TopicPartition(topic, info.partition())
        })
      })
      val offsets = consumer.beginningOffsets(partitions.asJava)
      consumer.close()
      val kafkaTopics = new util.ArrayList[String](topics.asJava)
      val kafkaParamsMap = new util.HashMap[String, Object](props.asJava)
      consumerStrategy = ConsumerStrategies.Subscribe[K, V](kafkaTopics, kafkaParamsMap, offsets)
    } else {
      consumerStrategy = ConsumerStrategies.Subscribe[K, V](topics, props.toMap)
    }
    KafkaUtils.createDirectStream[K, V](ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }

}
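
A minimal usage sketch (not in the original post), assuming the same ssc, topicSet, and kafkaParams as in the main method below, showing the fromBeginning = true path:

// Hypothetical call: fromBeginning = true starts each partition at its beginning offset,
// while autoUpdateoffset = false keeps offset commits manual.
val fromStart: InputDStream[ConsumerRecord[String, String]] =
  new KafkaChannal().createDirectStream[String, String](ssc, topicSet, kafkaParams,
    autoUpdateoffset = false, fromBeginning = true)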

object KafkaChannal {

  def main(args: Array[String]): Unit = {
    val ssc = createStreamingContext(5)

    val topics = "dTopic"
    val topicSet: Set[String] = topics.split(",").toSet
    val kafkaParams: Map[String, String] = KafkaConfig.buildKafkaParams
    val autoUpdateoffset: Boolean = false
    // Spell out the [String, String] record types explicitly to avoid type-inference pitfalls
    val message: InputDStream[ConsumerRecord[String, String]] =
      new KafkaChannal().createDirectStream(ssc, topicSet, kafkaParams, autoUpdateoffset, false)
    val dStream: DStream[String] = message.map(_.value())
    dStream.foreachRDD(rdd => {
      rdd.foreach(data => {
        println(data)
      })
    })
    ssc.checkpoint("C://tmp//dd")

    // Commit offsets after the data has been processed (see the combined sketch after this object)
    message.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      message.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  protected def createStreamingContext(batchDuration: Long): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("kafkaChannal")
    // elasticsearch-hadoop bulk-write tuning (only relevant if the job also writes to ES)
    sparkConf.set("es.nodes.discovery", "true")
    sparkConf.set("es.batch.size.bytes", "300000")
    sparkConf.set("es.batch.size.entries", "10000")
    sparkConf.set("es.batch.write.refresh", "false")
    sparkConf.set("es.batch.write.retry.count", "50")
    sparkConf.set("es.batch.write.retry.wait", "500")
    sparkConf.set("es.http.timeout", "5m")
    sparkConf.set("es.http.retries", "50")
    sparkConf.set("es.http.enabled", "true")
    sparkConf.set("es.action.heart.beat.lead", "50")
    // Streaming rate limits and serializer
    sparkConf.set("spark.streaming.receiver.maxRate", "20000")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5000")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(batchDuration))
    ssc
  }

}
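
The main method above registers two separate foreachRDD actions, one for processing and one for committing offsets. The Spark streaming-kafka-0-10 integration also lets both happen in a single action, so offsets are committed only after the batch's records have been handled. A minimal sketch, reusing the message stream from main:

// Process and commit within one action: grab the offset ranges, handle the batch, then commit.
message.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_.value()).foreach(record => println(record)) // batch processing goes here
  message.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}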
KafkaConfig class (params):
object KafkaConfig {

  def buildKafkaParams: Map[String, String] = {
    val kafkaParams = Map[String, String](
      "auto.offset.reset" -> "latest",
      "group.id" -> "group组",
      // carried over from the older consumer configuration; KafkaConsumer warns about and ignores keys it does not recognize
      "refresh.leader.backoff.ms.id" -> "2000",
      "num.consumer.fetchers" -> "1",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      // placeholder broker list
      "bootstrap.servers" -> "ip:9092,ip:9092,ip:9092"
    )

    kafkaParams
  }

}
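
To reduce the risk of misspelled property names, the same map can also be built from the constants in org.apache.kafka.clients.consumer.ConsumerConfig. A minimal sketch (not from the original post; buildKafkaParamsTyped is a hypothetical helper, and the broker list and group id remain placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig

// Same parameters expressed through the ConsumerConfig constants shipped with kafka-clients.
def buildKafkaParamsTyped: Map[String, String] = Map(
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.GROUP_ID_CONFIG -> "group组",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "ip:9092,ip:9092,ip:9092"
)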
 
 

 
