spring - 如何在Junit测试中等待spring jms监听线程完成执行
问题描述
我有一个使用 spring-JMS 的 spring boot 应用程序。有没有办法告诉测试方法等待 jms lister util 它完成执行而不在将要测试的实际代码中使用锁存器?
这是 JMS 侦听器代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.QueueSession;
@Component
public class MyListener {
@Autowired
MyProcessor myProcessor;
@JmsListener(destination = "myQueue", concurrency = "1-4")
private void onMessage(Message message, QueueSession session) {
myProcessor.processMessage(message, session);
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.QueueSession;
@Component
public class MyProcessor {
public void processMessage(Message msg, QueueSession session) {
//Here I have some code.
}
}
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueSession;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@SpringBootTest
@ExtendWith(SpringExtension.class)
@ActiveProfiles("test")
public class IntegrationTest {
@Autowired
private JmsTemplate JmsTemplate;
@Test
public void myTest() throws JMSException {
Message message = new ActiveMQTextMessage();
jmsTemplate.send("myQueue", session -> message);
/*
Here I have some testing code. How can I tell the application
to not execute this testing code until all JMS lister threads
finish executing.
*/
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.util.SocketUtils;
import javax.jms.ConnectionFactory;
@EnableJms
@Configuration
@Profile("test")
public class JmsTestConfig {
public static final String BROKER_URL =
"tcp://localhost:" + SocketUtils.findAvailableTcpPort();
@Bean
public BrokerService brokerService() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.addConnector(BROKER_URL);
return brokerService;
}
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(BROKER_URL);
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
return jmsTemplate;
}
}
注意:是否适用于在不向实现代码(MyListener 和 MyProcessor)添加测试目的代码的情况下解决此问题。
解决方案
代理监听器并添加一个倒计时闩锁的建议;这是我最近为 KafkaListener 做的一个......
@Test
public void test() throws Exception {
this.template.send("so50214261", "foo");
assertThat(TestConfig.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(TestConfig.received.get()).isEqualTo("foo");
}
@Configuration
public static class TestConfig {
private static final AtomicReference<String> received = new AtomicReference<>();
private static final CountDownLatch latch = new CountDownLatch(1);
@Bean
public static MethodInterceptor interceptor() {
return invocation -> {
received.set((String) invocation.getArguments()[0]);
return invocation.proceed();
};
}
@Bean
public static BeanPostProcessor listenerAdvisor() {
return new ListenerWrapper(interceptor());
}
}
public static class ListenerWrapper implements BeanPostProcessor, Ordered {
private final MethodInterceptor interceptor;
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
public ListenerWrapper(MethodInterceptor interceptor) {
this.interceptor = interceptor;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof Listener) {
ProxyFactory pf = new ProxyFactory(bean);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(this.interceptor);
advisor.addMethodName("listen");
pf.addAdvisor(advisor);
return pf.getProxy();
}
return bean;
}
}
(但您应该将 countDown 移到调用之后proceed()
)。
推荐阅读
- wordpress - 如何在woocommerce中的排序选项之前添加文本
- python - 在一张图中同时绘制两个横截面强度
- sql - 如果在 Postgresql 中使用 select 语句
- arrays - 如何使我的数组列在 numpy 上统一?
- javascript - 如何通过多个标签过滤 JSON 中的数组
- react-native - 还原 | 如何正确连接到 redux 商店
- c# - 如何解决 C# 中的 HRESULT: 0x80010001 (RPC_E_CALL_REJECTED):Excel 互操作?
- angular - 有没有办法在没有 HTML/用户输入的情况下从 Angular 的本地网络中检索 CSV 文件
- r - 将横截面数据的值映射到 R 中的纵向数据
- r - R:让胡须循环一个未知名称的列表