首页 > 解决方案 > Kafka Producer API - onCompletion

问题描述

试图了解 Java 生产者 API。onCompletion 是什么意思?帮助我理解相同的。

参考: https ://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });

标签: apache-kafkakafka-producer-api

解决方案


在方法

producer.send(ProducerRecord<K,V> record, new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {...}
  });

确认发送后调用回调。回调在后台 I/O 线程中执行,因此它应该很快(不要阻塞它)

默认情况下,发送是异步的,一旦记录存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会阻塞等待每个记录之后的响应。

Send 返回 RecordMetadata,它指定记录发送到的分区、分配给它的偏移量和时间戳。

这是一种异步方法,而您可以使用同步方式执行相同操作:

producer.send(record).get();


推荐阅读