java - RxJava:PublishSubject 同步动作
问题描述
我需要一个允许将消息异步推送到我的功能,PublishSubject
并通过ConnectableObservable
. onNext
不幸的PublishSubject
是,在底层Subscriber
处理消息之前,似乎不会释放对的调用。
处理每条消息需要几秒钟的时间,在调试模式下,我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 -"After push..."
总是出现在控制台中的内部日志之后Subscriber
...
所以我有这个 RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();
}
这是 DataImporter 的构造函数:
public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {
@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}
@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}
@Override
public void onStart() {
request(1);
}
});
}
然后pushData
DataImporter 只是推送到PublishSubject
'sonNext
方法..:
public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}
PublishSubject
这是and的声明ConnectableObservable
:
public class DataImporter implements ImporterProxy{
private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;
解决方案
PublishSubject
onXXX
s 在原始调用的线程上向其消费者发出:
调度器:
PublishSubject
默认情况下不对特定对象进行操作,Scheduler
并且Observer
s 在调用了相应onXXX
方法的线程上得到通知。
您必须将处理移动到其他线程,observeOn
因为observeOn
可以将onXXX
调用移动到另一个线程。
subscribeOn
通常对 s 没有任何实际影响,Subject
因为它只影响订阅线程,并且不会调节onXXX
对这些主题的后续调用。
推荐阅读
- excel - 以不同方式计算度量
- java - 为什么按钮在点击时会调整大小
- java - 为什么在 Spring Boot 上使用 MappingJacksonValue
- python - Drag_and_drop 什么都不做
- reactjs - 如何使用带有 useEffect React 钩子的 async/await,我尝试了很多示例,但没有任何效果
- python - 如何解决此错误:TypeError: 'NoneType' object has no attribute '__getitem__'
- node.js - 在本地 docker localhost:8000 中使用 dynamodb 和在 localhost:4500 上运行的 serverless-framework serverless-offline 应用程序
- vue.js - Babelify 不转换 Vue 组件中的箭头功能
- javascript - 使两个数据数组之一不那么详细/减少总长度的方法?
- axios - Axios 帖子,缺少授权类型