首页 > 解决方案 > 具有延迟反应的实时流式传输 cep 系统

问题描述

我需要有关架构问题的帮助。

我用java开发了一个基于kafka技术的cep系统。

CEP 应具有以下特点:

CEP 应采取以下行动:

X期间如果出现cancel-event,所以我从store中删除了一个反应记录。

我使用 KafkaStreams 库创建了该系统。但结果是架构不太好。

KafkaStreams 在后端使用 RocksDB 来存储状态。以集群模式管理商店并拥有一致的数据存在许多问题。此外,我不能对他们进行 sql 查询,以从迭代存储中的每条记录来检查是否有反应时间。

我不是建筑师,我只是一个忙于这项任务的人。有人建议我查看 KafkaStreams 和 Flink 来创建 cep 程序。但实际上这些技术真的适合吗?

ETL 部分毫无疑问。但是我如何构建分析部分和(更有趣的)查询部分?我可以使用哪些工具?

我很感激任何帮助和建议

[更新]

关于查询和存储:

  1. 我们需要检查发送通信的时间是否到了。如果这是真的,那么与一个人交流:推送消息、电子邮件或任何其他渠道。

选择 ...其中 event_time + wait_time < 现在

  1. 之后,我们需要将存储中的记录更新到该场景的下一条消息(并执行此算法,直到该人转到该场景的最后一条消息或执行取消操作)

场景A的顺序:

ev A -> ev B -> ev C -> ev D -----> 开始场景 -----> ev E 或 msg c 已发送 -----> 取消场景

场景 A 的消息:

更新 ... 其中 user_id = xxx 和scenario_id = A

  1. 如果在第 2 点采取了行动,那么我们还需要更新 userStore(有一些关于用户的信息,包括特殊计数器;它们有助于不向客户端发送垃圾邮件,并且不会在晚上向他发送消息)

更新 ... 其中 user_id = xxx

我为 CEP 编写了一个带有一些规则的引擎,我将这些规则保存在特殊存储 - 场景存储中。

因此,有几家商店:

  1. initialStore(使用消息参数在场景序列中保留最后一个事件,等待发送时间)-ev D
  2. 场景存储(场景的事件序列) - CEP 规则
  3. messageStore(消息的文本和其他属性) - msg 规则
  4. userStore(关于用户的信息)

标签: apache-kafkaarchitectureapache-flinkapache-kafka-streamscomplex-event-processing

解决方案


您绝对可以使用 Kafka Streams 进行复杂的事件处理 CEP。甚至还有用于该kafkastreams-cep的开源库。

Kafka Streams 框架支持交互式查询,您可以在其中查询状态存储以检索所需数据。您可以添加 REST 层以使其可从 REST API 查询。请参阅代码示例WordCountInteractiveQueriesExample


推荐阅读