java - 获取元素直到某个字符并使用 RxJava 将它们分组
问题描述
我有一个简单的问题设置,但解决方案似乎更复杂。
设置:我有一个热可观察对象,它源自扫描仪,它将每个数字作为不同的元素以及R
代码完成时发出。
问题:从这里我想要一个热的可观察对象,它将每个完整代码作为 1 个元素发出。
我尝试使用不同flatMap
的 ,takeUntil
和groupBy
操作符,但未能找到解决方案。
解决方案
您可以使用缓冲区运算符。
PublishSubject<Token<Integer>> s = PublishSubject.create();
Observable<Token<Integer>> markers = s.filter(x->x.isMarker());
s.buffer(markers).subscribe(
v->{
Optional<Integer> reduce = v.stream()
.filter(t->!t.isMarker())
.map(t->(ValueToken<Integer>)t)
.map(ValueToken::get)
.reduce((a,b)->a+b);
reduce.ifPresent(System.out::println);
}
);
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17
s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet
我创建了一个类来包装流中的值和标记。
public abstract class Token<T> {
private static final MarkerToken MARKER = new MarkerToken<>();
public boolean isMarker() {
return false;
}
public static <T> MarkerToken<T> marker() {
return MARKER;
}
public static <T> ValueToken<T> value(T o) {
return new ValueToken<>(o);
}
public static class ValueToken<T> extends Token<T> {
T value;
public ValueToken(T value) {
this.value = value;
}
public T get() {
return value;
}
}
public static class MarkerToken<T> extends Token<T> {
public boolean isMarker() {
return true;
}
}
}
更新(使用扫描)
先前的方法也会在流关闭时发出,使用此解决方案您只能发出完整的缓冲区。
消息类用作累加器,它将累积令牌,直到累积结束标记。
发生这种情况时,下一条消息将从头开始。
结束标记作为最后一个元素的存在将消息标记为完整。
public static class Message<T> {
List<Token<T>> tokens = new ArrayList<>();
public Message<T> append(Token<T> t) {
Message<T> mx = new Message<T>();
if(!isComplete()) {
mx.tokens.addAll(tokens);
}
mx.tokens.add(t);
return mx;
}
public boolean isComplete() {
int n = tokens.size();
return n>0 && tokens.get(n-1).isMarker();
}
public Optional<List<Token<T>>> fullMessage(){
return isComplete() ? Optional.of(tokens):Optional.empty();
}
}
扫描您为每个发出的令牌发出消息的源,然后过滤掉不完整的消息并只发出标记为完成的消息。
s.scan(new Message<Integer>(), (a, b) -> a.append(b))
.filter(Message::isComplete)
.map(Message::fullMessage)
.map(Optional::get).subscribe(v -> {
System.out.println(v);
});
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker());// [V(12), V(13), MARKER]
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // [V(10), V(7), MARKER]
s.onNext(value(10));
s.onNext(value(127));
s.onComplete(); // Not emitting incomplete messages on the closing of the subject.
推荐阅读
- bash - 为什么创建新的“屏幕”会话时我的 .bash_profile 会中断?
- python - 推荐在 Python 中与 SSH 会话交互的方法
- google-apps-script - 从 Gmail 到 Google 表格的 XLSX:无效的 mime 类型。内容类型似乎是应用程序/八位字节?
- python - 是否有 numpy / ndarrays 的包装器返回每个突变方法的数组?
- regex - 查找和替换密钥和文件名
- c++ - 将 const 字符串传递给函数 c++ 时出现意外行为
- php - 将非标准 API 响应解析为数组
- python - 覆盖 2 个具有相同索引的 DataSeries 图
- java - 为什么我们不能从静态方法中调用非静态方法?
- django - Django:可重用应用程序测试的设置?