首页 > 解决方案 > 如何验证 sprng kafka producer 是否成功发送消息?

问题描述

我正在尝试测试 spring kafka(2.2.5.RELEASE),当生产者使用 kafkatemplate 发送消息时,我想知道该消息是否发送成功。基于此,我想更新该消息 ID 的数据库记录。处理这种情况的最佳做法是什么?

这是检查成功或失败的示例代码

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
    SendResult<String, String> result = null;
    try {
        result = future.get(10000, TimeUnit.MILLISECONDS);
        LOGGER.info("Send Result : {}", result);
        LOGGER.info("Saving entry in db");
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    } catch (Exception e) {
        LOGGER.error("Error publishing message ", e);
        messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
    }

标签: spring-bootspring-kafka

解决方案


send()操作返回ListenableFuture<>异步完成的a。

如果要阻塞调用线程以获取结果,请使用

future.get(timeout, TimeUnit.MILLISECONDS);

推荐阅读