java - 如何在 java 中优化 .kafka 生产者?
问题描述
@Configuration
public class KafkaConfiguration {
@Value("${kafka.boot.server}")
private String kafkaServer;
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerConfig());}
@Bean
public ProducerFactory<String,String> producerConfig() {
Map<String,Object> config= new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class );
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class); return new DefaultKafkaProducerFactory<>(config);
}
}
卡夫卡的先决条件是什么?您对发布消息有何建议?还有哪些其他可能的方式?
解决方案
delivery.timeout.ms
:如果短时间内出现大量事件是你的情况,这个值应该更高,因为当网络繁忙时,你的客户会抱怨NetworkException
,增加它你可以看到更少NetworkException
。
了解什么是delivery.timeout.ms
:
acks
:如果您不需要数据丢失。您必须将其设置为all
. 默认值为 1,领导者会将记录写入其本地日志,但会在不等待所有追随者的完全确认的情况下做出响应。在这种情况下,如果领导者在确认记录后但在追随者复制它之前立即失败,那么记录将丢失。retries
: 这取决于你的 kafka 客户端版本。现在默认重试次数是 Integer.Max,但是对于早期版本,您希望设置retries
为更高的值,这样您的生产者就不会因为无法访问领导分区的一个简单异常而停止。Exactly-once:如果您的应用程序需要完全一次,您必须参考
enable.idempotence
和transactional.id
请注意,此处提到的配置应该可以enums
在您的 java 客户端中找到相应的配置
生产者设置的进一步参考: https ://docs.confluent.io/current/installation/configuration/producer-configs.html
推荐阅读
- java - 当我从命令行运行 Main 时,Java 在同一个包中找不到类
- python - Python将列表转换为列表列表
- javascript - 表单输入字段必填属性在 Google 应用脚本 HTML Web 应用中不起作用
- python - 如何找到我在 PyCharm 中的 Python 应用程序的入口点?
- android - E/RecyclerView:没有附加适配器;使用 Firebase 数据创建回收站视图时跳过布局
- excel - 数组到范围验证列表
- reactjs - 在 React Native 中将子组件有条件地渲染到 Header 中
- sql - T-SQL Pivot 行为问题,Pivot 的行为与 CAS 编号不符
- visual-studio-code - 文件在当前文件夹中关闭并显示在工作区文件夹中
- python - 模块“networkx”没有属性“from_pandas_edgelist”