首页 > 解决方案 > spring kafka listener中消息的处理策略

问题描述

只是想确保消息是否以正确的方式处理。当侦听器收到消息时,它将始终由新线程处理(将处理器 bean 定义为原型)。这个实现正确吗?(我认为监听器不是线程安全的,因此使用了bean的原型范围来处理消息)

(输入:TestTopic- 5 个分区 - 1 个消费者)或(输入:TestTopic- 5 个分区 - 5 个消费者)

public class EventListener {

    @Autowired
    private EventProcessor eventProcessor;

    @KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
            autoStartup = "true")
    public void onMessage(
            @Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
    }

}

//事件处理器

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {

    @Autowired
    private KafkaProducerTemplate kafkaProducerTemplate;

    @Autowired
    private ObjectMapper localObjectMapper;

    @Autowired
    private Dao dao;

    public void processAndAcknowledgeBatchMessages(
            List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        long start = System.currentTimeMillis();
        consumerRecords.forEach( consumerRecord -> {
            Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
            dao.save(process(event));
        });
        acknowledgment.acknowledge();
    }
}

标签: spring-kafka

解决方案


不,这是不正确的;你不应该在另一个线程上执行;它将导致提交偏移和错误处理的问题。

此外,制作EventProcessorImpl原型 bean 也无济于事。这只是意味着每次引用 bean 时都会使用一个新实例。

因为它只@Autowired在初始化期间被引用一次。要为每个请求获取一个新实例,您需要每次都调用getBean()应用程序上下文。

最好让你的代码线程安全。

编辑

(至少)有几种方法可以处理原型范围中定义的非线程安全服务。

  1. 使用 ThreadLocal:
@SpringBootApplication
public class So68447863Application {

    public static void main(String[] args) {
        SpringApplication.run(So68447863Application.class, args);
    }

    private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();

    @Autowired
    ApplicationContext context;

    @KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
    void listen(String in) {
        NotThreadSafeService service = SERVICES.get();
        if (service == null) {
            service = this.context.getBean(NotThreadSafeService.class);
            SERVICES.set(service);
        }
        service.process(in);
    }

    @EventListener
    void removeService(ConsumerStoppedEvent event) {
        System.out.println("Consumer stopped; removing TL");
        SERVICES.remove();
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    NotThreadSafeService service() {
        return new NotThreadSafeService();
    }

}

class NotThreadSafeService {

    void process(String msg) {
        System.out.println(msg + " processed by " + this);
    }

}
  1. 使用实例池。
@SpringBootApplication
public class So68447863Application {

    public static void main(String[] args) {
        SpringApplication.run(So68447863Application.class, args);
    }

    private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();

    @Autowired
    ApplicationContext context;

    @KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
    void listen(String in) {
        NotThreadSafeService service = SERVICES.poll();
        if (service == null) {
            service = this.context.getBean(NotThreadSafeService.class);
        }
        try {
            service.process(in);
        }
        finally {
            SERVICES.add(service);
        }
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    NotThreadSafeService service() {
        return new NotThreadSafeService();
    }

}

class NotThreadSafeService {

    void process(String msg) {
        System.out.println(msg + " processed by " + this);
    }

}

推荐阅读