Kafka transactions: receiving CONCURRENT_TRANSACTIONS on AddPartitionsToTxnRequest

Problem description

I am trying to publish messages in a transaction to 16 Kafka partitions spread across 7 brokers.

The flow is as follows:

  1. Open a transaction
  2. Write a message to each of the 16 partitions
  3. Commit the transaction
  4. Sleep for 25 ms
  5. Repeat

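Sometimes a transaction takes more than 1 second to complete, against an average of 50 ms. After enabling TRACE logging on the producer side, I noticed the following error: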

TRACE internals.TransactionManager [kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1, transactionalId=cma-2] 
Received transactional response AddPartitionsToTxnResponse(errors={modelapp-ecb-0=CONCURRENT_TRANSACTIONS, modelapp-ecb-9=CONCURRENT_TRANSACTIONS, modelapp-ecb-10=CONCURRENT_TRANSACTIONS, modelapp-ecb-11=CONCURRENT_TRANSACTIONS, modelapp-ecb-12=CONCURRENT_TRANSACTIONS, modelapp-ecb-13=CONCURRENT_TRANSACTIONS, modelapp-ecb-14=CONCURRENT_TRANSACTIONS, modelapp-ecb-15=CONCURRENT_TRANSACTIONS, modelapp-ecb-1=CONCURRENT_TRANSACTIONS, modelapp-ecb-2=CONCURRENT_TRANSACTIONS, modelapp-ecb-3=CONCURRENT_TRANSACTIONS, modelapp-ecb-4=CONCURRENT_TRANSACTIONS, modelapp-ecb-5=CONCURRENT_TRANSACTIONS, modelapp-ecb-6=CONCURRENT_TRANSACTIONS, modelapp-ecb-7=CONCURRENT_TRANSACTIONS, modelapp-ecb-8=CONCURRENT_TRANSACTIONS}, throttleTimeMs=0) 
for request (type=AddPartitionsToTxnRequest, transactionalId=cma-2, producerId=59003, producerEpoch=0, partitions=[modelapp-ecb-0, modelapp-ecb-9, modelapp-ecb-10, modelapp-ecb-11, modelapp-ecb-12, modelapp-ecb-13, modelapp-ecb-14, modelapp-ecb-15, modelapp-ecb-1, modelapp-ecb-2, modelapp-ecb-3, modelapp-ecb-4, modelapp-ecb-5, modelapp-ecb-6, modelapp-ecb-7, modelapp-ecb-8])

The Kafka producer retries the AddPartitionsToTxnRequest several times until it succeeds, but these retries introduce the delays.
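To make the cost of those retries concrete, here is a rough toy model of how rejected attempts add up to latency. It assumes the client waits a fixed backoff between attempts; in the real producer this wait is governed by the `retry.backoff.ms` setting (documented default 100 ms), and some client versions may use a shorter backoff for this particular request. The class `RetryDelayModel`, its method, and the sample numbers are hypothetical and are not taken from the Kafka code base.

```java
import java.util.function.IntPredicate;

// Toy model only, not Kafka client code. All names here are hypothetical.
class RetryDelayModel {

    /**
     * Simulates a request that is retried with a fixed backoff until the
     * simulated broker accepts it, and returns the total time spent waiting.
     *
     * @param acceptsOnAttempt returns true when the given 1-based attempt succeeds
     * @param backoffMs        assumed fixed wait between two attempts
     */
    static long backoffSpentMs(IntPredicate acceptsOnAttempt, long backoffMs) {
        long waitedMs = 0;
        int attempt = 1;
        // Note: this loops forever if the predicate never accepts an attempt.
        while (!acceptsOnAttempt.test(attempt)) {
            waitedMs += backoffMs; // retriable error: wait, then try again
            attempt++;
        }
        return waitedMs;
    }

    public static void main(String[] args) {
        long backoffMs = 100; // assumed backoff between attempts
        // Assume the broker rejects the first 3 attempts and accepts the 4th.
        long waited = backoffSpentMs(attempt -> attempt >= 4, backoffMs);
        System.out.println("Extra latency from retries: " + waited + " ms");
    }
}
```

Under these assumptions every rejected attempt costs one full backoff, so a handful of rejections is enough to push a transaction that normally takes tens of milliseconds to several hundred milliseconds or more.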

The producer code looks like this:

// Create the transactional producer and register its transactional.id with the coordinator.
Properties producerProperties = PropertiesUtil.readPropertyFile(_producerPropertiesFile);
_producer = new KafkaProducer<>(producerProperties);
_producer.initTransactions();

// All transactions are produced from a single dedicated thread.
_producerService = Executors.newSingleThreadExecutor(new NamedThreadFactory(getClass().getSimpleName()));
_producerService.submit(() -> {
    while (!Thread.currentThread().isInterrupted()) {

        try {
            _producer.beginTransaction();
            // Send one record to every partition of the topic within the same transaction.
            for (int partition = 0; partition < _numberOfPartitions; partition++) {
                _producer.send(new ProducerRecord<>(_producerTopic, partition, KafkaRecordKeyFormatter.formatControlMessageKey(_messageNumber, token), EMPTY_BYTE_ARRAY));
            }

            _producer.commitTransaction();
            _messageNumber++;
            Thread.sleep(_timeBetweenProducedMessagesInMillis);
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException | UnsupportedVersionException e) {
            // Fatal errors: the producer cannot recover, so close it and stop the loop.
            closeProducer();
            break;
        } catch (KafkaException e) {
            // Any other Kafka error: abort the current transaction and try again.
            _producer.abortTransaction();
        } catch (InterruptedException e) {...} 
    }
});

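Looking at the broker code, there seem to be two cases in which this error is returned, but I don't understand why either of them would be reached: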

object TransactionCoordinator {
...
    def handleAddPartitionsToTransaction(...): Unit = {
    ...
        if (txnMetadata.pendingTransitionInProgress) {
            // return a retriable exception to let the client backoff and retry
            Left(Errors.CONCURRENT_TRANSACTIONS)
        } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
            Left(Errors.CONCURRENT_TRANSACTIONS)
        }
    ...
    }
...
}

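Thanks in advance for your help!

Later edit:

After enabling TRACE logging on the brokers, we could see that the broker sends the END_TXN response to the producer before the transaction reaches the CompleteCommit state. The producer is therefore able to start a new transaction, which the broker rejects because it is still in the PrepareCommit -> CompleteCommit transition.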
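The race described in this edit can be illustrated with a small toy state machine. This is only a sketch of the observed behaviour, not the broker implementation: the class `ToyTxnCoordinator`, its enums, and its methods are hypothetical simplifications, and the asynchronous completion step is modelled as an explicit method call.

```java
// Toy model of the observed race. NOT the broker implementation;
// every name below is a hypothetical simplification.
class ToyTxnCoordinator {

    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }
    enum Result { NONE, CONCURRENT_TRANSACTIONS }

    private State state = State.EMPTY;

    /** Models AddPartitionsToTxn: rejected while a commit is still being completed. */
    Result addPartitions() {
        if (state == State.PREPARE_COMMIT) {
            return Result.CONCURRENT_TRANSACTIONS; // retriable: the client backs off and retries
        }
        state = State.ONGOING;
        return Result.NONE;
    }

    /** Models EndTxn(commit): the response is returned as soon as PREPARE_COMMIT is recorded. */
    Result endTxnCommit() {
        if (state != State.ONGOING) {
            throw new IllegalStateException("no ongoing transaction, state=" + state);
        }
        state = State.PREPARE_COMMIT;
        return Result.NONE;
    }

    /** Models the later step in which the broker finishes the commit. */
    void completeCommit() {
        if (state == State.PREPARE_COMMIT) {
            state = State.COMPLETE_COMMIT;
        }
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        ToyTxnCoordinator coordinator = new ToyTxnCoordinator();
        System.out.println("first add:   " + coordinator.addPartitions()); // transaction 1 starts
        System.out.println("end txn:     " + coordinator.endTxnCommit());  // producer is told the commit succeeded
        System.out.println("early add:   " + coordinator.addPartitions()); // transaction 2 arrives too early
        coordinator.completeCommit();                                      // broker finishes transaction 1
        System.out.println("retried add: " + coordinator.addPartitions()); // the retry is accepted
    }
}
```

In this model the second `addPartitions()` call is rejected only because it arrives between `endTxnCommit()` and `completeCommit()`, which mirrors the window seen in the broker logs.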

Tags: apache-kafka, kafka-transactions-api
