json - 如何将带有 json 数组的 Datastream 分解为单个数组元素的 DataStream
问题描述
我有一个 Datastream[ObjectNode],我从 kafka 主题中读取为反序列化的 json。这个 ObjectNode 的一个元素实际上是一个事件数组。这个数组有不同的长度。传入的 json 流如下所示:
{
"eventType": "Impression",
"deviceId": "359849094258487",
"payload": {
"vertical_name": "",
"promo_layout_type": "aa",
"Customer_Id": "1011851",
"ecommerce": {
"promoView": {
"promotions": [{
"name": "/-category_icons_all",
"id": "300275",
"position": "slot_5_1",
"creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
}, {
"name": "/-category_icons_all",
"id": "300276",
"position": "slot_6_1",
"creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
}, {
"name": "/-category_icons_all",
"id": "413002",
"position": "slot_7_1",
"creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
}]
}
}
}
}
我希望能够分解数组,以便内部的promotions
每个元素都成为可以写入接收器 kafka 主题的单独消息。Flink 是否在 DataStream 和/或 Table API 中提供了爆炸功能?
我试图在这个流上做一个 RichFlatMap 以便能够收集单独的行,但这也只是返回一个 DataStream[Seq[GenericRecord]],如下所示:
class PromoMapper(schema: Schema) extends RichFlatMapFunction[node.ObjectNode,Seq[GenericRecord]] {
override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions").asInstanceOf[Seq[node.ObjectNode]]
val record = for{promo <- promos} yield {
val processedRecord: GenericData.Record = new GenericData.Record(schema)
promo.fieldNames().asScala.foreach(f => processedRecord.put(f,promo.get(f)))
processedRecord
}
out.collect(record)
}
}
请帮忙。
解决方案
使用平面地图是正确的想法(不知道您为什么要使用 RichFlatMap,但这是一个细节)。
似乎您应该out.collect(processedRecord)
在 for 循环中调用每个元素,而不是在该循环产生的 Seq 上调用一次。
推荐阅读
- asp.net - 如何建立正确的查询通知?
- antlr - ANTLR4,用词法模式匹配较短的字符序列
- java - 如何在 stringPattern 值birdantantcatbirdcat 中获取字符串结果
- c++ - 初始化模板结构数组
- tensorflow - Tensorfow-lite PReLU Fusion 和 TransposeConv Bias
- excel - 如何从另一个工作簿复制一系列单元格并将其粘贴到另一个工作簿?
- javascript - 显示军用时间的完整日历
- android - 如何在颤动中发出两个依赖的异步请求?
- c# - 导出到excel时Datagridview C#列日期错误格式
- c - 将字符串中的转义十六进制转换为c中的int