首页 > 技术文章 > 实操2：使用 Spark Streaming 处理 Kafka 消息队列

zyp0519 2021-12-16 17:19 原文

  在实操1中，我们已经实现了将数据写入 Kafka；这里我们使用 Spark Streaming 对这些数据进行处理。

举个例子，原始数据：

       A,甲 乙 丙 (这里为空) 丁
处理后：
A,甲
A,乙
A,丙
A,丁
即：把一行数据按空格"炸开"成多行，每行都保留原来的键（这里是 A），并把其中的空值过滤掉。

import java.util

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

/**
 * Holder for Kafka connection settings, plus builders that turn them into
 * Kafka client configuration maps for the producer (write) and consumer
 * (read) sides.
 *
 * Extends `Serializable`, presumably so an instance can be captured in Spark
 * closures — TODO(review): confirm against the caller.
 *
 * All settings are public mutable `var`s so callers can override the defaults
 * after construction, e.g. `val p = new Params(); p.IP = "host:9092"`.
 * Numeric settings (`RETIRES`, `MAX_POLL`) are stored and passed as strings.
 *
 * Naming quirks kept for backward compatibility (callers may assign them):
 *  - `RETIRES` is a misspelling of "retries"; it is the value passed under
 *    `ProducerConfig.RETRIES_CONFIG`.
 *  - `KEY_OUT_SERIALIZER` / `VALUE_OUT_SERIALIZER` actually hold
 *    *deserializer* class names; they are passed under the consumer's
 *    `KEY_DESERIALIZER_CLASS_CONFIG` / `VALUE_DESERIALIZER_CLASS_CONFIG`.
 */
class Params() extends Serializable {
  /** Bootstrap server list shared by producer and consumer. Empty by default, so set it before use. */
  var IP: String = ""

  // ---- producer (write) side ----
  var KEY_IN_SERIALIZER: String = "org.apache.kafka.common.serialization.StringSerializer"
  var VALUE_IN_SERIALIZER: String = "org.apache.kafka.common.serialization.StringSerializer"
  var ACKS: String = "all"
  /** Producer retry count (name is a historical misspelling of "RETRIES"). */
  var RETIRES: String = "3"

  // ---- consumer (read) side ----
  // NOTE(review): defaults to empty; it likely must be set to a real group id
  // before consuming — confirm against the Kafka client version in use.
  var GROUPID: String = ""
  var AUTO_OFFSET: String = "earliest"
  var MAX_POLL: String = "500"
  /** Consumer key *deserializer* class name (despite the field name). */
  var KEY_OUT_SERIALIZER: String = "org.apache.kafka.common.serialization.StringDeserializer"
  /** Consumer value *deserializer* class name (despite the field name). */
  var VALUE_OUT_SERIALIZER: String = "org.apache.kafka.common.serialization.StringDeserializer"

  /**
   * Builds the producer configuration from the current field values.
   *
   * Keys set: bootstrap servers (`IP`), key/value serializer class names,
   * acks (`ACKS`) and retries (`RETIRES`).
   *
   * @return a newly allocated map on every call, so the caller may mutate it freely
   */
  def getWriterKafkaParam(): util.HashMap[String, Object] = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_IN_SERIALIZER)
    hm.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_IN_SERIALIZER)
    hm.put(ProducerConfig.ACKS_CONFIG, ACKS)
    hm.put(ProducerConfig.RETRIES_CONFIG, RETIRES)
    hm
  }

  /**
   * Builds the consumer configuration from the current field values.
   *
   * Keys set: bootstrap servers (`IP`), group id (`GROUPID`), offset reset
   * policy (`AUTO_OFFSET`), max poll records (`MAX_POLL`) and the key/value
   * deserializer class names (`KEY_OUT_SERIALIZER` / `VALUE_OUT_SERIALIZER`).
   *
   * @return a newly allocated map on every call, so the caller may mutate it freely
   */
  def getReadKafkaParam(): util.HashMap[String, Object] = {
    val hm = new util.HashMap[String, Object]()
    hm.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IP)
    hm.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID)
    hm.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET)
    hm.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL)
    hm.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_OUT_SERIALIZER)
    hm.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_OUT_SERIALIZER)
    hm
  }
}

//object Params {
//  def apply(): Params = new Params()
//}


推荐阅读