java - 单元测试spring-kafka消费者时如何在KafkaEmbedded中设置端口
问题描述
我在使用来自集群的消息的应用程序中使用spring-boot-starter-parent
version 1.5.0.RELEASE
、spring-kafka
version1.0.0.RELEASE
和spring-kafka-test
version 。我为我的消费者使用了一个单元测试,但由于代理端口是随机选择的,所以它失败了。有没有办法可以在不更改版本的情况下设置此代理属性?或者我应该使用哪些版本以免破坏任何东西?1.0.0.RELEASE
Kakfa 0.9
KafkaEmbedded
这是KafkaListener
and的代码KafkaConsumerTest
。
监听器.java
@Service
public class Listener {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
logger.info(msg);
latch.countDown();
ack.acknowledge();
}
public CountDownLatch getLatch() {
return latch;
}
}
KafkaConsumerTest.java(编辑)
@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
private static String TEST_TOPIC = "topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
public KafkaTemplate<String, String> template;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
private Listener listener;
@Before
public void init(){
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
senderProps.put("key.serializer", StringSerializer.class);
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(TEST_TOPIC);
}
@Test
public void testConsume() throws Exception {
String record = "message";
template.sendDefault(TEST_TOPIC, record);
logger.debug("test-consume sent record {}", record);
listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(listener.getLatch().getCount(), 0);
}
}
解决方案
请使用 spring-kafka 1.3.9 和 boot 1.5;不再支持早期版本。当前启动 1.5.x 版本是 1.5.21。
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);
static {
embeddedKafka.setKafkaPorts(1234);
}
setKafkaPorts
从 1.3 开始可用。
但是,您在测试中正确使用了分配的随机端口
Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
要让 kafka 侦听器连接到嵌入式代理,您可以使用。
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
推荐阅读
- javascript - 即使在 React 中有 e.preventDefault 使用钩子,为什么 onClick 事件会触发两次?
- javascript - 如何使用 Promise 将正文添加到 JavaScript 中的 POST 请求?
- python - ImageDataGenerator keras - save_to_dir
- python - 如何在pygame中加载图像?
- swiftui - 适合文本内容的 ScrollView 中的两个文本
- python - 尝试使用 for 循环将数据插入数据帧
- ruby - 在Ruby中将字符串转换为骆驼大小写
- r - 如何从 JGR 控制台中的 JGR 和 Deducer 输出中删除 BOM(ÿþ)?
- python - 使用“with”创建的对象返回“无”
- javascript - 使用jquery为div分配随机滚动速度?