首页 > 解决方案 > Kafka Producer:使用回调处理异步发送中的异常

问题描述

如果异步发送到 Kafka,我需要捕获异常。Kafka producer Api 自带一个函数 send(ProducerRecord 记录,Callback 回调)。但是当我针对以下两种情况进行测试时:

问题 :

卡夫卡警告图像

注意:我还使用 25 秒的 linger.ms 设置来批量发送我的记录。


public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

         final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

              producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        //execute everytime a record is successfully sent or exception is thrown
                        if(e == null){
                           // No Exception
                        }else{
                            //Exception Handling
                        }
                    }
                });
        }
        producer.close();
    }

标签: apache-kafkakafka-producer-api

解决方案


您将收到关于不存在主题的警告,作为KafkaProducer. 如果您等待更长的时间(默认情况下应为 60 秒),最终将调用回调:这是我的片段: 在此处输入图像描述

因此,当出现问题并且异步发送不成功时,它最终将失败并出现失败的未来或/和带有异常的回调。如果您没有以事务方式运行它,则仍然可能意味着批处理中的某些消息已找到到达代理的方式,而其他消息则没有。如果您需要对发送到 Kafka 的每条消息的上游系统(如 http 摄取接口等)进行阻塞式确认,这肯定会是一个问题。做到这一点的唯一方法是阻止每条带有未来的消息get,如文档中所述: 在此处输入图像描述

总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题。绝对可以更好地记录它。

还有一件事,因为您提到了 linger.ms

请注意,即使 linger.ms=0,及时到达的记录通常也会一起批处理,因此无论 linger 配置如何,在重负载下都会发生批处理


推荐阅读