spring-boot - Kinesis 作为 Spring Boot Reactive Stream API 中的生产者
问题描述
我正在尝试构建一个小型 Spring Boot Reactive API。API 应该让用户订阅一些数据,作为 SSE 返回。
数据位于 Kinesis 主题上。
创建反应式 API 和 Kinesis 的 StreamListener 相当容易 - 但我可以将它们结合起来,因此 Kinesis 主题用作我的数据服务使用的事件流的生产者。
代码看起来或多或少像这样
//Kinesis binding, with listenerMode: rawRecords
@EnableBinding(Sink.class)
public class KinesisStreamListener {
@StreamListener(value = Sink.INPUT)
public void logger(List<Record> payload) throws Exception {
}
}
@RestController
@RequestMapping("/data")
public class DataResource {
@Autowired
DataService service;
@GetMapping(produces = {MediaType.TEXT_EVENT_STREAM_VALUE, MediaType.APPLICATION_STREAM_JSON_VALUE})
public Flux<EventObject> getData() {
return service.getData();
}
}
@Component
public class DataService {
Flux<EventObject> getData() {
Flux<Long> interval = Flux.interval(Duration.ofMillis(1000));
Flux<EventObject> dataFlux = Flux.fromStream(Stream.generate(() -> ???
));
return dataFlux.zip(interval, dataFlux).map(Tuple2::getT2);
}
}
解决方案
这是我将如何做到这一点的示例:https ://github.com/artembilan/sandbox/tree/master/cloud-stream-kinesis-to-webflux 。
一旦我们就细节和一些改进达成一致,它可以转到官方的 Spring Cloud Stream Samples 存储库:https ://github.com/spring-cloud/spring-cloud-stream-samples
主要思想是重用通过 Spring Cloud Stream Reactive SupportFlux
提供的相同内容。@StreamListener
这已经是一个FluxPublish
,因此任何新的 SSE 连接都将作为普通的 Reactive 订阅者工作。
有几个技巧可以计算:
- 对于
listenerMode: rawRecords
,我们还需要配置 a来避免 Binder 向通道contentType: application/octet-stream
发送消息时的任何转换尝试。Sink.INPUT
- 由于在方法中
listenerMode: rawRecords
返回一个List<Record>
our应该完全期望这种类型,而不是一个普通的.Flux
@StreamListener
Record
这两个问题都被视为框架改进。
所以,现在让我们看看它的外观和工作原理。
推荐阅读
- javascript - 将带有 jquery 的 html 模板转换为 React js
- eslint - 苗条与漂亮/eslint
- javascript - NodeJS require() 无法访问模块并且不返回任何函数
- vba - VBA 舍入函数,存储其值以供以后使用,收到“溢出”错误消息
- python - 从条目中获取值以在框架内
- python-3.x - 如何在 sklearn 中执行 train_test_split 但根据列的某个成员限制/指定输出?关闭
- javascript - vue + 复选框 + 更改事件 - 页面和控制台上的不同结果
- jenkins - 我们如何在两个 Jenkins 大师之间共享凭证
- python - 由于项目主 urls.py 中的配置错误,Django 无法正确处理获取请求
- discord.py - 如何实现暂停命令(lavalink)?