spring-kafka - spring kafka listener中消息的处理策略
问题描述
只是想确保消息是否以正确的方式处理。当侦听器收到消息时,它将始终由新线程处理(将处理器 bean 定义为原型)。这个实现正确吗?(我认为监听器不是线程安全的,因此使用了bean的原型范围来处理消息)
(输入:TestTopic- 5 个分区 - 1 个消费者)或(输入:TestTopic- 5 个分区 - 5 个消费者)
public class EventListener {
@Autowired
private EventProcessor eventProcessor;
@KafkaListener(topics = "TestTopic", containerFactory = "kafkaListenerContainerFactory",
autoStartup = "true")
public void onMessage(
@Payload List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
eventProcessor.processAndAcknowledgeBatchMessages(consumerRecords, acknowledgment);
}
}
//事件处理器
@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@NoArgsConstructor
@SuppressWarnings("unused")
public class EventProcessorImpl implements EventProcessor {
@Autowired
private KafkaProducerTemplate kafkaProducerTemplate;
@Autowired
private ObjectMapper localObjectMapper;
@Autowired
private Dao dao;
public void processAndAcknowledgeBatchMessages(
List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
long start = System.currentTimeMillis();
consumerRecords.forEach( consumerRecord -> {
Event event = localObjectMapper.readValue(consumerRecord.value(), Event.class);
dao.save(process(event));
});
acknowledgment.acknowledge();
}
}
解决方案
不,这是不正确的;你不应该在另一个线程上执行;它将导致提交偏移和错误处理的问题。
此外,制作EventProcessorImpl
原型 bean 也无济于事。这只是意味着每次引用 bean 时都会使用一个新实例。
因为它只@Autowired
在初始化期间被引用一次。要为每个请求获取一个新实例,您需要每次都调用getBean()
应用程序上下文。
最好让你的代码线程安全。
编辑
(至少)有几种方法可以处理原型范围中定义的非线程安全服务。
- 使用 ThreadLocal:
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final ThreadLocal<NotThreadSafeService> SERVICES = new ThreadLocal<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.get();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
SERVICES.set(service);
}
service.process(in);
}
@EventListener
void removeService(ConsumerStoppedEvent event) {
System.out.println("Consumer stopped; removing TL");
SERVICES.remove();
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
- 使用实例池。
@SpringBootApplication
public class So68447863Application {
public static void main(String[] args) {
SpringApplication.run(So68447863Application.class, args);
}
private static final BlockingQueue<NotThreadSafeService> SERVICES = new LinkedBlockingQueue<>();
@Autowired
ApplicationContext context;
@KafkaListener(id = "so68447863", topics = "so68447863", concurrency = "5")
void listen(String in) {
NotThreadSafeService service = SERVICES.poll();
if (service == null) {
service = this.context.getBean(NotThreadSafeService.class);
}
try {
service.process(in);
}
finally {
SERVICES.add(service);
}
}
@Bean
NewTopic topic() {
return TopicBuilder.name("so68447863").partitions(10).replicas(1).build();
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
NotThreadSafeService service() {
return new NotThreadSafeService();
}
}
class NotThreadSafeService {
void process(String msg) {
System.out.println(msg + " processed by " + this);
}
}
推荐阅读
- r - 如何在r中的nnfor包中获得mlp层神经元的权重?
- ruby-on-rails - 为什么 Heroku 上的 secret_key_base 为空白(Rails 5.2)
- linux - 使用脚本外壳获取文件中每行的最后两个字母
- notepad++ - 在记事本++中用正则表达式替换左侧文本
- .net-core - 是否有任何分析 api 可用于以 clr 版本托管的应用程序,因为 IIS 中没有托管代码
- kubernetes - 如何将运行状况检查放入部署清单中?
- python - 如何从类中的回调方法正确调用方法?
- reactjs - ESLint TableRow 未定义 - 在 .NET Core 项目中反应
- ruby - 涉及条件循环的方法对某些值意外返回 nil
- django - django.template.exceptions.TemplateDoesNotExist