apache-kafka - 在特定服务器 VPC 上向主题消费或生产数据时面临问题
问题描述
我正在尝试使用卡夫卡和弹簧靴制作制作人。
我尝试创建新应用程序来生成有关主题的消息并被其他应用程序使用。当 m 启动服务器主题时,最初并没有被识别。即将出现的错误如下所示:
2021-09-03 15:33:20.024 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=lims-public-helper] Error while fetching metadata with correlation id 9 : { sms.requests=UNKNOWN_TOPIC_OR_PARTITION}
2021-09-03 15:33:20.026 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=lims-public-helper] The following subscribed topics are not assigned to any members: [ sms.requests]
我尝试使用其他服务器,它在相同的配置下工作得非常好,尝试了这台服务器,它给出了异常。
卡夫卡主题
$ kaf topics
NAME PARTITIONS REPLICAS
__consumer_offsets 50 3
__trace 9 1
sms.requests 3 1
sms.status 1 3
test 1 3
消费者代码:
public ConsumerFactory<String, OtpDTO> otpConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, limsGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "otpDTO:com.lims.helper.dto.OtpDTO");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.lims.helper.dto.OtpDTO");
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OtpDTO.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OtpDTO> otpKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OtpDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(otpConsumerFactory());
return factory;
}
消费者侦听器详细信息:
@KafkaListener(topics = "${spring.kafka.topic.lims.sms.otp}", containerFactory = "otpKafkaListenerContainerFactory")
public void otpTopicMessage(@Payload OtpDTO otpDTO) {
log.info(String.format("--------##### otp topic consumer: %s", otpDTO));
}
主题的属性详细信息:
spring.kafka.topic.lims.sms.otp=sms.requests
spring.kafka.topic.lims.sms.status=sms.status
解决方案
推荐阅读
- sql - 确定具有多个设备的表中设备的某些状态的持续时间
- java - 使用 RESTful Web 服务:控制器 Springboot 错误
- apache - .htaccess - 一个特定域的 SSL redicect 条件不起作用
- vue.js - 在 Vue 中,如何使用新的 devBaseUrl 选项?
- java - ClassName.class.getResource() 返回 null
- html - 展开 div 并覆盖其他 div
- jmeter - CSV 数据集在 JDBC 采样器请求之前执行
- mysql - 基于具有自连接的另一列值按值排序
- google-cloud-platform - 以编程方式编辑 Dataprep 配方
- react-native - expo 不包含名为 WebBrowser 的导出