apache-spark - Kafka Avro 到 Elasticsearch 与 Spark
问题描述
想要将Avro
来自Kafka
主题的消息放入Elasticsearch
使用Spark
作业(以及具有许多已定义模式的 SchemaRegistry)。我能够成功地读取记录并将其反序列化为字符串(json
)格式(使用这两种方法):
// Deserialize Avro to String
def avroToJsonString(record: GenericRecord): String = try {
val baos = new ByteArrayOutputStream
try {
val schema = record.getSchema
val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos, false)
val avroWriter = new SpecificDatumWriter[GenericRecord](schema)
avroWriter.write(record, jsonEncoder)
jsonEncoder.flush()
baos.flush()
new String(baos.toByteArray)
} catch {
case ex: IOException =>
throw new IllegalStateException(ex)
} finally if (baos != null) baos.close()
}
// Parse JSON String
val parseJsonStream = (inStream: String) => {
try {
val parsed = Json.parse(inStream)
Option(parsed)
} catch {
case e: Exception => System.err.println("Exception while parsing JSON: " + inStream)
e.printStackTrace()
None
}
}
我正在逐条读取记录,并且JSON
在调试器中看到反序列化的字符串,一切看起来都很好,但由于某种原因我无法将它们保存到Elasticsearch
中,因为我猜RDD
需要调用saveToEs
方法。这就是我从以下位置读取 avro 记录的方式Kafka
:
val kafkaStream : InputDStream[ConsumerRecord[String, GenericRecord]] = KafkaUtils.createDirectStream[String, GenericRecord](ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_TOPICS, kafkaParams))
val kafkaStreamParsed= kafkaStream.foreachRDD(rdd => {
rdd.foreach( x => {
val jsonString: String = avroToJsonString(x.value())
parseJsonStream(jsonString)
})
})
如果我在阅读 json(不是 Avro)记录时,我可以这样做:
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
我的方法有错误saveToEs
说
无法解析重载方法“saveToEs”
试着做rdd
,sc.makeRDD()
但也没有运气。我应该如何将批处理作业中的所有这些记录放入RDD
和之后,Elasticsearch
否则我做错了?
更新
尝试解决方案:
val messages: DStream[Unit] = kafkaStream
.map(record => record.value)
.flatMap(record => {
val record1 = avroToJsonString(record)
JSON.parseFull(record1).map(rawMap => {
val map = rawMap.asInstanceOf[Map[String,String]]
})
})
再次使用相同的Error
方法(无法解析重载方法)
更新2
val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
val eventJSON = avroToJsonString(rdd.value())
parseJsonStream(eventJSON)
})
try {
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
} catch {
case e: Exception =>
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
e.printStackTrace()
}
现在我得到了 ES 中的记录。
使用Spark 2.3.0
和Scala 2.11.8
解决方案
我设法做到了:
val kafkaStream : InputDStream[ConsumerRecord[String, GenericRecord]] = KafkaUtils.createDirectStream[String, GenericRecord](ssc, PreferBrokers, Subscribe[String, GenericRecord](KAFKA_AVRO_EVENT_TOPICS, kafkaParams))
val kafkaStreamParsed: DStream[Any] = kafkaStream.map(rdd => {
val eventJSON = avroToJsonString(rdd.value())
parseJsonStream(eventJSON)
})
try {
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_EVENTS_INDEX + "/" + ELASTICSEARCH_TYPE)
} catch {
case e: Exception =>
EsSparkStreaming.saveToEs(kafkaStreamParsed, ELASTICSEARCH_FAILED_EVENTS)
e.printStackTrace()
}
推荐阅读
- reactjs - 反应路由器链接不刷新页面
- javascript - isPalindrome() 函数在测试用例上失败
- function - PowerShell 错误:计算机名称在自定义函数中无效
- react-native - React Navigation 4:重置嵌套在 DrawerNavigator 中的 StackNavigator
- flutter - 如何设置团队内切换分数按钮的值?
- c# - 从动态集合创建地图不再有效:无法将 System.String 类型的对象转换为 System.Object)
- javascript - 特定文件中的导入函数返回 TypeError: X is not a function but it works in other files
- ios - iOS 中快速更新 RSSI 蓝牙
- excel - Excel - 使用动态日期字段为上一个日期范围(上个月、上一年等)创建汇总统计信息
- python - 作为 attr.s 列表的 attr.ib 的序列化(使用 yasoo)失败