spring-cloud-stream - 如何使用 sink 和 @StreamListener 编写测试 Junit 测试以进行消息传递
问题描述
我有一个带有 kafka 绑定的 spring-cloud-stream 应用程序。我想创建一个测试 Junit。
我的班级定义如下:-
@EnableBinding(Sink.class)
@Slf4j
public class Messaging {
@Autowired
private RestTemplate restTemplate;
@Value("${messaging}")
private String url;
@Value("${messaging.prefix}")
private String messaging;
@StreamListener(Sink.INPUT)
public void handle(Message<String> request) {
log.info("Topic name ==> %s :", request.getPayload());
try {
String jsonString = request.getPayload().replace("\\", "").replace("\"{", "{").replace("}\"", "}");
JsonObject jsonObject = (JsonObject)jsonParser.parse(request.getPayload());
String urlRequest =url.concat(jsonObject.get("targetClass").getAsString()).concat(messaging);
HttpEntity<Object> entity = new HttpEntity<Object>(jsonString, getHeaderMap(request.getHeaders()));
ResponseEntity<String> response = restTemplate.postForEntity(urlRequest, entity, String.class);
} catch (ValidationException validationException) {
log.error("Error de validación: {}", validationException.getMessage());
} catch (Exception e) {
log.error("Error ", e.getMessage());
}
}
我的频道定义如下:-
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
这是我的应用程序属性,用于将您的频道绑定到同一个队列:
spring:
cloud:
bus:
destination:CONFIG
enabled: true
stream:
bindings:
input:
group: input_messaging
contentType: application/json
destination: CONFIG_Test1,CONFIG_Test2
这是我创建的测试,但它给出了这个错误: 原因:org.springframework.context.ApplicationContextException:由于缺少 ServletWebServerFactory bean,无法启动 ServletWebServerApplicationContext。
@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource("classpath:test.properties")
@EnableConfigurationProperties
@ContextConfiguration(classes = MessagingTestConfig.class)
public class MessagingListenerTest {
@Before
public void setup() {
}
@Test
@SuppressWarnings("unchecked")
public void testReturn() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(MessagingTest.class,
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.contentType=text/plain",
"--spring.cloud.stream.bindings.output.contentType=text/plain");
MessageCollector collector = context.getBean(MessageCollector.class);
Processor processor = context.getBean(Processor.class);
Sink inputChannel = context.getBean(Sink.class);
Message<String> request = MessageBuilder.withPayload("headers").setHeader("topicName", "topic-1").build();
inputChannel.input()
.send(request);
Message<String> message = (Message<String>) collector
.forChannel(processor.output()).poll(1, TimeUnit.SECONDS);
assertThat(message).isNotNull();
assertThat(message.getPayload()).contains("topicName");
context.close();
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public class TestProcessor {
@StreamListener(Sink.INPUT)
public Message<String> hundle(Message<String> messageHundle) {
return messageHundle;
}
}
}
我想知道是否有方法可以测试我的课程,怎么做,感谢您的帮助
这是使用最新版本的 Spring Cloud Stream 所做的更改
package com.common.messaging;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.client.RestTemplate;
import lombok.extern.slf4j.Slf4j;
@EnableIntegration
@Configuration
@TestPropertySource(locations="classpath:/msc-test.properties")
@Slf4j
@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@ContextConfiguration(classes = MessagingListenerTestConfig.class)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class MessagingListenerTest {
@Autowired
private MessagingListener listener;
@Autowired
private InputDestination inputDestination;
@Autowired
private OutputDestination outputDestination;
@Mock
private RestTemplate restTemplate;
private static final String URL = "http://localhost:8080/";
@Before
public void setup() {
restTemplate = mock(RestTemplate.class);
ReflectionTestUtils.setField(listener, "restTemplate", restTemplate);
ResponseEntity<String> mockResponse = new ResponseEntity<>("{}", HttpStatus.ACCEPTED);
when(restTemplate.postForEntity(any(), any(), eq(String.class))).thenReturn(mockResponse);
}
@Test
public void testHundleMessage() {
String expectedUrl = URL;
Message<String> request = MessageBuilder.withPayload("headers").setHeader("topicName", "topic-1").build();
log.info("request Test :", request.getPayload());
//inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
listener.handle(request);
//Verificar la recepción de los mensajes
assertThat(outputDestination.receive()).isNotNull();
assertThat(outputDestination.receive().getPayload().toString()).contains("topicName");
//Verificar la url del restTemplate
Mockito.verify(restTemplate, Mockito.times(1)).postForEntity(expectedUrl, any(), eq(String.class));
}
}
但是鉴于此错误,我不知道为什么
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at org.springframework.cloud.stream.binder.test.OutputDestination.receive(OutputDestination.java:59)
at org.springframework.cloud.stream.binder.test.OutputDestination.receive(OutputDestination.java:73)
at com.common.messaging.MessagingListenerTest.testHundleMessage(MessagingListenerTest.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
解决方案
我仍然有同样的问题
@EnableIntegration
@Configuration
@TestPropertySource(locations="classpath:/msc-test.properties")
@Slf4j
@RunWith(SpringRunner.class)
@ActiveProfiles("test")
@ContextConfiguration(classes = MessagingListenerTestConfig.class)
@Import(TestChannelBinderConfiguration.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@DirtiesContext
public class MessagingListenerTest {
@Autowired
private MessagingListener listener;
@Autowired
private InputDestination inputDestination;
@Autowired
private OutputDestination outputDestination;
@Mock
private RestTemplate restTemplate;
private static final String EXPECTED_URL = "http://localhost:11000/test/v2/verification/messaging";
@Before
public void setup() {
restTemplate = mock(RestTemplate.class);
ReflectionTestUtils.setField(listener, "restTemplate", restTemplate);
ResponseEntity<String> mockResponse = new ResponseEntity<>("{}", HttpStatus.ACCEPTED);
when(restTemplate.postForEntity(any(), any(), eq(String.class))).thenReturn(mockResponse);
}
@Test
public void testHundleMessage() {
JSONObject obj1 = new JSONObject()
.put("id", 1)
.put("targetClass", "/test/v2/verification");
Message<String> request = MessageBuilder.withPayload(obj1.toString()).build();
log.info("request Test : "+ request.getPayload());
inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
listener.handle(request);
//Verificar la url del restTemplate
Mockito.verify(restTemplate, Mockito.times(1)).postForEntity(eq(EXPECTED_URL), any(), eq(String.class));
//Verificar la recepción de los mensajes
assertThat(outputDestination.receive()).isNotNull();
assertThat(outputDestination.receive().getPayload().toString()).contains("topicName");
}
}
就在这一行
inputDestination.send(new GenericMessage<byte[]>(request.getPayload().getBytes(StandardCharsets.UTF_8)));
这是错误Junit
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at org.springframework.cloud.stream.binder.test.AbstractDestination.getChannel(AbstractDestination.java:34)
at org.springframework.cloud.stream.binder.test.InputDestination.send(InputDestination.java:37)
at com.common.messaging.MessagingListenerTest.testHundleMessage(MessagingListenerTest.java:93)
和控制台错误
2020-07-14 11:29:16.850 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : The following profiles are active: test
2020-07-14 11:29:18.171 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-07-14 11:29:18.192 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-07-14 11:29:18.212 INFO 25240 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-07-14 11:29:18.392 INFO 25240 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-07-14 11:29:18.429 INFO 25240 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-07-14 11:29:20.113 INFO 25240 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-07-14 11:29:20.356 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-07-14 11:29:20.358 INFO 25240 --- [ main] o.s.i.c.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-07-14 11:29:20.361 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-07-14 11:29:20.382 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : Started MessagingListenerTest in 4.629 seconds (JVM running for 9.331)
2020-07-14 11:29:23.255 INFO 25240 --- [ main] c.b.a.m.c.m.MessagingListenerTest : request Test : {"targetClass":"/test/v2/verification","id":1}
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.c.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2020-07-14 11:29:28.207 INFO 25240 --- [ main] o.s.i.e.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2020-07-14 11:29:28.208 INFO 25240 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Picked up JAVA_TOOL_OPTIONS: -agentpath:"C:\windows\system32\Aternity\Java\JavaHookLoader.dll"="C:\ProgramData\Aternity\hooks"
可能是什么问题呢
推荐阅读
- php - WordPress:不在特定用户角色中时将主页和页面重定向到帐户页面
- javascript - 使用字母数字 ID 遍历 Firebase 子数组
- jquery - 在rails 5中单击浏览器的后退按钮后如何避免返回?
- ruby-on-rails - 比较记录以发现改进
- reactjs - ReactJS 和 Typescript - 子组件:在父状态更改时获取子道具并刷新子组件
- android - 如何将 Json 文件从 Assets/Resources/ 复制到 Android 设备的内部存储(持久数据)?[统一安卓]
- visual-studio-2013 - 使用 PDFCreator 从 edmx 设计器中导出 edmx 图
- blockchain - 在 web3.js 中出现关于 write tx 的错误
- javascript - 当我在浏览器选项卡之间移动时,setInterval 停止反应
- reactjs - 无法使用 React 将电子商务数据发送到 Google Analytics