spring-integration - 使用 Spring 从 pubsub 消费一批消息
问题描述
如何使用来自 pubsub 的多条消息?这似乎是一个简单的问题,应该有简单的解决方案,但目前我可以找到简单的方法来使用来自 pubsub 的批量记录spring-cloud-gcp-pubsub
。
我正在使用spring-cloud-gcp-pubsub
来自 pubsub 的消息并在 Spring Boot 应用程序中处理它们。我当前的设置非常简单PubSubInboundChannelAdapter
,并且ServiceActivator
会消耗记录。经过研究,我发现了 spring 集成Aggregators
,但它们似乎不是这样做的好方法,因为向下游传播确认并不容易。有什么我想念的吗?如何消费批量消息?
解决方案
这PubSubInboundChannelAdapter
是基于对主题的订阅。因此,它将是一个消息流,这PubSubInboundChannelAdapter
会对它们中的每一个转换为 Spring Message 并将其发送到下游发送到配置的通道做出反应。订阅期间确实没有办法获取一批消息。
您还需要记住,offset
在 GCP Pub/Sub 中没有类似的东西。您确实应该确认您从 Pub/Sub 消费的每条消息。
虽然有办法一次提取一批消息,但使用PubSubMessageSource
. 可以解决问题,messageSource.setMaxFetchSize(5);
但这PubSubMessageSource
仍然会单独生成每条消息,因此您可以(n)独立地确认它们。
当然,您可以利用该功能PubSubMessageSource
使用 - PubSubSubscriberOperations.pullAndConvert()
。有关更多信息,请参阅它的 JavaDocs:
/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription the subscription name
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @param payloadType the type to which the payload of the Pub/Sub messages should be converted
* @param <T> the type of the payload
* @return the list of received acknowledgeable messages
* @since 1.1
*/
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType);
所以,这个看起来像你正在寻找的东西,因为你确实会有一个消息列表,每个消息都是一个带有 (n)ack 回调的包装器。
此 API 可用于自定义@InboundChannelAdapter
MessageSource
或Supplier
@Bean
实现。
但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认,而不会影响所有其他消息。
推荐阅读
- arrays - 如何在 Kotlin 中将对象转换为字节数组
- jq - JQ 仅从 AWS CLI 返回一个 CIDR 块
- java - NotifyDatasetchanged not working. AppCrashes after ButtonOnClick
- aws-lambda - AWS CDK 跨账户 Lambda 部署权限问题
- c# - 如何通过asp.net核心中的http请求调用http azure函数?
- reactjs - 如何通过 FormData 将图像上传到 React Native 中的 API
- mysql - Operand Should Contain 1 Column(s), Trying to generate volunteer data
- keras - 如何预处理数据以训练字符级 RNN
- sql-server - 带表更新的 While 循环
- java - 从较大的文本中获取 Json/Xml