java - 关于 RxJava 和 PublishSubject 的初学者问题
问题描述
PublishSubject
我对in有疑问RxJava
。我创建了一个发出一些对象的虚拟 PublishSubject。这是我的代码:
override fun generate(exportRequest: ExportRequest): Observable<Report> {
val faker = Faker()
val dummyPublisher = PublishSubject.create<Report>()
for(x in 1..1_000){
val dataToExport = DataToExport(UUID.randomUUID(), faker.company().buzzword(), faker.company().name())
val report = Report(dataToExport)
sddPublisher.onNext(report)
Thread.sleep(1)
}
dummyPublisher.onComplete()
return dummyPublisher
}
订阅时,不会发出任何对象。例如,没有打印任何内容:
... // somewhere in the code
reportStrategy.generate(exportRequest).subscribe { report: Report? ->
println(report)
}
也许我错过了一些东西。任何帮助将不胜感激
解决方案
正如@akarnokd 在评论中指出的那样,PublishSubject
您创建的立即发出通过其onNext
方法传递给它的任何值。无论当前是否订阅了任何内容,都会发生这种情况。它主要旨在帮助弥合命令式或基于回调的代码与反应式代码之间的差距。
您似乎想要的是Observable
一旦订阅它就开始执行一些同步代码。Observable.create
是创建此类实例的一种方法,但正确使用可能会很麻烦。
一种更方便的方法来创建您想要的内容是Observable.fromPublisher
. 它需要 aPublisher
作为参数。APublisher
本身就是一个函数,只要订阅了created by并允许您将事件直接发送到 that ,它就会传递一个Subscriber
实例。Observer
Observable
fromPublisher
Observer
您想要的代码如下所示:
fun generateReportStream(genFakeReport: () -> Report): Observable<Report> {
return Observable.fromPublisher { subscriber ->
for (x in 1..1_000) {
val fakeReport = genFakeReport()
subscriber.onNext(fakeReport)
Thread.sleep(1)
}
subscriber.onComplete()
}
}
fun main() {
/** supply whatever logic you want to generate a fake [Report] */
fun genFakeReport(): Report = TODO()
val subscription = generateReportStream(::genFakeReport).subscribe(::println)
}
一旦订阅了Observable
返回的实例,这将正确地发出值generateReportStream
。此外,可以对同一个实例进行更多订阅,并且每个订阅都将使用相同的逻辑发出一个新的序列值。
推荐阅读
- python - python中的IndentationError - 我的代码有什么问题?
- android - 当我尝试从回收站视图调用 user1 时,此代码向我显示错误
- angular - 如何将模板传递给Angular中的嵌套组件
- tensorflow - TensorFlow XLA 是否已弃用?
- javascript - 使用带有 javascript 的两个队列构建堆栈
- r - 如何根据 Shiny 中其他输入的值对输入应用限制和有限的可访问性?
- transactions - 春季批次的@TransactionalEventListener
- redis - 来自 RDB 基础文件的 Redis AOF
- php - 如何使用 Eloquent DB::connection 进行原始 SQL 查询?
- java - 将我的 Java Swing 程序转换为 MVC 程序?