首页 > 解决方案 > 多种事件类型的 Apache Flink CEP 模式

问题描述

目前我正在做一个学期项目,我必须识别三个事件的系列。像P -> R -> P

我们有两种不同的事件类型,它们通过同一主题中的 Kafka 连接器使用。

我创建了一个名为 Event 的父类,其他两种类型都从该类派生。

Kafka 连接器将带有 EventSchema 的 JSON 反序列化到父类 Event。

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

该模式如下所示:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

当前的问题是,如果我以适当的格式发送三条消息,则无法识别该模式。

我尝试过的其他方法..我改变了这样的模式:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

这种模式有效,但后来我无法将 Event 类转换为位置或识别..

我在这里想念什么?

标签: apache-kafkaapache-flinkflink-cepcomplex-event-processing

解决方案


根据评论,我认为您应该返回子类型实例而不是事件。这是我为您提供的示例代码:

val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  case _ =>
}

我成功地从 CEPITCase.java 中的一个测试用例中尝试了这个例子。

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);

推荐阅读