apache-flink - flink 1.10 DataStream API 中的 JSON 数据聚合
问题描述
我正在尝试使用 Kafka 消息(作为 Flink 1.10 API StreamSource)在 Elasticsearch 中聚合数据。数据以动态的 JSON 格式接收,示例如下。我想通过唯一 ID 将多个记录组合在单个文档中。数据按顺序排列,它是时间序列数据。
源接收器 kafka 和目标接收器 elasticseach 7.6.1 6
我没有找到任何可以在下面的问题陈述中使用的好例子。
Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF"
}
}
Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field3" : "GHY"
}
}
Result :
{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF",
"Field3" : "GHY"
}
}
以下是版本详情:
- Flink 1.10
- Flink-kafka-connector 2.11
- Flink-Elasticsearch-connector 7.x
- 卡夫卡 2.11
- JDK 1.8
解决方案
您所要求的可以描述为某种连接,并且您可以通过多种方式使用 Flink 完成此操作。Apache Flink 培训中有一个状态丰富的示例,它展示了如何使用应该帮助您入门的类似连接。您需要先通读相关的培训材料——至少是关于数据管道和 ETL的部分。RichFlatMapFunction
您最终将使用这种方法对流进行分区(通过keyBy
),然后使用键分区状态(可能MapState
在这种情况下,假设您为每个 ID 存储多个属性/值对)来存储来自记录 1 等记录的信息,直到您准备好发出结果。
顺便说一句,如果键集是无限的,您需要注意不要永远保持这种状态。要么在不再需要状态时清除状态(如本例所示),要么使用状态 TTL安排其最终删除。
有关 Flink 中其他类型连接的更多信息,请参阅此答案中的链接。
推荐阅读
- javascript - JavaScript 函数在设定的时间获取昨天的日期为 00:00:00
- javascript - 关于 React-native 应用的站点展示
- java - 模不返回正确的值
- android - Recyclerview - 单击搜索显示的结果会打开不正确的项目
- java - 在使用 httpsurlconnection 发送到服务器之前添加到 jsonobject 时如何编码 jsonArray
- postgresql - PostgreSQL AWS RDS 中未释放/关闭空闲(事务中非空闲)连接
- css - 在本机基础卡中启用溢出
- c# - 使用标签移动游戏对象不起作用 Unity
- amazon-web-services - AWS元素媒体转换自动旋转视频?
- python - Python Django 如何在 django models.py 中创建哈希字段