首页 > 解决方案 > 为什么在与 PriceResource Publisher 的多个连接中,只有一个获得流?

问题描述

似乎只有一个 http 客户端获取数据流,而其他客户端没有。

Publisher 是不是热数据,应该广播给所有订阅者,这是真的吗?

请在我可以允许多个 http 客户端使用 resteasy-rxjava2 / quarkus 使用 Flowable 数据流中找到更多信息吗?

package org.acme.kafka;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import org.jboss.resteasy.annotations.SseElementType;
import org.reactivestreams.Publisher;

import io.smallrye.reactive.messaging.annotations.Channel;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.reactivex.Flowable.fromIterable;

/**
 * A simple resource retrieving the "in-memory" "my-data-stream" and sending the items to a server sent event.
 */
@Path("/migrations")
public class StreamingResource {
    private volatile Map<String, String> counterBySystemDate = new ConcurrentHashMap<>();

    @Inject
    @Channel("migrations")
    Flowable<String> counters;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server side events (SSE) will be produced
    @SseElementType("text/plain") // denotes that the contained data, within this SSE, is just regular text/plain data
    public Publisher<String> stream() {
        Flowable<String> mainStream = counters.doOnNext(dateSystemToCount -> {
            String key = dateSystemToCount.substring(0, dateSystemToCount.lastIndexOf("_"));
            counterBySystemDate.put(key, dateSystemToCount);
        });
        return fromIterable(counterBySystemDate.values().stream().sorted().collect(Collectors.toList()))
                .concatWith(mainStream)
                .onBackpressureLatest();
    }
}

标签: jax-rsrx-javarx-java2resteasyquarkus

解决方案


您可以使用Replay运算符或ConnectableObservable


推荐阅读