java - 多线程事务 kafka 生产者 - 我应该在关闭前刷新吗?
问题描述
让我们考虑多线程跨国kafka生产者。我应该flush()
在关闭前制作吗?换句话说,事务性生产者在发送数据之前是否会批量缓冲数据?
解决方案
应用程序不需要为事务生产者调用 flush 方法,因为 commitTransaction() 将在执行提交之前刷新所有缓冲的记录
这在 javadoc 示例中得到了最好的说明
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
简而言之,最好使用生产者的事务性 API(它们是阻塞的并且会在失败时抛出异常)。
此外,在多线程应用程序的情况下,您需要确保每个生产者只有一个打开的事务。如果您在事务期间遇到异常,您应该调用producer.abortTransaction()
(也在示例中突出显示)以维护生产者事务能力的恰好一次语义。
推荐阅读
- javascript - 对于 ng-keyup 无法执行 event.preventDefault() 和 ng-keypress 落后于 angular.js 中输入类型编号的值
- django - NoReverseMatch at /products/drama/ - 未找到“产品”的反向。“产品”不是有效的视图函数或模式名称
- apache-spark - 如何在 Spark 中检查和优化闭包
- html - 如何将侧面板高度同步到动态高度?
- r - 如何按R中的多列对数据框的一部分进行排序?
- mysql - 在sql中命名输出
- firebase - 每次使用 Flutter/Firebase 按下上传时,如何创建唯一的图像 ID?
- php - PHPUnit:使用注解返回特定类型真的有必要吗?
- php - Laravel 试用模式中间件
- php - PHP utf8_decode 效果很好,除非保存在 MySQL 数据库中