apache-kafka - 具有延迟反应的实时流式传输 cep 系统
问题描述
我需要有关架构问题的帮助。
我用java开发了一个基于kafka技术的cep系统。
CEP 应具有以下特点:
- 分布式(集群)
- 可扩展的
- 容错
CEP 应采取以下行动:
- 从不同的来源创建事件,这实际上是多分区的 kafka-topics (ETL-part)
- 分析该事件的序列,如果它们与特殊模式(场景)匹配,则将反应记录放入某个商店(分析部分)
- 每 X 周期查询这家商店以与客户进行一些沟通(如果时间到了)(计划部分)
X期间如果出现cancel-event,所以我从store中删除了一个反应记录。
我使用 KafkaStreams 库创建了该系统。但结果是架构不太好。
KafkaStreams 在后端使用 RocksDB 来存储状态。以集群模式管理商店并拥有一致的数据存在许多问题。此外,我不能对他们进行 sql 查询,以从迭代存储中的每条记录来检查是否有反应时间。
我不是建筑师,我只是一个忙于这项任务的人。有人建议我查看 KafkaStreams 和 Flink 来创建 cep 程序。但实际上这些技术真的适合吗?
ETL 部分毫无疑问。但是我如何构建分析部分和(更有趣的)查询部分?我可以使用哪些工具?
我很感激任何帮助和建议
[更新]
关于查询和存储:
- 我们需要检查发送通信的时间是否到了。如果这是真的,那么与一个人交流:推送消息、电子邮件或任何其他渠道。
选择 ...其中 event_time + wait_time < 现在
- 之后,我们需要将存储中的记录更新到该场景的下一条消息(并执行此算法,直到该人转到该场景的最后一条消息或执行取消操作)
场景A的顺序:
ev A -> ev B -> ev C -> ev D -----> 开始场景 -----> ev E 或 msg c 已发送 -----> 取消场景
场景 A 的消息:
- msg a (wait_time: 10 分钟后发送)
- msg b(在 wait_time 后发送:1 天)
- msg c(在 wait_time 后发送:7 天)- 最后
更新 ... 其中 user_id = xxx 和scenario_id = A
- 如果在第 2 点采取了行动,那么我们还需要更新 userStore(有一些关于用户的信息,包括特殊计数器;它们有助于不向客户端发送垃圾邮件,并且不会在晚上向他发送消息)
更新 ... 其中 user_id = xxx
我为 CEP 编写了一个带有一些规则的引擎,我将这些规则保存在特殊存储 - 场景存储中。
因此,有几家商店:
- initialStore(使用消息参数在场景序列中保留最后一个事件,等待发送时间)-ev D
- 场景存储(场景的事件序列) - CEP 规则
- messageStore(消息的文本和其他属性) - msg 规则
- userStore(关于用户的信息)
解决方案
您绝对可以使用 Kafka Streams 进行复杂的事件处理 CEP。甚至还有用于该kafkastreams-cep的开源库。
Kafka Streams 框架支持交互式查询,您可以在其中查询状态存储以检索所需数据。您可以添加 REST 层以使其可从 REST API 查询。请参阅代码示例WordCountInteractiveQueriesExample。
推荐阅读
- r - 使用 JSON 更新 HTML UI
- python - Keras 级联层的性能比单个 CNN 差
- mysql - MySQL:子查询(基本)
- html - 页眉和页脚之间的内容 div 100% 高度
- python - 在单个 QuerySet 中获取多个模型,按时间戳排序
- r - 正则表达式 - 第 n 个 '\n' 之后的匹配文本
- spring-boot - 资源位置不能为空
- javascript - 添加和删除行的Java Script问题
- laravel - 如何将guzzle json帖子正文包装在数组中
- google-sheets - 有没有办法在 Google 表格中使用命名范围进行 BigQuery 查询?