spring-boot - 如何验证 sprng kafka producer 是否成功发送消息?
问题描述
我正在尝试测试 spring kafka(2.2.5.RELEASE),当生产者使用 kafkatemplate 发送消息时,我想知道该消息是否发送成功。基于此,我想更新该消息 ID 的数据库记录。处理这种情况的最佳做法是什么?
这是检查成功或失败的示例代码
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test_topic", key, userMsg);
SendResult<String, String> result = null;
try {
result = future.get(10000, TimeUnit.MILLISECONDS);
LOGGER.info("Send Result : {}", result);
LOGGER.info("Saving entry in db");
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
} catch (Exception e) {
LOGGER.error("Error publishing message ", e);
messageRepo.save(result.getProducerRecord().key().toString(),result.getProducerRecord().value().toString());
}
解决方案
send()
操作返回ListenableFuture<>
异步完成的a。
如果要阻塞调用线程以获取结果,请使用
future.get(timeout, TimeUnit.MILLISECONDS);
推荐阅读
- python - pypdf - 在 pdf 文件中写入时,孟加拉语单词被破坏
- react-native - 如何在 React Native(Android)中正确地将元素聚焦在“位置:”绝对“”视图中
- c - 为什么我的从数组计算平均值的代码不起作用?
- flutter - 从 sql 检索的布尔值作为字符串
- pandas - 创建具有不等长度列表的熊猫数据框
- sql - 如何从通过“路径”列存储的树层次结构中过滤出具有某些继承标志的子树?
- python - 如何修复 TypeError:“NoneType”对象不可迭代
- database-design - 用于在通知源上存储通用过滤器的数据库设计
- javascript - MUI 自动完成自定义选项,不显示所选选项
- javascript - 本地静音视频时,Agora 远程视图发生变化