apache-kafka - 即使未调用生产者提交,Kafka 事务性生产者也会写入消息
问题描述
对我来说,似乎 kafka 事务生产者的行为就像一个普通的生产者,消息在主题上是可见的,因为每条消息都调用了 send 。也许我错过了一些基本的东西。我希望消息仅在调用生产者提交方法后才会出现在主题中。在我下面的代码中,produce.commitTransactions() 被注释掉了,但我仍然收到主题中的消息。感谢您的任何指示。
public static void main(String[] args) {
try {
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions(); //initiate transactions
try {
producer.beginTransaction(); //begin transactions
for (Integer i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));
}
// producer.commitTransaction(); //commit
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
解决方案
当谈到 Kafka 中的事务时,您需要考虑生产者/消费者对。正如您所观察到的,生产者本身只是生产数据并且要么提交事务要么不提交事务。
只有在与消费者交互时,您才能通过将 KafkaConsumer 配置isolation.level
设置为read_committed
(默认情况下设置为read_uncommitted
)来“完成”交易。此配置描述为:
isolation.level:控制如何读取以事务方式编写的消息。如果设置为 read_committed,consumer.poll() 将只返回已提交的事务消息。如果设置为 read_uncommitted'(默认值),consumer.poll() 将返回所有消息,甚至是已中止的事务消息。在任一模式下都将无条件返回非事务性消息。
推荐阅读
- maven - Maven 卡住下载 maven-default-http-blocker
- azure - 在 docker 映像中安装软件包时出错
- npm - 在 NPM 之后不推荐使用未处理的承诺拒绝
- javascript - $http 获取循环将响应数据附加在一起
- laravel - Laravel FakeID 问题缺少路由所需的参数
- python - 如何从输出中删除最后一个逗号
- r - 在R中附加来自多个Excel文件的多个工作表
- dart - 无法在 dart 中声明二维列表并向其输入元素
- python - 如何根据 tkinter 应用程序中的用户输入更改标签?
- python - 如何从 sympy 方程中提取多项式和非多项式部分?