java - Spring集成-发布者确认超时?
问题描述
这是我目前的设置:
queue1 和 queue2 各自通过一个集成流汇入同一个通道(channel1,即下面代码中的 amqpInputChannel()):
/**
 * Integration flow that starts from {@code queue1InboundAdapter()} and ends by
 * sending to {@code amqpInputChannel()}.
 * The intermediate steps are elided ("...") in this snippet.
 */
@Bean
public IntegrationFlow q1f() {
return IntegrationFlows
.from(queue1InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
/**
 * Integration flow that starts from {@code queue2InboundAdapter()} and ends by
 * sending to {@code amqpInputChannel()}, i.e. the same channel that {@code q1f()} targets.
 * The intermediate steps are elided ("...") in this snippet.
 */
@Bean
public IntegrationFlow q2f() {
return IntegrationFlows
.from(queue2InboundAdapter())
...
.channel(amqpInputChannel())
.get();
}
随后所有消息被聚合;只有在聚合后的消息得到 RabbitMQ 的发布者确认之后,原始消息才会被确认(ACK):
/**
 * Integration flow that consumes from {@code amqpInputChannel()}, aggregates the
 * messages and hands each aggregate to {@code amqpOutboundEndpoint()}.
 * The leading aggregator arguments are elided ("...") in this snippet.
 */
@Bean
public IntegrationFlow aggregatingFlow() {
return IntegrationFlows
.from(amqpInputChannel())
.aggregate(...
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
// Group timeout is 10 seconds, expressed in milliseconds.
.groupTimeout(TimeUnit.SECONDS.toMillis(10))
// Release strategy is constructed with a count of 200 and a 10 second timeout (in milliseconds).
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
)
.handle(amqpOutboundEndpoint())
.get();
}
/**
 * Builds the outbound endpoint that publishes through {@code ackTemplate()} to the
 * {@code RABBIT_PREFIX + "ix.archiveupdate"} exchange.
 *
 * <p>Confirm acks are sent to {@code manualAckChannel()}, and the confirm
 * correlation expression is {@code #root}.
 *
 * <p>NOTE(review): only a confirm-ack channel is configured here; no confirm-nack
 * channel is set in this snippet. Confirm whether nacks are handled elsewhere.
 *
 * @return the configured outbound endpoint
 */
@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
    AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(ackTemplate());

    // Publisher-confirm handling: where acks go and what is used as correlation data.
    endpoint.setConfirmAckChannel(manualAckChannel());
    endpoint.setConfirmCorrelationExpressionString("#root");

    // Destination: fixed exchange, routing key computed per message
    // (forward using partition id as routing key).
    endpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
    endpoint.setRoutingKeyExpression(routingKeyExpression());

    return endpoint;
}
ackTemplate() 所使用的连接工厂(cf)通过 springFactory.setPublisherConfirms(true); 启用了发布者确认。
我看到的问题是:大约每 10 天会出现一次,有一些消息在 RabbitMQ 中卡在 unacknowledged(未确认)状态。
我的猜测是:消息发布之后一直在等待 RabbitMQ 返回发布者确认(PUBLISHER CONFIRMS),但始终没有收到,最终超时?在这种情况下,queue1 中的消息就永远不会被 ACK。这可能吗?
因此,再完整梳理一遍工作流程:
[两个队列 -> 直接通道 -> 聚合器(保留每条消息的 channel 和 delivery tag)-> 发布到 RabbitMQ -> RabbitMQ 通过发布者确认返回 ACK -> Spring 根据它在内存中为该聚合消息保存的 channel 和 delivery tag,确认(ACK)其中包含的所有原始消息]
我也有我的聚合器实现(因为我需要手动确认来自 q1 和 q2 的消息):
/**
 * Aggregating group processor that adds, to the aggregated headers, one
 * {@code ManualAckPair} per message in the group. Each pair is built from that
 * message's {@code AmqpHeaders.CHANNEL} and {@code AmqpHeaders.DELIVERY_TAG}
 * headers plus the shared {@code AckingState}.
 */
public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {

    /** Header key under which the list of {@code ManualAckPair}s is stored. */
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";

    private AckingState ackingState;

    /**
     * @param ackingState state object passed to every {@code ManualAckPair} created by this processor
     */
    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState) {
        this.ackingState = ackingState;
    }

    /**
     * Extends the superclass's aggregated headers with the {@link #MANUAL_ACK_PAIRS} entry.
     *
     * @param group the message group being aggregated
     * @return the superclass's aggregated headers plus the list of ack pairs
     */
    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> headers = super.aggregateHeaders(group);

        List<ManualAckPair> pairs = new ArrayList<>();
        // Header values are cast as-is; a missing header results in a null being passed to the pair.
        group.getMessages().forEach(message -> pairs.add(new ManualAckPair(
                (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL),
                (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG),
                ackingState)));

        headers.put(MANUAL_ACK_PAIRS, pairs);
        return headers;
    }

}
更新
这是 RabbitMQ 管理界面显示的情况(有 2 条消息长时间处于未确认状态,直到应用重启、消息被重新投递之后才会被 ACK):
解决方案
在 Spring AMQP 2.1(Spring Integration 5.1)中,我们为 CorrelationData 添加了一个 Future<?> 以及返回的消息(returned message),用来协助处理这类情况。
如果您使用的是旧版本,可以继承 CorrelationData(此时必须在自己的代码中负责设置 future 和返回的消息)。
再配合一个定时任务,就可以检测出丢失的确认……
/**
 * Demo application that sends three messages with {@link CorrelationData} and uses a
 * scheduled task to report, for each one, whether an ack, a nack, a returned message
 * or no confirm at all (timeout) was observed.
 */
