首页 > 解决方案 > 以只有一个订阅者会使用它的方式将 java 9 Flow 上的数据发布给订阅者

问题描述

有没有办法以只有一个订阅者接收数据的方式向订阅者发布数据?我想要实现的是订阅者发布者模型将作为一个队列工作,该队列有多个读者但一个发布者。一旦发布者发布数据,第一个接收它的订阅者将是唯一处理它的订阅者。

提前致谢 !!!

标签: javajava-9flow

解决方案


在反应式流中(至少,在他们的java.util.concurrent.Flow化身中),订阅者只是请求数据,只有发布者才能控制如何发布这些数据。

Flow.PublisherJava 9 中唯一的通用实现是SubmissionPublisher遵循标准的发布/订阅方式,将任何已发布项目发布给所有订阅者。我没有找到任何简单的方法来破解SubmissionPublisher让它只发布给一个订阅者。

但是您可以尝试编写自己的Flow.Publisher实现,如下所示:

class QueueLikePublisher<T> implements Publisher<T> {
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
    private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();

    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        // subscribing: adding a new subscription to the list
        QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
        subscriptions.add(subscription);
        subscriber.onSubscribe(subscription);
    }

    public void submit(T item) {
        // we got some data: looking for non-completed and demanding
        // subscription and give it the data item

        for (QueueLikeSubscription<? super T> subscription : subscriptions) {
            if (!subscription.completed && subscription.demand > 0) {
                subscription.offer(item);
                // we just give it to one subscriber; probaly offer() call needs
                // to be wrapped in a try/catch
                break;
            }
        }
    }

    static class QueueLikeSubscription<T> implements Subscription {
        private final Subscriber<? super T> subscriber;
        private final ExecutorService executor;
        volatile int demand = 0;
        volatile boolean completed = false;

        QueueLikeSubscription(Subscriber<? super T> subscriber,
                ExecutorService executor) {
            this.subscriber = subscriber;
            this.executor = executor;
        }

        public synchronized void request(long n) {
            if (n != 0 && !completed) {
                if (n < 0) {
                    IllegalArgumentException ex = new IllegalArgumentException();
                    executor.execute(() -> subscriber.onError(ex));
                } else {
                    // just extending the demand
                    demand += n;
                }
            }
        }

        public synchronized void cancel() {
            completed = true;
        }

        Future<?> offer(T item) {
            return executor.submit(() -> {
                try {
                    subscriber.onNext(item);
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            });
        }
    }
}

它将项目发布给尚未完成(例如,已取消)且需求非零的第一个订阅者。

请注意,此代码只是一个大纲,用于说明该想法。例如,它可能应该包含更多的异常处理(如处理RejectedExecutionException)。


推荐阅读