首页 > 解决方案 > Spring-Kafka 生产者吞吐量远低于纯 java 生产者

问题描述

我正在使用 spring-kafka 2.2.8 并使用以下设置编写一个简单的异步生产者:

producer config key : acks  and value is : 1
producer config key : max.in.flight.requests.per.connection  and value is : 5
producer config key : compression.type  and value is : lz4
producer config key : linger.ms  and value is : 10
producer config key : batch.size  and value is : 64000
producer config key : bufferMemory  and value is : 67108864
producer config key : key.serializer  and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : value.serializer  and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer

我可以使用 apache java 客户端每秒产生多达 20k 条消息,但使用 spring-kafka,它每秒不会超过 3k 条消息

这里有什么问题?

标签: spring-kafka

解决方案


我认为没有显着差异;和你看到的完全不一样……

@SpringBootApplication
public class So63308954Application {

    private static final int COUNT = 1_000_000;


    public static void main(String[] args) {
        SpringApplication.run(So63308954Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63308954-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so63308954-2").partitions(1).replicas(1).build();
    }

    @Bean
    public SimpleAsyncTaskExecutor exec() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, SimpleAsyncTaskExecutor exec) {
        return args -> {
            StopWatch watch = new StopWatch();
            Producer<String, String> producer = template.getProducerFactory().createProducer();
            String payload = new String(new byte[1024]);
            exec.submit(() -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("so63308954-1", payload);
                Future<RecordMetadata> future1 = null;
                watch.start("native");
                for (int i = 0; i < COUNT; i++) {
                    future1 = producer.send(record, (metadata, exception) -> {
                    });
                }
                System.out.println(future1.get(10, TimeUnit.SECONDS));
                watch.stop();
                return null;
            }).get();
            template.getProducerFactory().reset();
            exec.submit(() -> {
                ProducerRecord<String, String> record = new ProducerRecord<>("so63308954-2", payload);
                ListenableFuture<SendResult<String, String>> future = null;
                template.send(record); // create a new producer outside of the timer
                watch.start("spring");
                for (int i = 0; i < COUNT; i++) {
                    future = template.send(record);
                }
                System.out.println(future.get(10, TimeUnit.SECONDS).getRecordMetadata());
                watch.stop();
                return null;
            }).get();
            System.out.println(watch.prettyPrint());
            TaskInfo[] taskInfo = watch.getTaskInfo();
            for (TaskInfo info : taskInfo) {
                System.out.println(info.getTaskName() + " rate: " + COUNT / info.getTimeSeconds() + " / sec");
            }
            producer.close();
        };
    }

}

结果:

StopWatch '': running time = 4454736954 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
2205227969  050%  native
2249508985  050%  spring

native rate: 453467.856410994 / sec
spring rate: 444541.4562325031 / sec

推荐阅读