spring - 使用 Spring webflux 以服务器发送事件的方式从 Kafka 构建反应式 API 服务器消息
问题描述
我正在构建一个连接到 Kafka 主题的反应式 Web 服务,它将支持一个反应式 API,所有调用此 API 的请求者都将从 Kafka 接收新消息作为服务器发送的事件。(本来想API会支持关键字,只会接收包含这个关键字的kafka消息,结果发现Kafka不支持)。我对响应式和 spring webflux 很陌生,我目前的计划是订阅 Kafka,将消息从 Kafka 发送到接收器,然后控制器返回 sink.asFlux。但显然,我对水槽的理解是不正确的。任何建议都会非常有帮助:)
控制器
@RestController
@RequestMapping(path = "/sse")
public class SSEController {
@Autowired
private Flux<String> flux;
@CrossOrigin(allowedHeaders = "*")
@GetMapping(value = "/listen/{keyword}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> listenToKeyword(@PathVariable String keyword) {
// TODO: how to filter out by keyword as data stream
return flux;
}
@CrossOrigin(allowedHeaders = "*")
@GetMapping(value = "/listenall", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> listenToAll() {
return flux;
}
}
配置
@Configuration
public class KafkaConfiguration {
Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapServers;
@Value("${spring.kafka.consumer.topic}")
private String kafkaTopic;
@Bean
public Sinks.Many<String> sink() {
return Sinks.many().replay().latest();
}
@Bean
public Flux<String> flux(Sinks.Many<String> sink) {
return sink.asFlux();
}
@Bean(name = "kafkaReceiver")
public KafkaReceiver kafkaReceiver() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
try {
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(props).commitInterval(Duration.ZERO).commitBatchSize(0).subscription(Collections.singleton(kafkaTopic));
KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(receiverOptions).receive().publishOn(Schedulers.newSingle("sample", true)).concatMap(m -> this.sink()::emitNext); // TODO: how to emit to the sink
return kafkaReceiver;
} catch (Exception e) {
logger.debug("", e);
throw e;
}
}
}
解决方案
推荐阅读
- django - 包含图像、文件和标签的模型的夹具数据
- python - 我有这个错误: json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
- c - 如何获得在 GNU-EFI 下工作的协议?
- php - 谷歌分析 v4 和谷歌 API
- php - laravel 7 …/vendor/composer/autoload_real.php 中没有这样的文件或目录
- r - R:自定义函数 - 改变现有列
- sql - 转置postgresql中的列 - 横向连接?
- javascript - 重新加载页面后在 setInterval() 上节省时间?
- java - 使用反射时如何返回特定的构造函数?
- sql-server - 全文搜索不起作用 - SQL Server