首页 > 解决方案 > 关于 RxJava 和 PublishSubject 的初学者问题

问题描述

PublishSubject我对in有疑问RxJava。我创建了一个发出一些对象的虚拟 PublishSubject。这是我的代码:

override fun generate(exportRequest: ExportRequest): Observable<Report> {
        val faker = Faker()
        val dummyPublisher = PublishSubject.create<Report>()
        for(x in 1..1_000){
            val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
            val report = Report(dataToExport)
            sddPublisher.onNext(report)
            Thread.sleep(1)
        }
        dummyPublisher.onComplete()
        return dummyPublisher
    }

订阅时,不会发出任何对象。例如,没有打印任何内容:

... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
     println(report)
 }

也许我错过了一些东西。任何帮助将不胜感激

标签: javakotlinobservablereactive-programmingrx-java2

解决方案


正如@akarnokd 在评论中指出的那样,PublishSubject您创建的立即发出通过其onNext方法传递给它的任何值。无论当前是否订阅了任何内容,都会发生这种情况。它主要旨在帮助弥合命令式或基于回调的代码与反应式代码之间的差距。

您似乎想要的是Observable一旦订阅它就开始执行一些同步代码。Observable.create是创建此类实例的一种方法,但正确使用可能会很麻烦。

一种更方便的方法来创建您想要的内容是Observable.fromPublisher. 它需要 aPublisher作为参数。APublisher本身就是一个函数,只要订阅了created by并允许您将事件直接发送到 that ,它就会传递一个Subscriber实例。ObserverObservablefromPublisherObserver

您想要的代码如下所示:

fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
    return Observable.fromPublisher { subscriber ->
        for (x in 1..1_000) {
            val fakeReport = genFakeReport()
            subscriber.onNext(fakeReport)
            Thread.sleep(1)
        }
        subscriber.onComplete()
    }
}

fun main() {
    /** supply whatever logic you want to generate a fake [Report] */
    fun genFakeReport(): Report = TODO()
    val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}

一旦订阅了Observable返回的实例,这将正确地发出值generateReportStream。此外,可以对同一个实例进行更多订阅,并且每个订阅都将使用相同的逻辑发出一个新的序列值。


推荐阅读