首页 > 解决方案 > 即使未调用生产者提交,Kafka 事务性生产者也会写入消息

问题描述

对我来说,似乎 kafka 事务生产者的行为就像一个普通的生产者,消息在主题上是可见的,因为每条消息都调用了 send 。也许我错过了一些基本的东西。我希望消息仅在调用生产者提交方法后才会出现在主题中。在我下面的代码中,produce.commitTransactions() 被注释掉了,但我仍然收到主题中的消息。感谢您的任何指示。

  public static void main(String[] args) {
        try {
            Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
            producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
            producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
            producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(producerConfig);

            producer.initTransactions(); //initiate transactions
            try {
                producer.beginTransaction(); //begin transactions
                for (Integer i = 0; i < 1000; i++) {
                    producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));

                }
                // producer.commitTransaction(); //commit

            } catch (KafkaException e) {
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }

            producer.close();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }

标签: apache-kafkatransactionskafka-consumer-apikafka-producer-api

解决方案


当谈到 Kafka 中的事务时,您需要考虑生产者/消费者对。正如您所观察到的,生产者本身只是生产数据并且要么提交事务要么不提交事务。

只有在与消费者交互时,您才能通过将 KafkaConsumer 配置isolation.level设置为read_committed(默认情况下设置为read_uncommitted)来“完成”交易。此配置描述为:

isolation.level:控制如何读取以事务方式编写的消息。如果设置为 read_committed,consumer.poll() 将只返回已提交的事务消息。如果设置为 read_uncommitted'(默认值),consumer.poll() 将返回所有消息,甚至是已中止的事务消息。在任一模式下都将无条件返回非事务性消息。


推荐阅读