apache-flink - FLINK,基于JSON动态输入数据(如地图对象数据)触发事件
问题描述
我想知道FLINK是否可以支持我的要求,我已经浏览了很多文章但不确定我的情况是否可以解决
案例:我有两个输入源。a)Event b)ControlSet 事件样本数据为:
event 1-
{
"id" :100
"data" : {
"name" : "abc"
}
}
event 2-
{
"id" :500
"data" : {
"date" : "2020-07-10";
"name" : "event2"
}
}
如果您看到 event-1 和 event-2 在“数据”中都有不同的属性。所以考虑像数据是自由格式字段并且属性的名称可以相同/不同。
ControlSet 会给我们指令来执行触发器。例如触发条件可能是
(id = 100 && name = abc) OR (id =500 && date ="2020-07-10")
如果这种场景可以在 flink 中运行,请帮助我,什么是最好的方法。我不认为 patternCEP 或 SQL 可以在这里提供帮助,并且不确定事件 dataStream 是否可以作为 JSON 对象并且可以像 JSON 路径一样查询。
解决方案
是的,这可以通过 Flink 完成。CEP 和 SQL 也无济于事,因为它们要求在编译时知道该模式。
对于事件流,我建议通过 id 对这个流进行 key,并将属性/值数据存储在 keyedMapState
中,这是一种 Flink 知道如何根据需要管理、检查点、恢复和重新缩放的 keyed state。这为我们提供了一个分布式映射,将 id 映射到保存每个 id 数据的哈希映射。
对于控制流,让我首先描述一个简化版本的解决方案,其中控制查询的形式为
(id == key) && (attr == value)
我们可以简单地通过查询中的 id(即key)来键入这个流,并将这个流连接到事件流。我们将使用 aRichCoProcessFunction
来保存上面描述的 MapState,当这些查询到达时,我们可以查看key有哪些数据,并检查 if map[attr] == value
。
处理更复杂的查询,例如问题中的查询
(id1 == key1 && attr1 == value1) OR (id2 == key2 && attr2 == value2)
我们可以做一些更复杂的事情。
在这里,我们需要为每个控件查询分配一个唯一的 ID。
一种方法是将这些查询广播到KeyedBroadcastProcessFunction
再次持有上述 MapState 的地方。在该processBroadcastElement
方法中,每个实例都可以applyToKeyedState
用来检查该实例存储键控状态的查询组件的有效性(从偶数流中的数据字段派生的属性/值对)。对于实例可以提供请求信息的查询的每个键控组件,它会在下游发出一个结果。
然后,KeyedBroadcastProcessFunction
我们通过控制查询 id 对流进行键控,并使用 aKeyedProcessFunction
将来自各个实例的所有响应组合在一起KeyedBroadcastProcessFunction
,并确定控制/查询消息的最终结果。
这里实际上没有必要使用广播,但我发现这个方案解释起来更简单一些。但是您可以改为将查询的键控副本仅路由到RichCoProcessFunction
控制查询中使用的键的持有 MapState 的实例,然后对最终结果进行相同类型的组装。
这可能很难遵循。我提出的建议涉及编写我之前在示例中编写的两种技术:https ://github.com/alpinegizmo/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/ Solutions/datastream_java/broadcast/TaxiQuerySolution.java是一个使用广播触发跨键控状态的查询谓词评估的示例, https ://gist.github.com/alpinegizmo/5d5f24397a6db7d8fabc1b12a15eeca6 是一个使用唯一 id 来重新- 在并行进行多个浓缩后组装单个响应。
推荐阅读
- google-classroom - 如何模拟复制菜单功能来重现下学期的教室?
- node.js - CSV_NON_TRIMABLE_CHAR_AFTER_CLOSING_QUOTE CSV 解析节点?
- validation - Blazor 覆盖 InputText 类(删除“修改后的有效”)
- amazon-web-services - LoadBalancer 和 EC2 实例之间的 AWS 加密连接
- python - 如何嵌套一个 for 循环来绘制虹膜数据上的散点图?
- javascript - 不知道我的 IF 语句有什么问题(JAVASCRIPT)
- reactjs - React - 通过 useContext 在组件中使用状态
- firebase - 从 Flutter Web 应用程序发送 Firebase 存储授权作为 url 参数
- c# - 即使列表为空,C# 列表也始终显示 1 个元素
- gherkin - 将自定义(结构化)元数据添加到场景