首页 > 解决方案 > 在特定服务器 VPC 上向主题消费或生产数据时面临问题

问题描述

我正在尝试使用卡夫卡和弹簧靴制作制作人。

我尝试创建新应用程序来生成有关主题的消息并被其他应用程序使用。当 m 启动服务器主题时,最初并没有被识别。即将出现的错误如下所示:

2021-09-03 15:33:20.024 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=lims-public-helper] Error while fetching metadata with correlation id 9 : { sms.requests=UNKNOWN_TOPIC_OR_PARTITION} 
2021-09-03 15:33:20.026 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=lims-public-helper] The following subscribed topics are not assigned to any members: [ sms.requests]

我尝试使用其他服务器,它在相同的配置下工作得非常好,尝试了这台服务器,它给出了异常。

卡夫卡主题

$ kaf topics 
NAME PARTITIONS REPLICAS 
__consumer_offsets 50 3 
__trace 9 1 
sms.requests 3 1 
sms.status 1 3 
test 1 3 

消费者代码:

 public ConsumerFactory<String, OtpDTO> otpConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, limsGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "otpDTO:com.lims.helper.dto.OtpDTO");
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.lims.helper.dto.OtpDTO");
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OtpDTO.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OtpDTO> otpKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OtpDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(otpConsumerFactory());
        return factory;
    }

消费者侦听器详细信息:

  @KafkaListener(topics = "${spring.kafka.topic.lims.sms.otp}", containerFactory = "otpKafkaListenerContainerFactory")
    public void otpTopicMessage(@Payload OtpDTO otpDTO) {
        log.info(String.format("--------#####  otp topic consumer: %s", otpDTO));
    }

主题的属性详细信息:

spring.kafka.topic.lims.sms.otp=sms.requests
spring.kafka.topic.lims.sms.status=sms.status

标签: apache-kafkakafka-consumer-apispring-kafka

解决方案


推荐阅读