spring - 无法从卡夫卡读取
问题描述
我是卡夫卡的初学者。我正在使用 docker 映像 Landoop ...我使用了控制台生产者和控制台消费者,并且我可以在主题中读/写
现在我正在尝试对 spring 做同样的事情。我能够创建一个新主题,发送消息,但我无法阅读它们。这些是我的 java 类:
这是配置类:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
这是消费者
public class ConsumerService {
@KafkaListener(topics = "user.subscription", group = "group_id")
public void consume(String message) {
System.out.println("Consumed message: " + message);
}
}
非常感谢您提前
解决方案
推荐阅读
- keycloak - Angular项目使用keycloak管理访问权限
- html - 需要一个在某个元素第一次出现之前找到特定类型的所有元素的 xpath
- php - 我想在一段中找到一个单词的第一个字母的strpos
- python - 发送以当前日期为主题的邮件
- android - 在某些设备中应用“fitsSystemWindows”后创建了一个空间。我怎样才能删除它?
- python - 模型预测 JSON 请求格式在 python 请求中不起作用:Google AI 平台
- python - 运行 Python + Behave 自动化项目并尝试在其他步骤中执行步骤
- google-cloud-platform - 原因:com.google.api.client.googleapis.json.GoogleJsonResponseException: 412 Precondition Failed while remove bucket IAM Member
- ios - 如何使用 get 方法显示 Json 响应
- javascript - 如何限制用户在日期字段中手动输入日期