java - 以只有一个订阅者会使用它的方式将 java 9 Flow 上的数据发布给订阅者
问题描述
有没有办法以只有一个订阅者接收数据的方式向订阅者发布数据?我想要实现的是订阅者发布者模型将作为一个队列工作,该队列有多个读者但一个发布者。一旦发布者发布数据,第一个接收它的订阅者将是唯一处理它的订阅者。
提前致谢 !!!
解决方案
在反应式流中(至少,在他们的java.util.concurrent.Flow
化身中),订阅者只是请求数据,只有发布者才能控制如何发布这些数据。
Flow.Publisher
Java 9 中唯一的通用实现是SubmissionPublisher
遵循标准的发布/订阅方式,将任何已发布项目发布给所有订阅者。我没有找到任何简单的方法来破解SubmissionPublisher
让它只发布给一个订阅者。
但是您可以尝试编写自己的Flow.Publisher
实现,如下所示:
class QueueLikePublisher<T> implements Publisher<T> {
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
private List<QueueLikeSubscription<? super T>> subscriptions = new CopyOnWriteArrayList<>();
public synchronized void subscribe(Subscriber<? super T> subscriber) {
// subscribing: adding a new subscription to the list
QueueLikeSubscription<? super T> subscription = new QueueLikeSubscription<>(subscriber, executor);
subscriptions.add(subscription);
subscriber.onSubscribe(subscription);
}
public void submit(T item) {
// we got some data: looking for non-completed and demanding
// subscription and give it the data item
for (QueueLikeSubscription<? super T> subscription : subscriptions) {
if (!subscription.completed && subscription.demand > 0) {
subscription.offer(item);
// we just give it to one subscriber; probaly offer() call needs
// to be wrapped in a try/catch
break;
}
}
}
static class QueueLikeSubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private final ExecutorService executor;
volatile int demand = 0;
volatile boolean completed = false;
QueueLikeSubscription(Subscriber<? super T> subscriber,
ExecutorService executor) {
this.subscriber = subscriber;
this.executor = executor;
}
public synchronized void request(long n) {
if (n != 0 && !completed) {
if (n < 0) {
IllegalArgumentException ex = new IllegalArgumentException();
executor.execute(() -> subscriber.onError(ex));
} else {
// just extending the demand
demand += n;
}
}
}
public synchronized void cancel() {
completed = true;
}
Future<?> offer(T item) {
return executor.submit(() -> {
try {
subscriber.onNext(item);
} catch (Exception e) {
subscriber.onError(e);
}
});
}
}
}
它将项目发布给尚未完成(例如,已取消)且需求非零的第一个订阅者。
请注意,此代码只是一个大纲,用于说明该想法。例如,它可能应该包含更多的异常处理(如处理RejectedExecutionException
)。
推荐阅读
- python - 如何对嵌套字典值进行数字排序?
- java - 在 centOS-ARM 中使用 Apache PDFBox 转换的 PDF 中的字形异常
- python - 初学者——为什么我不能转换成分钟?
- php - PHP Artisan 获取 ErrorException:数组偏移量为空
- java - 如何在wildfly中重定向应用程序路径?
- reactjs - Aborting OLD API Fetch request REACT
- .net - 忘记密码策略重定向
- javascript - 如何更改 NuxtJS 中样式的加载顺序?
- django - 自定义操作的 Django Rest Framework URL 不起作用
- csv - 如何以 CSV 格式导出分页表数据?