@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    /** Correlation data of sent messages whose confirm has not been checked yet. */
    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    /**
     * Sends three sample messages, registering print-only callbacks on each
     * correlation data's future and queueing it for {@link #checkForMissingAcks()}.
     *
     * @param template the template used to publish
     * @return the runner executed at startup
     */
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            SuccessCallback<? super Confirm> successCallback = confirm -> {
                System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
            };
            FailureCallback failureCallback = throwable -> {
                System.out.println(throwable.getMessage());
            };
            // Good - ack
            CorrelationData correlationData = new CorrelationData("good");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "foo", "data", correlationData);
            // Missing exchange nack, no return
            correlationData = new CorrelationData("missing exchange");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("missing exchange", "foo", "data", correlationData);
            // Missing queue ack, with return
            correlationData = new CorrelationData("missing queue");
            correlationData.getFuture().addCallback(successCallback, failureCallback);
            this.futures.put(correlationData);
            template.convertAndSend("", "missing queue", "data", correlationData);
        };
    }

    /**
     * Drains the pending queue, waiting up to 10 seconds per entry for its confirm,
     * and prints the outcome for each entry.
     *
     * <p>If the thread is interrupted while waiting, the entry being checked is put
     * back on the queue and the scan stops, so that entries that are still pending
     * are examined by a later run instead of being discarded unchecked.
     */
    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    if (correlationData.getReturnedMessage() == null) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
                // With the interrupt flag set, every further wait on a pending future would
                // fail immediately; keep this entry and leave the rest for the next run.
                this.futures.add(correlationData);
                return;
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}
.
Checking pending acks
Ack received OK for good
Nack received for missing exchange
Message returned for missing queue
No pending acks, exiting
在 Spring Integration 中,可以使用 confirmCorrelationExpression 来创建 CorrelationData 实例。
编辑
使用 Spring 集成...
/**
 * Spring Integration variant of the demo: messages are sent through a gateway and an
 * AMQP outbound adapter, correlation data is created by a bean referenced from the
 * confirm correlation expression, and a scheduled task reports the outcome of each
 * pending confirm.
 */
@SpringBootApplication
@EnableScheduling
public class Igh2755Application {

    public static void main(String[] args) {
        SpringApplication.run(Igh2755Application.class, args);
    }

    /** Correlation data of sent messages whose confirm has not been checked yet. */
    private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();

    /** Messaging gateway; exchange and routing key travel as message headers. */
    public interface Gate {

