java - 网络故障后Spring amqp消费者未重新连接到队列
问题描述
我们有一个 spring 应用程序,它有一个动态队列侦听器连接到 rabbitmq 中的队列。假设我总共有 5 个侦听器使用者连接到从我的 spring 应用程序到 rabbitmq 的 5 个队列。
现在,如果每次发生网络波动/故障,我的 5 个连接队列中的第一个将停止重试 rabbitmq。
我通过 spring-amqp 类调试了代码,发现在创建与 rabbitmq 的连接时(发生网络故障时),它无法连接到它并抛出 org.springframework.amqp.AmqpIOException 重试函数中未处理的特定异常以便从重试队列列表中删除该队列。
我的主要课程:
@Slf4j
@SpringBootApplication(exclude = {ClientAutoConfiguration.class})
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.x.x.repositories")
@EntityScan(basePackages = "com.x.x.entities")
public class Main
{
@PostConstruct
void configuration()
{
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
}
/**
* The main method.
*
* @param args the arguments
*/
public static void main(String[] args)
{
ConfigurableApplicationContext context = SpringApplication.run(Main.class, args);
RabbitMQListenerUtil queueRegisterUtil = context.getBean(RabbitMQListenerUtil.class);
try
{
queueRegisterUtil.registerSpecifiedListenerForAllInstance();
}
catch (Exception e)
{
log.error(e.getMessage(), e);
}
}
}
用于创建 5 个消费者/侦听器的类
/**
* The Class RabbitMQListenerUtil.
*/
@Component
@Slf4j
public class RabbitMQListenerUtil
{
@Autowired
private ApplicationContext applicationContext;
public void registerSpecifiedListenerForAllInstance()
{
try
{
log.debug("New Listener has been register for instane name : ");
Thread.sleep(5000);
registerNewListener("temp1");
registerNewListener("temp2");
registerNewListener("temp3");
registerNewListener("temp4");
registerNewListener("temp5");
}
catch (Exception e)
{
}
}
/**
* This method will add new listener bean for given queue name at runtime
*
* @param queueName - Queue name
* @return Configurable application context
*/
public void registerNewListener(String queueName)
{
AnnotationConfigApplicationContext childAnnotaionConfigContext = new AnnotationConfigApplicationContext();
childAnnotaionConfigContext.setParent(applicationContext);
ConfigurableEnvironment environmentConfig = childAnnotaionConfigContext.getEnvironment();
Properties listenerProperties = new Properties();
listenerProperties.setProperty("queue.name", queueName + "_queue");
PropertiesPropertySource pps = new PropertiesPropertySource("props", listenerProperties);
environmentConfig.getPropertySources().addLast(pps);
childAnnotaionConfigContext.register(RabbitMQListenerConfig.class);
childAnnotaionConfigContext.refresh();
}
}
为队列消费者创建动态侦听器的类
/**
* The Class RabbitMQListenerConfig.
*/
@Configuration
@Slf4j
@EnableRabbit
public class RabbitMQListenerConfig
{
/** The Constant ALLOW_MESSAGE_REQUEUE. */
private static final boolean ALLOW_MESSAGE_REQUEUE = true;
/** The Constant MULTIPLE_MESSAGE_FALSE. */
private static final boolean MULTIPLE_MESSAGE_FALSE = false;
/**
* Listen.
*
* @param msg the msg
* @param channel the channel
* @param queue the queue
* @param deliveryTag the delivery tag
* @throws IOException Signals that an I/O exception has occurred.
*/
@RabbitListener(queues = "${queue.name}")
public void listen(Message msg, Channel channel, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException
{
int msgExecutionStatus = 0;
try
{
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info(message);
}
catch (Exception e)
{
log.error(e.toString());
log.error(e.getMessage(), e);
}
finally
{
ackMessage(channel, deliveryTag, msgExecutionStatus);
}
}
/**
* Ack message.
*
* @param channel the channel
* @param deliveryTag the delivery tag
* @param msgExecutionStatus the msg execution status
* @throws IOException Signals that an I/O exception has occurred.
*/
protected void ackMessage(Channel channel, long deliveryTag, int msgExecutionStatus) throws IOException
{
if (msgExecutionStatus == Constants.MESSAGE_DELETE_FOUND_EXCEPTION)
{
channel.basicNack(deliveryTag, MULTIPLE_MESSAGE_FALSE, ALLOW_MESSAGE_REQUEUE);
}
else
{
channel.basicAck(deliveryTag, MULTIPLE_MESSAGE_FALSE);
}
}
/**
* Bean will create from this with given name.
*
* @param name - Queue name-
* @return the queue
*/
@Bean
public Queue queue(@Value("${queue.name}") String name)
{
return new Queue(name);
}
/**
* RabbitAdmin Instance will be created which is required to create new Queue.
*
* @param cf - Connection factory
* @return the rabbit admin
*/
@Bean
public RabbitAdmin admin(ConnectionFactory cf)
{
return new RabbitAdmin(cf);
}
}
申请日志:
我已经对此进行了多次测试,每次我的第一个连接队列都被停止连接。
==========================更新1 ======================= ======
重新连接停止消费者的代码: https ://pastebin.com/VnUrhdLP
解决方案
Caused by: java.net.UnknownHostException: rabbitmqaind1.hqdev.india
你的网络有问题。
推荐阅读
- firebase - 如何在 Stack Navigation 中传递 React Native 中的导航道具?
- python - Azure 函数 - Pandas 数据帧到 Excel,写入 outputBlob 流
- java - 片段如何作为对话框显示在视图顶部
- transactions - 带有欧元货币的交易 API 订单对象
- postgresql - for条件和if条件组合怎么写?写代码**For循环**
- java - Spark java过滤器isin方法还是其他?
- spss - 组合变量以创建新变量
- javascript - 在 React Context 中使用与状态无关的全局对象
- php - 如何从数据块创建 json 文件
- javascript - 获取图像的宽度和高度。src 是代理