jax-rs - 为什么在与 PriceResource Publisher 的多个连接中,只有一个获得流?
问题描述
似乎只有一个 http 客户端获取数据流,而其他客户端没有。
Publisher 是不是热数据,应该广播给所有订阅者,这是真的吗?
请在我可以允许多个 http 客户端使用 resteasy-rxjava2 / quarkus 使用 Flowable 数据流中找到更多信息吗?
package org.acme.kafka;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import org.jboss.resteasy.annotations.SseElementType;
import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.annotations.Channel;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.reactivex.Flowable.fromIterable;
/**
* A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event.
*/
@Path("/migrations")
public class StreamingResource {
private volatile Map<String, String> counterBySystemDate = new ConcurrentHashMap<>();
@Inject
@Channel("migrations")
Flowable<String> counters;
@GET
@Path("/stream")
@Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server side events (SSE) will be produced
@SseElementType("text/plain") // denotes that the contained data, within this SSE, is just regular text/plain data
public Publisher<String> stream() {
Flowable<String> mainStream = counters.doOnNext(dateSystemToCount -> {
String key = dateSystemToCount.substring(0, dateSystemToCount.lastIndexOf("_"));
counterBySystemDate.put(key, dateSystemToCount);
});
return fromIterable(counterBySystemDate.values().stream().sorted().collect(Collectors.toList()))
.concatWith(mainStream)
.onBackpressureLatest();
}
}
解决方案
您可以使用Replay运算符或ConnectableObservable
推荐阅读
- reactjs - 用纹理反应三纤维 Drei 广告牌
- sharepoint - Sharepoint(图形)API 访问 OneDrive 文件列表 - 403 / 404 错误
- ios - 应用从后台状态返回后 locationManager.stopUpdateLocations 不起作用
- python - Plotly:如何使用 Plotly Express 组合散点图和线图?
- javascript - IE11或IE9无法运行以下js代码
- reactjs - 如何反应在箭头函数中使用这个关键字
- typescript - TypeScript 中的函数名称类型
- assembly - 如何将Assembly(irvine masm)中的字符串重新初始化为null?
- mongodb - 为什么我不能在不同的 mongodb 集合中有类似的索引(索引)?
- visual-studio - 附加 Visual Studio 调试器时,Windows 窗体应用程序响应非常缓慢