spring-cloud-stream - Spring Cloud kafka Stream Binder transactionIdPrefix 性能命中
问题描述
我们有一个 kafka 流程,我们正在消费来自一个主题的消息,然后发生一些丰富,然后我们将消息发布到另一个主题。以下是事件
- 消费者 - 消费消息
- 丰富 - 丰富消费的消息
- 生产者 - 向其他主题发布丰富的消息
我正在使用 Spring cloud kafka binder 3.0.0-RELEASE 版本,一切正常。最近我们引入了幂等生产者并包含了 transactionIdPrefix 属性,我们观察到我们开始出现性能问题。下面是统计数据。
- 在 transactionIdPrefix 系统需要大约 20 秒来处理具有 30 个并发消费者的 10k 消息之前。即每秒 500 条消息。
- 在 transactionIdPrefix 系统为 10k 条消息和 30 个并发消费者平均花费 165 秒之后。即每秒 60 条消息。
下面是我们的代码和配置。
@StreamListener("INPUT")
@SendTo("OUTPUT")
public void consumer(Message message){
Acknowledgement ack = messge.getHeaders().get(KafkaHeaders.ACKNOWLEDGEMENT,Acknowledgement.class))
try{
String inputMessage = message.getPayload.toString();
String enrichMessage = // Enrichment on inputMessage
ack.acknowledgement()
return enrichMessage;
}catch( Exception exp){
ack.acknowledgement();
throw exp;
}
}
配置是
- spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=TX-
- spring.cloud.stream.kafka.binder.transaction.producer.configuration.ack=all
- spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
- spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
- spring.cloud.stream.kafka.bindings.input.consumer.concurrency=30
- spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
- spring.cloud.stream.kafka.bindings.input.consumer.dlqName=error.topic
- spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=true
- spring.cloud.stream.kafka.bindings.input.consumer.maxAttempt=3
- spring.cloud.stream.kafka.binder.transaction.producer.configuration.enable.idempotence=true
我预计性能会受到一点影响,但这看起来是一个巨大的性能差异。有没有人遇到过这样的问题以及我们如何使用 transactionIdPrefix 属性提高性能的任何建议?
解决方案
交易总是很昂贵;特别是当每次读取只执行一次写入时。
使用功能模型(Consumer<Message>
而不是@StreamListener
),您可以启用批处理模式以接收List<Message>
; 这应该有助于提高性能。
public Consumer<List<Message>> () {
messages -> {...}
}
这样,交易将适用于整个列表。
推荐阅读
- java - OO:容器中是否包含自行车或椅子?
- raku - 找出容器是类还是对象
- c++ - 从 explorer.exe 读取 PCACLIENT 内存
- java - 是否可以在非 Android 应用程序中使用 Android 支持库?
- python - 如何以 Big Endian 方式读取和解析二进制文件
- ember.js - EMBER JS - 仅在需要时从后端获取关联的模型数据
- python - 迭代时未在熊猫数据框中设置值
- javascript - Node js - Push() 函数在参考和 mongodb 中无法正常工作
- android - 如何从 Android Studio 运行单个 Kotlin 类
- javascript - 如何向 window.addEventListener 添加时间