apache-kafka - Kafka Producer:使用回调处理异步发送中的异常
问题描述
如果异步发送到 Kafka,我需要捕获异常。Kafka producer Api 自带一个函数 send(ProducerRecord 记录,Callback 回调)。但是当我针对以下两种情况进行测试时:
- 卡夫卡经纪人倒闭
- 未预先创建主题 未调用回调。相反,我在代码中收到发送失败的警告(如下所示)。
问题 :
那么回调是否仅针对特定异常调用?
Kafka 客户端何时尝试在异步发送时连接到 Kafka 代理:在每批发送时还是定期发送?
注意:我还使用 25 秒的 linger.ms 设置来批量发送我的记录。
public class ProducerDemo {
static KafkaProducer<String, String> producer;
public static void main(String[] args) throws IOException {
final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
producer = new KafkaProducer<String, String>(properties);
String topic = "first_topic";
for (int i = 0; i < 5; i++) {
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//execute everytime a record is successfully sent or exception is thrown
if(e == null){
// No Exception
}else{
//Exception Handling
}
}
});
}
producer.close();
}
解决方案
您将收到关于不存在主题的警告,作为KafkaProducer
. 如果您等待更长的时间(默认情况下应为 60 秒),最终将调用回调:这是我的片段:
因此,当出现问题并且异步发送不成功时,它最终将失败并出现失败的未来或/和带有异常的回调。如果您没有以事务方式运行它,则仍然可能意味着批处理中的某些消息已找到到达代理的方式,而其他消息则没有。如果您需要对发送到 Kafka 的每条消息的上游系统(如 http 摄取接口等)进行阻塞式确认,这肯定会是一个问题。做到这一点的唯一方法是阻止每条带有未来的消息get
,如文档中所述:
总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题。绝对可以更好地记录它。
还有一件事,因为您提到了 linger.ms:
请注意,即使 linger.ms=0,及时到达的记录通常也会一起批处理,因此无论 linger 配置如何,在重负载下都会发生批处理
推荐阅读
- php - 根据数组的索引显示不同的 HTML 元素
- c++ - 函数内的数组
- javascript - 获取所选文件夹的本地路径以发送服务器?
- java - 学习构造函数 - 无输出
- python - PyQt:使用隐藏和显示后取消堆叠按钮
- php - 检查输入类型文件multiple在php中是否为空
- javascript - HTML Javascript Automatic Window Scroll
- javascript - vuejs - 当我保存文件时,npm run dev webserver 自动刷新工作,但刷新浏览器没有
- java - How to tell if Serial COM is USB or Bluetooth in Processing?
- haskell - 如何明确指定中间变量的类型?