首页 > 解决方案 > Spring集成-发布者确认超时?

问题描述

这是我目前的设置:

queue1 和 queue2 与流向 channel1 的集成流一起标记:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

然后,一切都被聚合,然后在聚合消息被rabbitmq确认后确认:

@Bean
    public IntegrationFlow aggregatingFlow() {
        return IntegrationFlows
                .from(amqpInputChannel())
                .aggregate(...
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true)
                        .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
                )
                .handle(amqpOutboundEndpoint())
                .get();
    }

    @Bean
    public AmqpOutboundEndpoint amqpOutboundEndpoint() {
        AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
        outboundEndpoint.setConfirmAckChannel(manualAckChannel());
        outboundEndpoint.setConfirmCorrelationExpressionString("#root");
        outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
        outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); //forward using patition id as routing key
        return outboundEndpoint;
    }

ackTemplate()用 cf 设置springFactory.setPublisherConfirms(true);

我看到的问题是,每 10 天一次,有一些消息卡unacknowledged在 rabbitmq 中的状态。

我的猜测是,消息的发布以某种方式等待兔子做,PUBLISHER CONFIRMS但它永远不会得到它并超时?在这种情况下,我从不 ACK 中的消息queue1。这可能吗?

因此,只需再完成一次工作流程:

[两个队列->直接通道->聚合器(保留通道和标记值)->发布到兔子->兔子通过发布者确认返回ACK->spring确认通道上的所有消息+它为聚合消息保存在内存中的值]

我也有我的聚合器实现(因为我需要手动确认来自 q1 和 q2 的消息):

public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
    private AckingState ackingState;

    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
        this.ackingState = ackingState;
    }

    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
        List<ManualAckPair> manualAckPairs = new ArrayList<>();
        group.getMessages().forEach(m -> {
            Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
        });
        aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
        return aggregatedHeaders;
    }
}

更新

这是 rabbit admin 的样子(很长一段时间内有 2 条未确认的消息,并且在重新启动之前不会被 ACKED - 当它重新传递时): 在此处输入图像描述

标签: javaspring-integrationspring-amqpspring-rabbit

解决方案


在 Spring AMQP 2.1 版(Spring Integration 5.1)中,我们添加了一个Future<?>并返回消息CorrelationData来协助处理这种事情。如果您使用的是旧版本,则可以子类CorrelationData化(并且您必须在代码中处理设置未来和返回的消息)。

这与计划任务一起,可以检测丢失的确认...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };

            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);

            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);

            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

.

Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting

使用 Spring IntegrationconfirmCorrelationExpression可以使用它来创建CorrelationData实例。

编辑

使用 Spring 集成...

@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                    .handle(Amqp.outboundAdapter(template)
                            .confirmCorrelationExpression("@correlationCreator.create(#root)")
                            .exchangeNameExpression("headers.exchange")
                            .routingKeyExpression("headers.rk")
                            .returnChannel(returns())
                            .confirmAckChannel(acks())
                            .confirmNackChannel(acks()))
                    .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());

            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}

推荐阅读