java - 如何在生产者之前启动kafka消费者
问题描述
我有一组与 kafka 主题交互的微服务。其中一个微服务应该使用两个整数,然后将它们相加并发送到主题。问题是我无法配置微服务,以便在生产者之前启动消费者。我的代码如下:
@SpringBootApplication
public class AdderApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(AdderApplication.class, args);
AdderConsumer consumer = context.getBean(AdderConsumer.class);
AdderProducer producer = context.getBean(AdderProducer.class);
producer.sumTwoIntegers();
}
@Component
public class AdderConsumer extends Controller {
private CountDownLatch latch = new CountDownLatch(3);
@KafkaListener(topics = "${kafka.topic.name}")
public void listenToPartitionWithOffset(@Payload Integer message) {
if (message != null) {
list.add(message);
isProduce = true;
System.out.println(list);
}
}
@Component
public class AdderProducer extends Controller {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic.name}")
private String topicName;
public void sumTwoIntegers() {
// logic
}
private void sendMessage(String message) {
// logic
}
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.boot.server}")
private String kafkaServer;
@Value("${kafka.consumer.group.id}")
private String kafkaGroupId;
@Bean
public LoggingErrorHandler errorHandler(){
return new LoggingErrorHandler();
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerConfig());
}
@Bean
public ProducerFactory<String, String> producerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public ConsumerFactory<String, Integer> consumerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Integer>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Integer> listener = new ConcurrentKafkaListenerContainerFactory<>();
listener.setConsumerFactory(consumerConfig());
listener.setErrorHandler(errorHandler());
return listener;
}
我调试了代码,它同时调用了生产者和侦听器,但是我需要侦听器首先接收两个整数,然后才调用生产者方法。
我会欣赏你的想法。
解决方案
我找到了决定。我为生产者实现了计时器,并在应用程序运行时定期调用它。因此,当它第一次调用没有数据来处理它时,它什么也不发布,然后客户消费数据,第二次调用生产者将必要的数据发布到主题。
另一种解决方案是从消费者那里调用生产者方法。