        void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);

    }

    /**
     * Sends three sample messages through the gateway at startup.
     *
     * @param gate the gateway backed by {@link #flow(RabbitTemplate)}
     * @return the runner executed at startup
     */
    @Bean
    @DependsOn("flow")
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            gate.send("", "foo", "good");
            gate.send("junque", "rk", "missing exchange");
            gate.send("", "junque", "missing queue");
        };
    }

    /**
     * Gateway-to-AMQP flow. Exchange and routing key are taken from the
     * {@code exchange} and {@code rk} headers; acks and nacks both go to
     * {@link #acks()}, returns go to {@link #returns()}.
     */
    @Bean
    public IntegrationFlow flow(RabbitTemplate template) {
        return IntegrationFlows.from(Gate.class)
                .handle(Amqp.outboundAdapter(template)
                        .confirmCorrelationExpression("@correlationCreator.create(#root)")
                        .exchangeNameExpression("headers.exchange")
                        .routingKeyExpression("headers.rk")
                        .returnChannel(returns())
                        .confirmAckChannel(acks())
                        .confirmNackChannel(acks()))
                .get();
    }

    @Bean
    public MessageChannel acks() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel returns() {
        return new DirectChannel();
    }

    /**
     * Handles messages arriving on the {@code acks} channel (both acks and nacks) and
     * completes the future of the correlation data created by {@link CorrelationCreator}.
     */
    @Bean
    public IntegrationFlow ackFlow() {
        return IntegrationFlows.from("acks")
                /*
                 * Work around a bug because the correlation data is wrapped and so the
                 * wrong future is completed.
                 */
                .handle(m -> {
                    System.out.println(m);
                    if (m instanceof ErrorMessage) { // NACK
                        NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                        CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                        // NOTE(review): the reason text says "returned" although this is the
                        // nack branch; nothing in this class reads the reason. Confirm intent.
                        correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                    }
                    else {
                        ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                    }
                })
                .get();
    }

    /** Prints every message arriving on the {@code returns} channel. */
    @Bean
    public IntegrationFlow retFlow() {
        return IntegrationFlows.from("returns")
                .handle(System.out::println)
                .get();
    }

    @Bean
    public CorrelationCreator correlationCreator() {
        return new CorrelationCreator(this.futures);
    }

    /**
     * Creates a {@link CorrelationData} per outgoing message, using the payload as its
     * id, and records it in the shared pending queue.
     */
    public static class CorrelationCreator {

        private final BlockingQueue<CorrelationData> futures;

        public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
            this.futures = futures;
        }

        public CorrelationData create(Message<String> message) {
            CorrelationData data = new CorrelationData(message.getPayload());
            this.futures.add(data);
            return data;
        }

    }

    /**
     * Drains the pending queue, waiting up to 10 seconds per entry for its confirm,
     * and prints the outcome for each entry.
     *
     * <p>If the thread is interrupted while waiting, the entry being checked is put
     * back on the queue and the scan stops, so that entries that are still pending
     * are examined by a later run instead of being discarded unchecked.
     */
    @Scheduled(fixedDelay = 5_000)
    public void checkForMissingAcks() {
        System.out.println("Checking pending acks");
        CorrelationData correlationData = this.futures.poll();
        while (correlationData != null) {
            try {
                if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                    // NOTE(review): the second condition compares the correlation id (the
                    // message payload) with the nack reason text used in ackFlow(); it is
                    // kept as in the original — confirm whether it is intended.
                    if (correlationData.getReturnedMessage() == null
                            && !correlationData.getId().equals("Message was returned")) {
                        System.out.println("Ack received OK for " + correlationData.getId());
                    }
                    else {
                        System.out.println("Message returned for " + correlationData.getId());
                    }
                }
                else {
                    System.out.println("Nack received for " + correlationData.getId());
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted");
                // With the interrupt flag set, every further wait on a pending future would
                // fail immediately; keep this entry and leave the rest for the next run.
                this.futures.add(correlationData);
                return;
            }
            catch (ExecutionException e) {
                System.out.println("Failed to get an ack " + e.getCause().getMessage());
            }
            catch (TimeoutException e) {
                System.out.println("Timed out waiting for ack for " + correlationData.getId());
            }
            correlationData = this.futures.poll();
        }
        System.out.println("No pending acks, exiting");
    }

}
推荐阅读
- java - Highchart WebView 抛出 IllegalStateException
- javascript - 我在使用 firebase 进行注册时遇到了一个问题,它在 react js 中被调用了 3 次
- sql - 引用要替换的增量模型分区 - dbt v0.17.2
- node.js - Heroku 的 package.json 文件的最少代码行数是多少?
- node.js - 创建查询以在后端或前端返回某些结果更好吗?节点与 Redux
- python - 如何重新训练 DNN,保持隐藏层中的权重在 Tensorflow 2.0 中冻结
- d3.js - 使 d3 图表反应 (v5)
- google-chrome-extension - 如何将清单版本 2 迁移到 chrome 扩展的 v3?
- python - 如何在没有 internet/python/pip/etc 的情况下实现人类可读的 mitmdump 输出文件?
- apache-spark - ORC 与 Parquet 文件格式