首页 > 解决方案 > RxJava:PublishSubject 同步动作

问题描述

我需要一个允许将消息异步推送到我的功能,PublishSubject并通过ConnectableObservable. onNext不幸的PublishSubject是,在底层Subscriber处理消息之前,似乎不会释放对的调用。

处理每条消息需要几秒钟的时间,在调试模式下,我看到它在调用将消息推送到 PublishSubject 的方法从堆栈中删除之前执行 -"After push..."总是出现在控制台中的内部日志之后Subscriber...

所以我有这个 RestEndpoint:

@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            try {
                Message metadata = processor.apply(extId);
                log.info("Before push...");
                dataImporter.pushData(metadata);
                log.info("After push...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    return Response.ok("Request received successfully").build();

}

这是 DataImporter 的构造函数:

public DataImporter(final String configFile) {
        dataToImportSubject = PublishSubject.create();
        dataToImportObservable = dataToImportSubject.publish();
        dataToImportObservable.connect();
        dataToImportObservable
            .onBackpressureBuffer(1, new Action0() {

                @Override
                public void call() {
                    logger.debug("Buffer full...");
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<Message>() {

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Error importing "+e.getMessage());
                }

                @Override
                public void onNext(Message value) {
                    request(1);
                    importResult(configFile, value);
                }

                @Override
                public void onStart() {
                    request(1);
                }
            });
    }

然后pushDataDataImporter 只是推送到PublishSubject'sonNext方法..:

public void pushData(Message metadata) {
    dataToImportSubject.onNext(metadata);       
}

PublishSubject这是and的声明ConnectableObservable

public class DataImporter implements ImporterProxy{

    private final PublishSubject<Message> dataToImportSubject;
    private final ConnectableObservable<Message> dataToImportObservable;

标签: javarx-javapublish-subscribe

解决方案


PublishSubjectonXXXs 在原始调用的线程上向其消费者发出:

JavaDocs

调度器

PublishSubject默认情况下不对特定对象进行操作,Scheduler并且Observers 在调用了相应onXXX方法的线程上得到通知。

您必须将处理移动到其他线程,observeOn因为observeOn可以将onXXX调用移动到另一个线程。

subscribeOn通常对 s 没有任何实际影响,Subject因为它只影响订阅线程,并且不会调节onXXX对这些主题的后续调用。


推荐阅读