Building a reactive API server with Spring WebFlux that streams Kafka messages as server-sent events

Problem description

I am building a reactive web service that connects to a Kafka topic and exposes a reactive API: every requester calling this API should receive new Kafka messages as server-sent events. (Originally I wanted the API to accept a keyword so a client would only receive the Kafka messages containing that keyword, but I found that Kafka itself does not support this kind of server-side filtering.) I am new to reactive programming and Spring WebFlux. My current plan is to subscribe to Kafka, emit each message into a sink, and have the controller return sink.asFlux(). But apparently my understanding of sinks is incorrect. Any advice would be very helpful :)

Controller

@RestController
@RequestMapping(path = "/sse")
public class SSEController {

    @Autowired
    private Flux<String> flux;

    @CrossOrigin(allowedHeaders = "*")
    @GetMapping(value = "/listen/{keyword}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> listenToKeyword(@PathVariable String keyword) {
        // TODO: how to filter by keyword as a data stream
        return flux;
    }

    @CrossOrigin(allowedHeaders = "*")
    @GetMapping(value = "/listenall", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> listenToAll() {
        return flux;
    }
}

Configuration

@Configuration
public class KafkaConfiguration {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServers;

    @Value("${spring.kafka.consumer.topic}")
    private String kafkaTopic;

    @Bean
    public Sinks.Many<String> sink() {
        return Sinks.many().replay().latest();
    }

    @Bean
    public Flux<String> flux(Sinks.Many<String> sink) {
        return sink.asFlux();
    }

    @Bean(name = "kafkaReceiver")
    public KafkaReceiver kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        try {
            ReceiverOptions<String, String> receiverOptions =
                    ReceiverOptions.<String, String>create(props)
                            .commitInterval(Duration.ZERO)
                            .commitBatchSize(0)
                            .subscription(Collections.singleton(kafkaTopic));
            KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(receiverOptions)
                    .receive()
                    .publishOn(Schedulers.newSingle("sample", true))
                    .concatMap(m -> this.sink()::emitNext); // TODO: how to emit to the sink (does not compile)
            return kafkaReceiver;
        } catch (Exception e) {
            logger.debug("", e);
            throw e;
        }
    }
}

Tags: spring, apache-kafka, spring-webflux, kafka-consumer-api, reactive

Solution
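One way to wire this up, sketched under a couple of assumptions: the `Sinks.Many<String>` bean comes from the question's configuration, and the `ReceiverOptions<String, String>` built inline in `kafkaReceiver()` is assumed to have been extracted into its own bean (a hypothetical refactoring, not in the original code). The key idea is to subscribe to the `KafkaReceiver` flux exactly once at startup and push each record's value into the sink; the controller then returns the shared `Flux` obtained from `sink.asFlux()`. The `concatMap(m -> this.sink()::emitNext)` attempt does not compile because `emitNext` is a plain method call with an emit-failure handler, not a `Publisher`-returning mapper.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.Disposable;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@Configuration
public class KafkaSinkBridgeConfiguration {

    // Bridges Kafka into the shared sink. Subscribing here, once, at
    // application startup means every SSE client later shares the same
    // upstream Kafka consumption instead of each request re-subscribing.
    // receiverOptions is an assumed bean built from the props map shown
    // in the question's KafkaConfiguration.
    @Bean
    public Disposable kafkaToSinkBridge(Sinks.Many<String> sink,
                                        ReceiverOptions<String, String> receiverOptions) {
        return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    // emitNext replaces the broken concatMap: push the payload
                    // into the sink, then acknowledge the offset. FAIL_FAST
                    // surfaces emission failures instead of dropping them.
                    sink.emitNext(record.value(), Sinks.EmitFailureHandler.FAIL_FAST);
                    record.receiverOffset().acknowledge();
                })
                .subscribe();
    }
}
```

For the keyword endpoint: since Kafka offers no server-side content filtering, filter the shared stream in the controller instead, e.g. `return flux.filter(message -> message.contains(keyword));`. Each subscriber gets its own filtered view of the same replayed sink.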
