apache-flink - Flink CEP 事件加入到一个反向数据流
问题描述
我有 2 个数据流(例如)
ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10
我创建了一个 CEP 模式,我想在 4 秒内检查温度是否大于 30。
val pattern = Pattern.begin[Device]("start")
.where(_.sumtemp >= 30)
.within(Time.seconds(4))
有没有办法将此模式流的输出连接到另一个传入数据流以低于?
ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no
如果可以分享一个例子来做到这一点,我将非常感激。
解决方案
有不止一种选择。你可以加入你的流coGroup
例子:
set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());
您可以将其视为 SQL 中的联接。
一个实现的小例子:
class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {
override def coGroup(first: DataTypeOfStream1,
second: DataTypeOfStream2],
out: DataTypeOfOutput): Unit = {
out.collect(...)
//your output
}
}
如果需要,您还可以使用状态。
还有其他选项可以加入两个流,例如
- union(如果要连接的流具有相同的数据类型)
- 连接
- coFlatMap 方法之间的差异很小。
有关更多信息,请参阅https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ 。
推荐阅读
- cassandra - cassandra如何在sstable中组织价值
- javascript - 通过 getElementById().innerHTML 编辑 div 没有任何变化
- enterprise-architect - 如何在 Enterprise Architect 14 中更改现有表的所有者和表空间?
- sql-server - sql脚本仅获取与给定条件匹配的几行
- charles-proxy - 使用正确的正则表达式时,正则表达式过滤器不会过滤掉 Charles Proxy 上的网络调用/流量
- ios - 无法使用警报操作定向到另一个视图控制器
- bash - 检查每个脚本的状态后,按顺序执行多个 shell 脚本
- php - 如何使用php按目录中的日期文件排序
- c# - 显式绑定重定向与自动生成的绑定重定向冲突
- css - 顺时针旋转 180 然后 360