spring-boot - Springboot 云流与 Kafka
问题描述
我正在尝试使用带有 Kafka 的 Springboot cloud Stream 设置一个项目。我设法构建了一个简单的示例,其中侦听器从一个主题获取消息,并在处理后将输出发送到另一个主题。
我的监听器和频道配置如下:
@Component
public class FileEventListener {
private FileEventProcessorService fileEventProcessorService;
@Autowired
public FileEventListener(FileEventProcessorService fileEventProcessorService) {
this.fileEventProcessorService = fileEventProcessorService;
}
@StreamListener(target = FileEventStreams.INPUT)
public void handleLine(@Payload(required = false) String jsonData) {
this.fileEventProcessorService.process(jsonData);
}
}
public interface FileEventStreams {
String INPUT = "file_events";
String OUTPUT = "raw_lines";
@Input(INPUT)
SubscribableChannel inboundFileEventChannel();
@Output(OUTPUT)
MessageChannel outboundRawLinesChannel();
}
这个例子的问题是,当服务启动时,它不会检查主题中已经存在的消息,它只处理启动后发送的那些消息。我对 Springboot 流和 kafka 很陌生,但就我所读到的而言,这种行为可能与我使用SubscribableChannel
. 我尝试使用一个QueueChannel
例子来查看它是如何工作的,但我发现了以下异常:
Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory
所以,我的问题是:
- 如果我想在应用程序启动后处理主题中存在的所有消息(并且消息仅由一个消费者处理),我走在正确的道路上吗?
- 即使
QueueChannel
不是实现 1 中解释的行为的正确选择。)我必须向我的项目添加什么才能使用这种类型的通道?
谢谢!
解决方案
添加
spring.cloud.stream.bindings.file_events.group=foo
- 默认情况下,匿名组仅从主题末尾开始使用,与组的绑定从开头开始使用。
您不能将 a
PollableChannel
用于绑定,它必须是 aSubscribableChannel
。
推荐阅读
- python - 如何提高涉及生成器和嵌套 for 循环的代码的效率
- r - For循环删除因子变量中的“,”以使其成为数字(R用户)
- python - PyQt5 作为 Mayavi 与 Python 3.6 的“合适的 UI 工具包”失败
- django - 检查模板中自定义模板标签的返回值
- smalltalk - GemStone Smalltalk 如何进行 ETL?
- javascript - 从用户输入生成循环和数组并打印循环输出
- vb.net - 变量“loginusername”在被赋值之前被使用。运行时可能会导致空引用异常
- python - 向上缩放轮廓/向外扩展
- pyspark - 在火花流/结构化流中从 Kafka 读取 avro 消息
- python - 如何每次使用 python 生成随机 json 数据?