java - Java - Spring Boot - 反应式 Redis 流 (TEXT_EVENT_STREAM_VALUE)
问题描述
我想编写一个始终显示 redis 流(反应式)的最新消息的端点。
实体看起来像这样{'key' : 'some_key', 'status' : 'some_string'}
。
所以我想得到以下结果:
- 页面被调用,内容例如显示一个实体:
{'key' : 'abc', 'status' : 'status_A'}
页面未关闭
- 然后将一个新实体添加到流中
XADD mystream * key abc status statusB
- 现在我希望看到 Stream 的每个项目,而不更新 Tab
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}
当我尝试模拟这种行为时,它会起作用并且我得到了预期的输出。
@GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Light> liveLightMock() {
List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"));
return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
}
列表的各个元素一个接一个地显示,项目之间有 500 毫秒的延迟。
但是,当我尝试访问 Redis 而不是模拟变体时,它不再起作用。我尝试依次测试部分功能。因此,我的想法首先起作用,保存(1)功能必须起作用,如果保存功能起作用,则显示没有重新激活功能的旧记录必须起作用(2),最后但并非最不重要的一点是,如果两者都起作用,我需要让重新激活部分继续运行。
也许你们可以帮助我让反应部分工作。我工作了好几天,没有得到任何改进。
泰家伙:)
测试 1) - 保存功能(短版)
看起来像它的工作。
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
测试 2) - 加载/读取功能(短版)
似乎正在工作,但不是 reaktiv -> 我在 WebView 打开时添加了一个新实体,视图显示了所有项目,但在我添加新项目后没有更新。重新加载后我看到了每一个项目
我怎样才能返回订阅流getLights
的正在使用的东西?TEXT_EVENT_STREAM_VALUE
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
测试 1) - 保存功能(长版)
端点和保存功能是不同类的一部分。
String status = (++statusIdx % 2 == 0) ? "on" : "off";
触发器的状态从开到关,到开,到关,...
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
验证函数 i
- Delted the Stream,清空它
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0
两次调用创建端点/light/create
我希望流现在有两个项目,状态=打开,一个关闭
127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1610456865517-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "off"
2) 1) "1610456866708-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "on"
看起来保存部分正在工作。
测试 2) - 加载/读取功能(长版)
似乎在工作,但不是 reaktiv -> 我添加了一个新实体,页面更新了它的值
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
- 调用
/light/live
-> 我应该有N
条目 -> 如果我可以看到条目,则正常显示正在工作(非反应性) - 调用
/light/create
两次-> live Few 应该添加 2 Entries ->N+2
Entries - 等待 1 分钟只是为了安全
- 视图应该显示
N+2
Reactiv 部分的条目才能工作 - 从 1 ( ) 刷新视图,
/light/live
如果 Reactiv Works 仍应显示相同的数量
显示信息工作(1),(2)的添加部分工作,每个终端检查,4)没有工作
因此,显示器正在工作,但它没有反应
在我刷新浏览器 (5) 后,我得到了预期的N+2
条目 - 所以 (2) 也能正常工作
解决方案
这里有一个误解,从 Redis 响应式读取并不意味着您已经订阅了新事件。
Reactive 不会为您提供实时更新,它会调用 Redis 一次并显示那里的任何内容。因此,即使您等待一两天,UI/Console 中也没有任何变化,您仍然会看到 N 个条目。
您需要使用 Redis PUB/SUB 或需要重复调用 Redis 以获取最新更新。
编辑:
一个有效的解决方案..
private List<Light> reactiveReadToList() {
log.info("reactiveReadToList");
return read().collectList().block();
}
private Flux<Light> read() {
StreamOffset<Object> offset = StreamOffset.fromStart("mystream");
return redisTemplate
.opsForStream()
.read(offset)
.flatMap(
e -> {
Map<Object, Object> kvp = e.getValue();
String key = (String) kvp.get("key");
String id = (String) kvp.get("id");
String status = (String) kvp.get("status");
Light light = new Light(id, key, status);
log.info("{}", light);
return Flux.just(light);
});
}
使用响应式模板按需从 Redis 读取数据并使用偏移量将其发送到客户端的读取器,它一次只发送一个事件,我们可以发送所有事件。
@RequiredArgsConstructor
class DataReader {
@NonNull FluxSink<Light> sink;
private List<Light> readLights = null;
private int currentOffset = 0;
void register() {
readLights = reactiveReadToList();
sink.onRequest(
e -> {
long demand = sink.requestedFromDownstream();
for (int i = 0; i < demand && currentOffset < readLights.size(); i++, currentOffset++) {
sink.next(readLights.get(currentOffset));
}
if (currentOffset == readLights.size()) {
readLights = reactiveReadToList();
currentOffset = 0;
}
});
}
}
一种用于DataReader
产生通量的方法
public Flux<Light> getLights() {
return Flux.create(e -> new DataReader(e).register());
}
现在我们onRequest
在 sink 上添加了一个方法来处理客户端的需求,它根据需要从 Redis 流中读取数据并将其发送给客户端。
这看起来非常占用 CPU,如果没有更多新事件,我们应该延迟调用, register
如果我们看到流中没有新元素,可以在方法内部添加一个 sleep 调用。
推荐阅读
- mysql - 如何在 laravel 中使用聚合更新?
- c# - 如何在 WPF C# 中更改滑块刻度的宽度
- symfony4 - 如何在 Symfony4 中为所有异常添加用户到 Sentry?
- ios - 使用基于图像的照明在 3D 模型上应用 PBR 纹理
- actionscript-3 - 在 AS3 中移除对象/影片剪辑
- r - 子集空间点以提取多边形内部的空间点(国家边界)
- c# - 根据用户角色呈现 HTML 内容?
- c++ - 未声明 Visual c++ 标识符
- java - 创建一个文件夹并在安装 apk 时将 json 放入其中
- reactjs - 使用 hashes.map 函数选择标签