首页 > 解决方案 > Flink CEP 事件加入到一个反向数据流

问题描述

我有 2 个数据流(例如)

ts | device | custId | temp
1 | 'device1'| 1 | 10
1 | 'device2'| 4 | 7
2 | 'device1'| 1 | 10
3 | 'device1'| 1 | 10
4 | 'device1'| 1 | 10
5 | 'device2'| 4 | 10

我创建了一个 CEP 模式,我想在 4 秒内检查温度是否大于 30。

val pattern = Pattern.begin[Device]("start")
      .where(_.sumtemp >= 30)
      .within(Time.seconds(4))

有没有办法将此模式流的输出连接到另一个传入数据流以低于?

ts | custId | morethanthiry
1 | 1 | yes
2 | 4 | no

如果可以分享一个例子来做到这一点,我将非常感激。

标签: apache-flinkflink-streaming

解决方案


有不止一种选择。你可以加入你的流coGroup

例子:

set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new MyCoGroupFunction());

您可以将其视为 SQL 中的联接。

一个实现的小例子:

class MyCoGroupFunction extends RichCoGroupFunction[DataTypeOfStream1, DataTypeOfStream2, DataTypeOfOutput] {

      override def coGroup(first: DataTypeOfStream1,
                         second: DataTypeOfStream2],
                         out: DataTypeOfOutput): Unit = {

           out.collect(...)
           //your output

      }
}

如果需要,您还可以使用状态。

还有其他选项可以加入两个流,例如

  • union(如果要连接的流具有相同的数据类型)
  • 连接
  • coFlatMap 方法之间的差异很小。

有关更多信息,请参阅https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/


推荐阅读