首页 > 解决方案 > 如何在 java 中优化 .kafka 生产者?

问题描述

@Configuration
public class KafkaConfiguration {
    @Value("${kafka.boot.server}")
    private String kafkaServer;

   @Bean    
   public KafkaTemplate<String,String> kafkaTemplate(){
       return new KafkaTemplate<>(producerConfig());}

   @Bean
   public ProducerFactory<String,String> producerConfig() {
       Map<String,Object> config= new HashMap<>();
       config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
       config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class );
       config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);          return new DefaultKafkaProducerFactory<>(config);
   }
}

卡夫卡的先决条件是什么?您对发布消息有何建议?还有哪些其他可能的方式?

标签: javaapache-kafkamicroservices

解决方案


  1. delivery.timeout.ms:如果短时间内出现大量事件是你的情况,这个值应该更高,因为当网络繁忙时,你的客户会抱怨NetworkException,增加它你可以看到更少NetworkException

了解什么是delivery.timeout.ms

https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer?source=post_page-----fa3910d9aa54------ ---------------#KIP-91ProvideIntuitiveUserTimeoutsinTheProducer-TestPlan

  1. acks:如果您不需要数据丢失。您必须将其设置为all. 默认值为 1,领导者会将记录写入其本地日志,但会在不等待所有追随者的完全确认的情况下做出响应。在这种情况下,如果领导者在确认记录后但在追随者复制它之前立即失败,那么记录将丢失。

  2. retries: 这取决于你的 kafka 客户端版本。现在默认重试次数是 Integer.Max,但是对于早期版本,您希望设置retries为更高的值,这样您的生产者就不会因为无法访问领导分区的一个简单异常而停止。

  3. Exactly-once:如果您的应用程序需要完全一次,您必须参考enable.idempotencetransactional.id

请注意,此处提到的配置应该可以enums在您的 java 客户端中找到相应的配置

生产者设置的进一步参考: https ://docs.confluent.io/current/installation/configuration/producer-configs.html


推荐阅读