首页 > 解决方案 > ConsumerAwareMessageListener 抛出 IllegalArgumentException

问题描述

我有一个用例,我想使用 Consumer 对象,因此,我正在使用这个

@Slf4j
@Service
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MyMessageListener extends AbstractConsumerSeekAware implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Consumer<?, ?> consumer) {
        
        consumer.pause(Collections.singleton(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())));
    }

}

但是每次我向主题发送消息时,都会收到 IllegalArgumentException,我认为这是因为 Consumer 对象而发生的。

这是我的侦听器容器 -

@Autowired
private MyMessageListener myMessageListener;

public KafkaMessageListenerContainer<String, String> createContainer(String topic) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
        KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        listenerContainer.getContainerProperties().setMessageListener(myMessageListener);
        listenerContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
        listenerContainer.setAutoStartup(false);
        return listenerContainer;
    }



 private Map<String, Object> consumerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.constants.getKafkaBootstrapAddress());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }

为什么会这样?

编辑 1

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is java.lang.IllegalArgumentException: Unrecognized Type: [null]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2361) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2346) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2220) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2134) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2016) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1702) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.0.jar:2.7.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


你用的是什么版本?能否提供一个小应用程序来重现它?我刚刚对其进行了测试,首先使用 Boot 的自动配置容器工厂,然后使用您的代码,两者都对我来说很好(最新的 Boot 2.4.5)。

@SpringBootApplication
public class So67301285Application {

    public static void main(String[] args) {
        SpringApplication.run(So67301285Application.class, args);
    }

    @Bean
    ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            ConsumerAwareMessageListener<String, String> listener) {

        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so67301285");
        container.getContainerProperties().setMessageListener(listener);
        container.getContainerProperties().setGroupId("so67301285");
        return container;
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67301285").partitions(1).replicas(1).build();
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> createContainer(Listener listener) {
        ContainerProperties containerProperties = new ContainerProperties("so67301285");
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties());
        KafkaMessageListenerContainer<String, String> listenerContainer = new KafkaMessageListenerContainer<>(
                consumerFactory, containerProperties);
        listenerContainer.getContainerProperties().setMessageListener(listener);
        listenerContainer.getContainerProperties().setGroupId("so67301285-1");
        listenerContainer.setAutoStartup(true);
        return listenerContainer;
    }

    private Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

}

@Component
class Listener extends AbstractConsumerSeekAware
        implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        System.out.println(data.value() + "@" + data.offset() + "from group: " + KafkaUtils.getConsumerGroupId());
    }

}

结果:

foo@10from group: so67301285
foo@10from group: so67301285-1

推荐阅读