首页 > 解决方案 > 在 Spring 集成中为 Redis 创建 MessageSource

问题描述

我想配置 InboundChannelAdapter 以便它应该从 redis 队列中弹出消息并将其传递给基于 Java 的注释中的 ServiceActivator(仅,更喜欢避免使用 XML)。我从 Spring 文档中找到了代码:

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

但是我在这里不明白的是,如何通过使用redisConnectionFactory从redis队列中弹出数据来返回MessageSource?

换句话说,如何在基于 java 的注释中做到这一点?

  <int-redis:queue-inbound-channel-adapter id="postPublicationInboundAdapter"
                                             connection-factory="redisConnectionFactory"
                                             channel="postPublicationChannel"
                                             error-channel="postPublicationLoggingChannel"
                                             receive-timeout="5000"
                                             queue="archive.post.publication.queue"
                                             serializer="postPublicationJsonRedisSerializer"/>

标签: javaredisspring-integration

解决方案


让我们从这里开始:https ://docs.spring.io/spring-integration/docs/5.0.9.RELEASE/reference/html/overview.html#programming-tips

通过 XML 配置和 Spring 集成命名空间支持,XML 解析器隐藏了目标 bean 的声明和连接方式。对于 Java 和注释配置,了解目标最终用户应用程序的框架 API 很重要。

然后我们为此打开一个 XSD <int-redis:queue-inbound-channel-adapter>

 <xsd:element name="queue-inbound-channel-adapter">
    <xsd:annotation>
        <xsd:documentation>
            Defines a Message Producing Endpoint for the
            'org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint' for listening a Redis
            queue.
        </xsd:documentation>
    </xsd:annotation>

所以,听起来 aint-redis:queue-inbound-channel-adapter不是 a MessageSource。因此@InboundChannelAdapter是死胡同。我同意 XML 元素的名称是错误的,但是现在重命名它已经太晚了。

从这里我们也发现我们需要处理RedisQueueMessageDrivenEndpoint. 由于它是消息驱动的、自我管理的,因此我们不需要任何特殊的注释。将其声明为这样的 bean 就足够了:

@Bean
RedisQueueMessageDrivenEndpoint redisQueueMessageDrivenEndpoint(RedisConnectionFactory redisConnectionFactory, RedisSerializer<?> serializer) {
    RedisQueueMessageDrivenEndpoint endpoint =
                new RedisQueueMessageDrivenEndpoint("archive.post.publication.queue", redisConnectionFactory);
    endpoint.setOutputChannelName("postPublicationChannel");
    endpoint.setErrorChannelName("postPublicationLoggingChannel");
    endpoint.setReceiveTimeout(5000);
    endpoint.setSerializer(serializer);
    return endpoint;
}

推荐阅读