首页 > 解决方案 > 在 Spring Boot 中暂停/启动 Kafka 流处理器

问题描述

我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,它应该停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应该开始接受来自其他 Kafka 主题的消息。

有没有办法在 Spring boot Kafka Streams 中实现这一点?

标签: javaspring-bootapache-kafkaspring-kafkacircuit-breaker

解决方案


我能够通过使用BindingsEndpoint.

private final BindingsEndpoint binding;

@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics ");
    }
}

@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics ");
    }
}

protected List<Binding> getBindings(List<?> objects) {
    return objects.stream().filter(object -> object instanceof Binding)
            .map(object -> (Binding) object).collect(Collectors.toList());
}

推荐阅读