rx-java - rxjava 过滤流中的项目与流中的任何其他过滤器不匹配
问题描述
我的应用程序中的 CommandManager 通过网络协议接收命令并将它们发布到命令 PublishSubject 上。特定的 CommandHandler 类订阅命令 PublishSubject,这样每个 CommandHandler 只接收它设计用来处理的命令。我还希望有一个默认命令处理程序,它只接收没有其他 CommandHandler 的过滤谓词匹配的命令。
// Each CommandHandler subscribes to a filtered
// stream of incoming commands matching this CommanHandler
commandManager.getCommandsPublishSubject()
.filter(getMessageFilterPredicate())
.subscribe(this::onNewMessage,
e -> LOGGER.error("Error getting new message", e));
有没有办法应用过滤器来匹配任何 CommandHandler 不匹配的项目?
解决方案
没有执行此操作的标准运算符。您必须创建一个自定义组件,而不是运算符,它为每个案例和默认主题托管过滤器 lambda 和更多主题。
public final class FilteringDispatcher<T> {
static final class FilterAndSubject<T> {
Predicate<T> predicate;
Subject<T> subject;
}
final ConcurrentMap<String, FilterAndSubject<T>> filters = new ConcurrentHashMap<>();
final Subject<T> defaultSubject = PublishSubject.<T>create().toSerialized();
public Observable<T> registerPredicate(String name, Predicate<T> predicate) {
Subject<T> subject = PublishSubject.<T>create().toSerialized();
FilterAndSubject fs = new FilterAndSubject();
fs.predicate = predicate;
fs.subject = subject;
FilterAndSubject old = filters.putIfAbsent(name, fs);
return old != null ? old.subject : subject;
}
public Observable<T> getObservable(String name) {
return filters.get(name).subject;
}
public Observable<T> getDefaultObservable() {
return defaultSubject;
}
public void onNext(T item) {
for (FilterAndSubject fs : filters.values()) {
if (fs.predicate.test(item)) {
fs.subject.onNext(item);
return;
}
}
defaultSubject.onNext(item);
}
}
推荐阅读
- java - 从具有最低键的优先级队列中删除节点
- php - PHP - JWT 获取令牌错误的段数
- json - 在我的 Tweepy 流 JSON 文件中找不到转推
- c# - 解密 AES 加密文件的选择性部分
- php - 如何对传入的 PHP 帖子进行排队
- sparql - sparql rdfs:range 不返回任何内容
- python - 在 Django views.py 中转换缩略图后如何将缩略图保存到 default_storage(AWS S3)?
- python - 如何让 n 个进程在 Python 2.7 中长度不同的作业列表上运行?
- java - 如何在Java中一次停止点击多个按钮(使用其他一些Jbutton)
- r - 如何使用 r 按列添加 excel 工作表