首页 > 解决方案 > Spring Cloud Stream @StreamListener 和 Spring Integration 的 Resequencer 模式

问题描述

AFAIK Spring Cloud Stream 项目基于 Spring Integration。因此,我想知道是否有一种很好的方法可以在StreamListener触发处理程序之前重新排序入站消息的子集?或者我是否需要IntegrationFlow使用 Spring Integration 中的 XML 或 Java DSL 配置从头开始组装整个系统?

我的用例如下。大多数时候,我会在 Kafka 主题上处理入站消息。但是,一些事件必须根据CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE标头重新排序。换句话说,我想尽可能多地使用 StreamListener 并简单地为某些事件插入重新排序策略。

标签: spring-integrationspring-cloud-streamenterprise-integration

解决方案


是的,您需要为此使用 Spring Integration。实际上 Spring Cloud Stream 只是一个有效的绑定框架。它通过绑定器将消息处理程序绑定到消息代理。消息处理程序本身由用户提供。@StreamListener 注释几乎等同于 Spring Integration 的 @ServiceActivator ,具有一些额外的功能(例如,条件路由),但除此之外它只是一个消息处理程序

现在,正如您所逃避的那样,您知道您可以使用 Spring Integration (SI) 来实现消息处理程序或内部 SI 流,这是正常的,建议用于复杂情况。

也就是说,我们确实提供了实现某些 EIP 组件的开箱即用的应用程序,并且我们确实有,例如,聚合器应用程序,您可以将其用作实现重新排序器的起点。此外,鉴于我们有一个聚合器应用程序而不是resequencer,如果您有兴趣,我们很乐意接受它的贡献。我希望这能回答你的问题。


推荐阅读