apache-kafka - Kafka 事务:在 AddPartitionsToTxnRequest 上接收 CONCURRENT_TRANSACTIONS
问题描述
我正在尝试在事务中发布 7 个代理上 16 个 Kafka 分区上的消息。
流程是这样的:
- 公开交易
- 将消息写入 16 个分区
- 提交事务
- 睡眠 25 毫秒
- 重复
有时事务需要超过 1 秒才能完成,平均为 50 毫秒。在生产者端启用跟踪日志记录后,我注意到以下错误:
TRACE internals.TransactionManager [kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1, transactionalId=cma-2]
Received transactional response AddPartitionsToTxnResponse(errors={modelapp-ecb-0=CONCURRENT_TRANSACTIONS, modelapp-ecb-9=CONCURRENT_TRANSACTIONS, modelapp-ecb-10=CONCURRENT_TRANSACTIONS, modelapp-ecb-11=CONCURRENT_TRANSACTIONS, modelapp-ecb-12=CONCURRENT_TRANSACTIONS, modelapp-ecb-13=CONCURRENT_TRANSACTIONS, modelapp-ecb-14=CONCURRENT_TRANSACTIONS, modelapp-ecb-15=CONCURRENT_TRANSACTIONS, modelapp-ecb-1=CONCURRENT_TRANSACTIONS, modelapp-ecb-2=CONCURRENT_TRANSACTIONS, modelapp-ecb-3=CONCURRENT_TRANSACTIONS, modelapp-ecb-4=CONCURRENT_TRANSACTIONS, modelapp-ecb-5=CONCURRENT_TRANSACTIONS, modelapp-ecb-6=CONCURRENT_TRANSACTIONS, modelapp-ecb-=CONCURRENT_TRANSACTIONS, modelapp-ecb-8=CONCURRENT_TRANSACTIONS}, throttleTimeMs=0)
for request (type=AddPartitionsToTxnRequest, transactionalId=cma-2, producerId=59003, producerEpoch=0, partitions=[modelapp-ecb-0, modelapp-ecb-9, modelapp-ecb-10, modelapp-ecb-11, modelapp-ecb-12, modelapp-ecb-13, modelapp-ecb-14, modelapp-ecb-15, modelapp-ecb-1, modelapp-ecb-2, modelapp-ecb-3, modelapp-ecb-4, modelapp-ecb-5, modelapp-ecb-6, modelapp-ecb-7, modelapp-ecb-8])
Kafka 生产者重试多次发送 AddPartitionsToTxnRequest(s) 直到成功,但这会导致延迟。
代码如下所示:
Properties producerProperties = PropertiesUtil.readPropertyFile(_producerPropertiesFile);
_producer = new KafkaProducer<>(producerProperties);
_producer.initTransactions();
_producerService = Executors.newSingleThreadExecutor(new NamedThreadFactory(getClass().getSimpleName()));
_producerService.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
_producer.beginTransaction();
for (int partition = 0; partition < _numberOfPartitions; partition++)
_producer.send(new ProducerRecord<>(_producerTopic, partition, KafkaRecordKeyFormatter.formatControlMessageKey(_messageNumber, token), EMPTY_BYTE_ARRAY));
_producer.commitTransaction();
_messageNumber++;
Thread.sleep(_timeBetweenProducedMessagesInMillis);
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException | UnsupportedVersionException e) {
closeProducer();
break;
} catch (KafkaException e) {
_producer.abortTransaction();
} catch (InterruptedException e) {...}
}
});
查看经纪人的代码,似乎有两种情况会引发此错误,但我不知道为什么会到达那里
object TransactionCoordinator {
...
def handleAddPartitionsToTransaction(...): Unit = {
...
if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
Left(Errors.CONCURRENT_TRANSACTIONS)
}
...
}
...
}
提前感谢您的帮助!
后期编辑:
在代理上启用跟踪日志记录,我们能够看到代理在事务达到状态 CompleteCommit之前向生产者发送 END_TXN 响应。生产者能够启动一个新事务,当它仍在转换 PrepareCommit -> CompleteCommit 时被代理拒绝。
解决方案
推荐阅读
- angularjs - ADAL 登录和验证用户取决于 Microsoft 帐户登录/注销
- php - WP自动评论批准自定义帖子类型
- r - 使用 Shiny 模拟实时数据可视化
- facebook - 如何更新将图片发布到 Facebook 的权限
- vba - 使用 vba 在行中自定义选择
- mysql - MYSQL 事件显示错误“对不起,发生了意外错误!”
- vba - VBA Access - 将多条记录插入表中时遇到问题
- ios - Apple Pay iOS Swift 应用优惠券
- ruby - 如何从字符串中删除特殊字符和多个空格
- class - Java - 如何根据用户输入检查列表是否包含对象的名称?