首页 > 解决方案 > 如何识别哪些消息成功发布到 kafka 主题以及哪些消息失败

问题描述

将消息列表发布到 apache kafka。任何人都可以使用 kafka api 提供示例代码,展示如何识别哪些消息成功发布到主题以及哪些消息从响应中失败?(请注意,我在一个请求中将消息列表作为一批发送。)

标签: javaapache-kafkakafka-producer-api

解决方案


KafkaProducer.send()方法采用单个 ProducerRecord(消息)。

有两种方法可以检查集群是否成功接收到此消息:

  • 使用回调:send()可以将回调作为第二个参数

    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // If Exception is null, the record was sent successfully
        }
    });
    
  • 使用未来:send()返回一个Future

     ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     Future<RecordMetadata> future = producer.send(record);
     try {
         RecordMetadata rm = future.get();
         // The record was sent successfully
     } catch (ExecutionException e) {
         // The record failed
     }
    

推荐阅读