java - RxJava3。为什么 FlowableSubscriber onNext 没有被调用?
问题描述
我需要获取一个 Subscription 对象才能有机会取消订阅听众。为此,我想给 FlowableSubscriber 一个功能。
代码:
FlowableSubscriber fs = new FlowableSubscriber() {
@Override
public void onSubscribe(@NonNull Subscription s) {
System.out.println("Flowable onSubs");
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
日志是:
Running...
Flowable onSubs
如果我使用 lambdas 它可以工作,但没有 onSubscribe 回调。
我如何获得订阅以及为什么没有调用这些方法?
解决方案
由于支持背压,您必须通过调用您的方法Flowable
来实际控制可以消耗的项目数量,以便它们可以通过以下方式发出:request
Subscription
Flowable
FlowableSubscriber fs = new FlowableSubscriber() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
System.out.println("Flowable onSubs");
subscription=s;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println("Flowable onNext");
//subscription.request(1); you can also request more items from onNext method - it is up to you
}
@Override
public void onError(Throwable t) {
System.out.println("Flowable onErr");
}
@Override
public void onComplete() {
System.out.println("Flowable onComlet");
}
};
在示例Integer.MAX_VALUE
中要求订阅,但这可能不是最好的主意。问题是你应该调用Subscription::request
fromonSubscribe
来请求初始项目,然后调用它onNext
并决定你可以实际处理多少项目。
推荐阅读
- bash - 如何将文件名列表传递给命令
- c# - ZXing.Net.Mobile MobileBarcodeScanner 的 CustomOverlay
- r - R中数据框中的列对的马氏距离
- excel - 选择在 Col D 中具有特定数字的行
- python - 如果单元格值中不存在字符串,则添加到列表中,如果存在则中断并开始新列表?
- c# - 从 IIS 7.0 升级到 IIS 10
- uber-api - 获取 INVALID_REQUEST Uber API
- java - 如何捕捉弹簧集成套接字服务器中的错误?
- android - TimePickerDialogue Fragment kotlin 需要哪个 Fragment Manager
- airflow - 为什么即使分支任务失败并且触发规则成功,气流下游任务也会完成?