首页 > 解决方案 > 如何使用 spring-cloud-stream 实现 Kafka 消费者以按需处理事件?

问题描述

我正在尝试实现一个 PollableConsumer,它在特定条件下开始轮询来自 Kafka 的消息,在这种情况下,当我在我的 SpringBoot 应用程序中点击一个端点时。

我尝试了多种在特定条件下触发轮询器的方法,但显然它只有在不断地从 kafka 主题轮询时才有效。(就像 spring-cloud-stream 文档中的所有示例一样)

我正在寻找这样的东西:

public interface CustomProcessor {
    @Input
    PollableMessageSource input();
}
 public void run() {
        boolean result = true;
        while (result) {
            result = input.poll(m -> {
                Event event = (Event) m.getPayload();
                GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
                eventMessageConsumer.consume(genericMessage);
            }, new ParameterizedTypeReference<Event>() {
            });

            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (result) {
                System.out.println("Success");
            }
        }
    }

当我遇到这样的端点时,可能会触发它:

@GetMapping("/process")
public void process() {
   SomeClass.run();
}

标签: spring-cloud-stream

解决方案


显然目前无法使用 spring-cloud-stream 暂停 PollableConsumer,所以我回到基于事件的消息消费并使用执行器来控制绑定的状态。在这个线程和 spring-cloud-stream文档之后,我注入了 BindingsEndpoint 并改变了绑定的状态,如下所示:

@RestController
public class EventController {
    @Autowired
    public EventController(BindingsEndpoint bindingsEndpoint) {
        this.bindingsEndpoint = bindingsEndpoint;
    }
    @GetMapping("/changeState")
    public void sendMessage(@RequestParam("state") String state) {
        if (state.equals("paused")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.PAUSED);
        }
        if (state.equals("resumed")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.RESUMED);
        }
    }

这不完全是我想要实现的,但它已经足够接近了。


推荐阅读