首页 > 解决方案 > Spring Cloud kafka Stream Binder transactionIdPrefix 性能命中

问题描述

我们有一个 kafka 流程,我们正在消费来自一个主题的消息,然后发生一些丰富,然后我们将消息发布到另一个主题。以下是事件

我正在使用 Spring cloud kafka binder 3.0.0-RELEASE 版本,一切正常。最近我们引入了幂等生产者并包含了 transactionIdPrefix 属性,我们观察到我们开始出现性能问题。下面是统计数据。

下面是我们的代码和配置。

@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
    String inputMessage = message.getPayload.toString();
    String enrichMessage = // Enrichment on inputMessage
    ack.acknowledgement()   
    return enrichMessage;
}catch( Exception exp){
    ack.acknowledgement();
    throw exp;  
}
}

配置是

  1. spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=TX-
  2. spring.cloud.stream.kafka.binder.transaction.producer.configuration.ack=all
  3. spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
  4. spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
  5. spring.cloud.stream.kafka.bindings.input.consumer.concurrency=30
  6. spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
  7. spring.cloud.stream.kafka.bindings.input.consumer.dlqName=error.topic
  8. spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=true
  9. spring.cloud.stream.kafka.bindings.input.consumer.maxAttempt=3
  10. spring.cloud.stream.kafka.binder.transaction.producer.configuration.enable.idempotence=true

我预计性能会受到一点影响,但这看起来是一个巨大的性能差异。有没有人遇到过这样的问题以及我们如何使用 transactionIdPrefix 属性提高性能的任何建议?

标签: spring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


交易总是很昂贵;特别是当每次读取只执行一次写入时。

使用功能模型(Consumer<Message>而不是@StreamListener),您可以启用批处理模式以接收List<Message>; 这应该有助于提高性能。

public Consumer<List<Message>> () {
    messages -> {...}
}

这样,交易将适用于整个列表。


推荐阅读