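java - Getting "Magic v1 does not support record headers" when producing a message
Problem description
I get the error "Magic v1 does not support record headers" when producing a message. My code is below.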
KafkaProducerConfig:
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
KafkaConsumerConfig:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value(value = "${kafka.consumer.auto.commit.interval.ms}")
    private String autoCommitInterval;

    @Value(value = "${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value(value = "${kafka.consumer.session.timeout.ms}")
    private String sessionTimeout;

    @Value(value = "${kafka.consumer.concurrency}")
    private String concurrency;

    @Value(value = "${kafka.consumer.pollTimeout}")
    private String pollTimeout;

    @Bean
    public ConsumerFactory<String, Event> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(Event.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(concurrency));
        factory.getContainerProperties().setPollTimeout(Integer.parseInt(pollTimeout));
        return factory;
    }
}
KafkaTopicConfig:
@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topicName}")
    private String topicName;

    @Value(value = "${kafka.topic.partitions}")
    private String partitions;

    @Value(value = "${kafka.topic.replicationFactor}")
    private String replicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic ClientTopic() {
        return new NewTopic(topicName, Integer.parseInt(partitions), Short.parseShort(replicationFactor));
    }
}
Producing the message (this is where the error occurs):
kafkaTemplate.send(topicName, event);
Consuming the message:
@KafkaListener(topics = "someTopicName", groupId = "somegroupId")
public void consume(Event event) {
    // business logic goes here
}
Gradle dependencies I am using:
implementation ('org.springframework.kafka:spring-kafka')
implementation('com.fasterxml.jackson.core:jackson-databind:2.9.4')
Spring Boot version I am using:
springBootVersion = '2.0.3.RELEASE'
Please let me know what I am doing wrong.
I tried adding the following to the producer factory, but it did not work:
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
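For comparison, here is a minimal sketch of another way to switch off the type headers: configure the JsonSerializer instance directly and pass it to the producer factory, rather than going through the config map. It assumes a spring-kafka version in which JsonSerializer exposes setAddTypeInfo, and it reuses the Event class and the kafka.bootstrap-servers property from the code above. It is not confirmed to resolve the error. Record headers require message format v2, which Kafka introduced in 0.11, so a broker older than that may also be involved.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Sketch only: a variant of the KafkaProducerConfig shown above.
// Event is the payload class from the question and is assumed to be on the classpath.
@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        // Assumption: setAddTypeInfo is available in the spring-kafka version in use.
        // With type info disabled, the serializer adds no type headers to the record.
        JsonSerializer<Event> valueSerializer = new JsonSerializer<>();
        valueSerializer.setAddTypeInfo(false);

        // Serializer instances passed here are used in place of the
        // key.serializer / value.serializer class entries in the config map.
        return new DefaultKafkaProducerFactory<>(config, new StringSerializer(), valueSerializer);
    }

    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

If the error persists with this variant, the serializer settings are probably not the only factor, and the broker version would be the next thing to check.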
Solution