首页 > 解决方案 > Kinesis 作为 Spring Boot Reactive Stream API 中的生产者

问题描述

我正在尝试构建一个小型 Spring Boot Reactive API。API 应该让用户订阅一些数据,作为 SSE 返回。

数据位于 Kinesis 主题上。

创建反应式 API 和 Kinesis 的 StreamListener 相当容易 - 但我可以将它们结合起来,因此 Kinesis 主题用作我的数据服务使用的事件流的生产者。

代码看起来或多或少像这样

//Kinesis binding, with listenerMode: rawRecords
@EnableBinding(Sink.class)
public class KinesisStreamListener {

  @StreamListener(value = Sink.INPUT)
  public void logger(List<Record> payload) throws Exception {

  }
}

@RestController
@RequestMapping("/data")
public class DataResource {

  @Autowired
  DataService service;

  @GetMapping(produces = {MediaType.TEXT_EVENT_STREAM_VALUE, MediaType.APPLICATION_STREAM_JSON_VALUE})
  public Flux<EventObject> getData() {
    return service.getData();
  }
}

@Component
public class DataService {

  Flux<EventObject> getData() {
    Flux<Long> interval = Flux.interval(Duration.ofMillis(1000));
    Flux<EventObject> dataFlux = Flux.fromStream(Stream.generate(() -> ???
            ));
      return dataFlux.zip(interval, dataFlux).map(Tuple2::getT2);
  }
}

标签: spring-bootreactive-programmingamazon-kinesisspring-cloud-streamspring-webflux

解决方案


这是我将如何做到这一点的示例:https ://github.com/artembilan/sandbox/tree/master/cloud-stream-kinesis-to-webflux 。

一旦我们就细节和一些改进达成一致,它可以转到官方的 Spring Cloud Stream Samples 存储库:https ://github.com/spring-cloud/spring-cloud-stream-samples

主要思想是重用通过 Spring Cloud Stream Reactive SupportFlux提供的相同内容。@StreamListener这已经是一个FluxPublish,因此任何新的 SSE 连接都将作为普通的 Reactive 订阅者工作。

有几个技巧可以计算:

  1. 对于listenerMode: rawRecords,我们还需要配置 a来避免 Binder 向通道contentType: application/octet-stream发送消息时的任何转换尝试。Sink.INPUT
  2. 由于在方法中listenerMode: rawRecords返回一个List<Record>our应该完全期望这种类型,而不是一个普通的.Flux@StreamListenerRecord

这两个问题都被视为框架改进。

所以,现在让我们看看它的外观和工作原理。


推荐阅读