spring-integration - Spring Cloud Stream @StreamListener 和 Spring Integration 的 Resequencer 模式
问题描述
AFAIK Spring Cloud Stream 项目基于 Spring Integration。因此,我想知道是否有一种很好的方法可以在StreamListener
触发处理程序之前重新排序入站消息的子集?或者我是否需要IntegrationFlow
使用 Spring Integration 中的 XML 或 Java DSL 配置从头开始组装整个系统?
我的用例如下。大多数时候,我会在 Kafka 主题上处理入站消息。但是,一些事件必须根据CORRELATION_ID
、SEQUENCE_NUMBER
和SEQUENCE_SIZE
标头重新排序。换句话说,我想尽可能多地使用 StreamListener 并简单地为某些事件插入重新排序策略。
解决方案
是的,您需要为此使用 Spring Integration。实际上 Spring Cloud Stream 只是一个有效的绑定框架。它通过绑定器将消息处理程序绑定到消息代理。消息处理程序本身由用户提供。@StreamListener 注释几乎等同于 Spring Integration 的 @ServiceActivator ,具有一些额外的功能(例如,条件路由),但除此之外它只是一个消息处理程序。
现在,正如您所逃避的那样,您知道您可以使用 Spring Integration (SI) 来实现消息处理程序或内部 SI 流,这是正常的,建议用于复杂情况。
也就是说,我们确实提供了实现某些 EIP 组件的开箱即用的应用程序,并且我们确实有,例如,聚合器应用程序,您可以将其用作实现重新排序器的起点。此外,鉴于我们有一个聚合器应用程序而不是resequencer,如果您有兴趣,我们很乐意接受它的贡献。我希望这能回答你的问题。
推荐阅读
- r - R使用dplyr计算组内不同值的数量
- python - 100x100 python 背面图像的分解和收集
- amazon-web-services - 别名记录在 Route53 中不起作用
- node.js - 为什么我在 Firebase 中收到此错误“函数返回未定义、预期的承诺或值”
- c# - Better way to get Interval bounds when only time is given
- python - Mat不是数字元组openCV 2
- javascript - Google Analytics(分析)用户 ID 跟踪无法正常工作
- sql-server - MS Access / T-SQL - 分组间隔数据
- android - 如何将 json 文件从 Android 手机上传到 firebase 数据库?
- c# - 在等待的任务完成之前返回的函数