首页 > 解决方案 > 使用 Spring 从 pubsub 消费一批消息

问题描述

如何使用来自 pubsub 的多条消息?这似乎是一个简单的问题,应该有简单的解决方案,但目前我可以找到简单的方法来使用来自 pubsub 的批量记录spring-cloud-gcp-pubsub

我正在使用spring-cloud-gcp-pubsub来自 pubsub 的消息并在 Spring Boot 应用程序中处理它们。我当前的设置非常简单PubSubInboundChannelAdapter,并且ServiceActivator会消耗记录。经过研究,我发现了 spring 集成Aggregators,但它们似乎不是这样做的好方法,因为向下游传播确认并不容易。有什么我想念的吗?如何消费批量消息?

标签: spring-integrationspring-cloudgoogle-cloud-pubsub

解决方案


PubSubInboundChannelAdapter是基于对主题的订阅。因此,它将是一个消息流,这PubSubInboundChannelAdapter会对它们中的每一个转换为 Spring Message 并将其发送到下游发送到配置的通道做出反应。订阅期间确实没有办法获取一批消息。

您还需要记住,offset在 GCP Pub/Sub 中没有类似的东西。您确实应该确认您从 Pub/Sub 消费的每条消息。

虽然有办法一次提取一批消息,但使用PubSubMessageSource. 可以解决问题,messageSource.setMaxFetchSize(5);但这PubSubMessageSource仍然会单独生成每条消息,因此您可以(n)独立地确认它们。

当然,您可以利用该功能PubSubMessageSource使用 - PubSubSubscriberOperations.pullAndConvert()。有关更多信息,请参阅它的 JavaDocs:

/**
 * Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
 * the desired payload type.
 * @param subscription the subscription name
 * @param maxMessages the maximum number of pulled messages
 * @param returnImmediately returns immediately even if subscription doesn't contain enough
 * messages to satisfy {@code maxMessages}
 * @param payloadType the type to which the payload of the Pub/Sub messages should be converted
 * @param <T> the type of the payload
 * @return the list of received acknowledgeable messages
 * @since 1.1
 */
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
        Boolean returnImmediately, Class<T> payloadType);

所以,这个看起来像你正在寻找的东西,因为你确实会有一个消息列表,每个消息都是一个带有 (n)ack 回调的包装器。

此 API 可用于自定义@InboundChannelAdapter MessageSourceSupplier @Bean实现。

但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认,而不会影响所有其他消息。


推荐阅读