首页 > 解决方案 > 多线程事务 kafka 生产者 - 我应该在关闭前刷新吗?

问题描述

让我们考虑多线程跨国kafka生产者。我应该flush()在关闭前制作吗?换句话说,事务性生产者在发送数据之前是否会批量缓冲数据?

标签: javaapache-kafkakafka-producer-api

解决方案


javadocs中所述

应用程序不需要为事务生产者调用 flush 方法,因为 commitTransaction() 将在执行提交之前刷新所有缓冲的记录

这在 javadoc 示例中得到了最好的说明

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
 producer.beginTransaction();
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
 // We can't recover from these exceptions, so our only option is to close the producer and exit.
 producer.close();
} catch (KafkaException e) {
 // For all other exceptions, just abort the transaction and try again.
 producer.abortTransaction();
}
producer.close();

简而言之,最好使用生产者的事务性 API(它们是阻塞的并且会在失败时抛出异常)。

此外,在多线程应用程序的情况下,您需要确保每个生产者只有一个打开的事务。如果您在事务期间遇到异常,您应该调用producer.abortTransaction()(也在示例中突出显示)以维护生产者事务能力的恰好一次语义。


推荐阅读