spring-cloud-stream - 如何使用 spring-cloud-stream 实现 Kafka 消费者以按需处理事件?
问题描述
我正在尝试实现一个 PollableConsumer,它在特定条件下开始轮询来自 Kafka 的消息,在这种情况下,当我在我的 SpringBoot 应用程序中点击一个端点时。
我尝试了多种在特定条件下触发轮询器的方法,但显然它只有在不断地从 kafka 主题轮询时才有效。(就像 spring-cloud-stream 文档中的所有示例一样)
我正在寻找这样的东西:
public interface CustomProcessor {
@Input
PollableMessageSource input();
}
public void run() {
boolean result = true;
while (result) {
result = input.poll(m -> {
Event event = (Event) m.getPayload();
GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
eventMessageConsumer.consume(genericMessage);
}, new ParameterizedTypeReference<Event>() {
});
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
}
当我遇到这样的端点时,可能会触发它:
@GetMapping("/process")
public void process() {
SomeClass.run();
}
解决方案
显然目前无法使用 spring-cloud-stream 暂停 PollableConsumer,所以我回到基于事件的消息消费并使用执行器来控制绑定的状态。在这个线程和 spring-cloud-stream文档之后,我注入了 BindingsEndpoint 并改变了绑定的状态,如下所示:
@RestController
public class EventController {
@Autowired
public EventController(BindingsEndpoint bindingsEndpoint) {
this.bindingsEndpoint = bindingsEndpoint;
}
@GetMapping("/changeState")
public void sendMessage(@RequestParam("state") String state) {
if (state.equals("paused")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.PAUSED);
}
if (state.equals("resumed")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.RESUMED);
}
}
这不完全是我想要实现的,但它已经足够接近了。
推荐阅读
- apache-spark - 从另一个数据源读取路径时,如何避免 collectAsList() ?
- r - R子集包含多个关键字的ID
- c# - 正则表达式条件字符串仅包含英语而不是希腊语,反之亦然(希腊语而不是英语)
- angular - 角打字稿 $event.target.value
- python - How to Append a List Key Value using Another List
- java - 如何从数据库中保存的有效负载重新创建 web 服务对象?
- c - 在 C 中创建外壳 - 管道和外部命令
- pytest - 我想知道代码本身中每个测试用例的运行时间,在数据砖中使用 pytest
- typescript - 在 TypeScript 的类方法中使用类型谓词
- c# - 如何将我的工作 SQL 查询转换为 Linq (C#)?