rabbitmq - 每隔一段时间从rabbitMQ队列中读取消息不起作用
问题描述
我想要实现的是每 15 分钟从 RabbitMQ 队列中读取消息。从文档中,我可以看到我可以使用“receiveTimeout”方法来设置间隔。
轮询消费者
AmqpTemplate 本身可用于轮询消息接收。默认情况下,如果没有消息可用,则立即返回 null。没有阻塞。从 1.5 版开始,您可以设置一个 receiveTimeout,以毫秒为单位,并且接收方法会阻塞很长时间,等待消息。
但是我尝试通过 sprint 集成来实现它,receiveTimeout 没有按我预期的那样工作。
我的测试代码如下。
@Bean
Queue createMessageQueue() {
return new Queue(RetryQueue, false);
}
@Bean
public SimpleMessageListenerContainer QueueMessageListenerContainer(ConnectionFactory connectionFactory) {
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(
connectionFactory);
messageListenerContainer.setQueueNames(RetryQueue);
messageListenerContainer.setReceiveTimeout(900000);
return messageListenerContainer;
}
@Bean
public AmqpInboundChannelAdapter inboundQueueChannelAdapter(
@Qualifier("QueueMessageListenerContainer") AbstractMessageListenerContainer messageListenerContainer) {
final AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(
messageListenerContainer);
amqpInboundChannelAdapter.setOutputChannelName("channelRequestFromQueue");
return amqpInboundChannelAdapter;
}
@ServiceActivator(inputChannel = "channelRequestFromQueue")
public void activatorRequestFromQueue(Message<String> message) {
System.out.println("Message: " + message.getPayload() + ", recieved at: " + LocalDateTime.now());
}
我正在近乎实时地将有效负载记录在控制台中。任何人都可以帮忙吗?消费者一旦开始活跃多久?
更新
IntegrationFlow 我曾经每隔一段时间从队列中检索消息,
@Bean
public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
return IntegrationFlows
.from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
.handle(message -> {
channelRequestFromQueue()
.send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
.setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
}).get();
}
解决方案
该Polling Consumer
文档来自关于`RabbitTemplate的Spring AMQP文档,与侦听器容器或Spring集成无关。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#polling-consumer
Spring 集成的适配器是消息驱动的,只要消息可用,您就会收到消息。
要按需获取消息,您需要在所需的RabbitTemplate
任何时间间隔调用。
推荐阅读
- python - 无法打开 Airflow 的智能传感器功能(自定义传感器)
- django - Django update_or_create 在序列化器中引发 IntegrityError
- normalization - 将数据从 0 标准化到 100 时标准化标准偏差
- google-apps-script - 在 Appsheet 中,我可以将我的 appscript(在我的数据电子表格的“C”列中运行)更改为应用程序公式吗?
- excel - RSLinx - DDE 链接/ Excel VBA
- javascript - HTML 认为 html {SOLVED} 中有一个随机的 '<'
- android - 使用颤振的android gradle文件中的错误
- java - 如何在dropwizard中测试另一个类的静态函数
- python - Python 交易脚本,在采取行动之前接收所有指标
- python - 用其他文本替换列中的字符串