apache-kafka - 多种事件类型的 Apache Flink CEP 模式
问题描述
目前我正在做一个学期项目,我必须识别三个事件的系列。像P -> R -> P
我们有两种不同的事件类型,它们通过同一主题中的 Kafka 连接器使用。
我创建了一个名为 Event 的父类,其他两种类型都从该类派生。
Kafka 连接器将带有 EventSchema 的 JSON 反序列化到父类 Event。
val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)
该模式如下所示:
val pattern = Pattern
.begin[Event]("before")
.subtype(classOf[Position])
.next("recognized")
.subtype(classOf[Recognized])
.next("after")
.subtype(classOf[Position])
当前的问题是,如果我以适当的格式发送三条消息,则无法识别该模式。
我尝试过的其他方法..我改变了这样的模式:
val pattern = Pattern
.begin[Event]("before")
.where(e => e.getType == "position")
.next("recognized")
.where(e => e.getType == "recognition")
.next("after")
.where(e => e.getType == "position")
这种模式有效,但后来我无法将 Event 类转换为位置或识别..
我在这里想念什么?
解决方案
根据评论,我认为您应该返回子类型实例而不是事件。这是我为您提供的示例代码:
val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
case "position" => mapper.readValue(bytes, classOf[Position])
case "recognition" => mapper.readValue(bytes, classOf[Recognized])
case _ =>
}
我成功地从 CEPITCase.java 中的一个测试用例中尝试了这个例子。
DataStream<Event> input = env.fromElements(
new Event(1, "foo", 4.0),
new SubEvent(2, "foo", 4.0, 1.0),
new SubEvent(3, "foo", 4.0, 1.0),
new SubEvent(4, "foo", 4.0, 1.0),
new Event(5, "middle", 5.0)
);
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);
推荐阅读
- parameter-passing - Azure Table storage to Azure Table storage using Azure Data Factory using DateTime Parameterized query
- java - 在 JavaEE Maven 项目中使用 Hibernate 时出现“java.lang.NoClassDefFoundError: javax/persistence/criteria/Selection”错误
- c# - 你如何检测 On MessageActivityAsync 中的活动对话框
- javascript - mongodb changestream“管道”不起作用
- javascript - Codewars 挑战 - Hamelin 的聋鼠 - JavaScript - 如何通过特定字符集拆分字符串?
- graphql - 意外
: 一个graphql文件可以只导入其他文件而不做其他事情吗? - c - 将 BIGNUM 转换为原始二进制文件
- bash - 为什么在本地通过 SSH 初始化远程 bash 脚本时无法访问 GitHub?
- c++ - 在数组中显示第一个元素用户输入
- compiler-construction - 如何在coq中分析两个看似等效但行为不同的程序?