Kafka Avro to Elasticsearch with Spark

Problem description

I want to push Avro messages from a Kafka topic into Elasticsearch using a Spark job (with a Schema Registry that holds many defined schemas). I can read the records and deserialize them into String (JSON) format successfully, using these two methods:

  import java.io.{ByteArrayOutputStream, IOException}
  import org.apache.avro.generic.GenericRecord
  import org.apache.avro.io.EncoderFactory
  import org.apache.avro.specific.SpecificDatumWriter

  // Deserialize an Avro GenericRecord into a JSON string
  def avroToJsonString(record: GenericRecord): String = {
    val baos = new ByteArrayOutputStream
    try {
      val schema = record.getSchema
      // Avro's JSON encoder writes the record in Avro JSON encoding
      val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos, false)
      val avroWriter = new SpecificDatumWriter[GenericRecord](schema)
      avroWriter.write(record, jsonEncoder)
      jsonEncoder.flush()
      baos.flush()
      new String(baos.toByteArray)
    } catch {
      case ex: IOException =>
        throw new IllegalStateException(ex)
    } finally baos.close()
  }

  // Parse a JSON string; returns None (and logs) if parsing fails.
  // Json.parse is assumed to come from a JSON library such as play-json.
  val parseJsonStream = (inStream: String) => {
    try {
      val parsed = Json.parse(inStream)
      Option(parsed)
    } catch {
      case e: Exception =>
        System.err.println("Exception while parsing JSON: " + inStream)
        e.printStackTrace()
        None
    }
  }

I read the records one by one, and in the debugger I can see the deserialized JSON strings, so everything looks fine. But for some reason I cannot save them to Elasticsearch, because I assume I need an RDD (or DStream) to call the saveToEs method on. This is how I read the Avro records from Kafka:

val kafkaStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
  KafkaUtils.createDirectStream[String, GenericRecord](
    ssc, PreferBrokers,
    Subscribe[String, GenericRecord](KAFKA_AVRO_TOPICS, kafkaParams))

      // Note: foreachRDD returns Unit, so kafkaStreamParsed ends up being Unit here
      val kafkaStreamParsed = kafkaStream.foreachRDD(rdd => {
        rdd.foreach(x => {
          val jsonString: String = avroToJsonString(x.value())
          parseJsonStream(jsonString)
        })
      })
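For reference, kafkaParams is not shown above; a minimal sketch of what it would typically contain when the values are deserialized to GenericRecord by the Confluent KafkaAvroDeserializer backed by the Schema Registry (the broker URL, registry URL and group id below are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializer

// Hypothetical consumer configuration; hosts and group id are placeholders
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  // KafkaAvroDeserializer looks up writer schemas in the Schema Registry
  // and yields GenericRecord values by default
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "schema.registry.url" -> "http://schema-registry:8081",
  "group.id" -> "avro-to-es-job",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)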

If I were reading JSON (not Avro) records, I could do it like this:

EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)

But with my approach the saveToEs call gives an error:

Cannot resolve overloaded method 'saveToEs'

I also tried making an RDD with sc.makeRDD(), but with no luck either. How should I get all of these records from the batch job into an RDD and then into Elasticsearch, or am I doing something wrong?
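As an aside, saveToEs relies on the elasticsearch-hadoop connector being configured on the SparkConf; a minimal sketch with placeholder host, port and batch interval (this part is not shown in the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder Elasticsearch connection settings for elasticsearch-hadoop
val conf = new SparkConf()
  .setAppName("kafka-avro-to-es")
  .set("es.nodes", "elasticsearch-host")
  .set("es.port", "9200")
  .set("es.index.auto.create", "true")
val ssc = new StreamingContext(conf, Seconds(10))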

Update

I tried this solution:

val messages: DStream[Unit] = kafkaStream
        .map(record => record.value)
        .flatMap(record => {
          val record1 = avroToJsonString(record)
          JSON.parseFull(record1).map(rawMap => {
            val map = rawMap.asInstanceOf[Map[String,String]]
          })
        })

Again I get the same error (cannot resolve overloaded method).
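The flatMap above ends with a bare val assignment, so each element of the stream is Unit and there is nothing meaningful to index. Returning the parsed map itself gives a DStream[Map[String, String]], which is the kind of element the connector can serialize. A sketch of that change (my assumption, not the author's final fix):

import org.apache.spark.streaming.dstream.DStream
import scala.util.parsing.json.JSON

val messages: DStream[Map[String, String]] = kafkaStream
  .map(record => record.value)
  .flatMap(record => {
    val jsonString = avroToJsonString(record)
    // Returning the Option keeps only successfully parsed records
    JSON.parseFull(jsonString).map(_.asInstanceOf[Map[String, String]])
  })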

Update 2

val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
        val eventJSON = avroToJsonString(rdd.value())
        parseJsonStream(eventJSON)
      })

      try {
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
      } catch {
        case e: Exception =>
          EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
          e.printStackTrace()
      }

Now I am getting the records into ES.

Using Spark 2.3.0 and Scala 2.11.8.
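For reference, a build.sbt dependency sketch matching those versions; the connector and Confluent versions below are assumptions and should be aligned with your cluster:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "io.confluent" % "kafka-avro-serializer" % "4.1.0",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
)

// Confluent artifacts are hosted on the Confluent Maven repository
resolvers += "confluent" at "https://packages.confluent.io/maven/"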

Tags: apache-spark, apache-kafka, avro

Solution


I managed to do it like this:

val kafkaStream: InputDStream[ConsumerRecord[String, GenericRecord]] =
  KafkaUtils.createDirectStream[String, GenericRecord](
    ssc, PreferBrokers,
    Subscribe[String, GenericRecord](KAFKA_AVRO_EVENT_TOPICS, kafkaParams))

      // map (unlike foreachRDD) returns a DStream, which saveToEs can be called on
      val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
        val eventJSON = avroToJsonString(rdd.value())
        parseJsonStream(eventJSON)
      })

      try {
        EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX +  "/" + ELASTICSEARCH_TYPE)
      } catch {
        case e: Exception =>
          EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
          e.printStackTrace()
      }
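One caveat: saveToEs on a DStream only registers an output operation that runs once the streaming context is started, so the surrounding try/catch will generally not catch write failures that happen later on the executors. The job still needs the usual start/await calls:

// Nothing is written to Elasticsearch until the streaming context starts
ssc.start()
ssc.awaitTermination()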
