首页 > 解决方案 > flink 1.10 DataStream API 中的 JSON 数据聚合

问题描述

我正在尝试使用 Kafka 消息(作为 Flink 1.10 API StreamSource)在 Elasticsearch 中聚合数据。数据以动态的 JSON 格式接收,示例如下。我想通过唯一 ID 将多个记录组合在单个文档中。数据按顺序排列,它是时间序列数据。

源接收器 kafka 和目标接收器 elasticseach 7.6.1 6

我没有找到任何可以在下面的问题陈述中使用的好例子。

Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" : 
{
 "Field1" : "ABC",
 "Field2" : "DEF"
}
}

Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" : 
{
 "Field3" : "GHY"
}
}

Result :

{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
 "Field1" : "ABC",
 "Field2" : "DEF",
 "Field3" : "GHY"
}
}

以下是版本详情:

  1. Flink 1.10
  2. Flink-kafka-connector 2.11
  3. Flink-Elasticsearch-connector 7.x
  4. 卡夫卡 2.11
  5. JDK 1.8

标签: apache-flinkflink-streaming

解决方案


您所要求的可以描述为某种连接,并且您可以通过多种方式使用 Flink 完成此操作。Apache Flink 培训中有一个状态丰富的示例,它展示了如何使用应该帮助您入门的类似连接。您需要先通读相关的培训材料——至少是关于数据管道和 ETL的部分。RichFlatMapFunction

您最终将使用这种方法对流进行分区(通过keyBy),然后使用键分区状态(可能MapState在这种情况下,假设您为每个 ID 存储多个属性/值对)来存储来自记录 1 等记录的信息,直到您准备好发出结果。

顺便说一句,如果键集是无限的,您需要注意不要永远保持这种状态。要么在不再需要状态时清除状态(如本例所示),要么使用状态 TTL安排其最终删除。

有关 Flink 中其他类型连接的更多信息,请参阅此答案中的链接。


推荐阅读