首页 > 解决方案 > 如何使用 RxJava2 创建谓词

问题描述

我正在尝试学习 Rxjava,但仍然处于非常基础的水平。我正在寻找有关以下问题的一些指导。

我有这两个 Observables,

    fun getPlayheadPositionInMilliseconds(): Observable<Long> {
        return ConnectableObservable.interval(1000, TimeUnit.MILLISECONDS)
            .map { exoPlayer.currentPosition }
            .publish()
            .autoConnect()
    }

    fun getContentDurationInMilliseconds(): Observable<Long> {
        return ConnectableObservable.just(exoPlayer.duration)
            .publish()
            .autoConnect()
    }

现在我正在尝试从这些中创建一个谓词。我的要求是当getPlayheadPositionInMilliseconds达到 70% 时 getContentDurationInMilliseconds我想发出一个布尔信号。

如您所见,getPlayheadPositionInMilliseconds每 1000 毫秒后,订阅者将获得一个新值,现在我想将此值与我从中获得的总持续时间进行比较getContentDurationInMilliseconds。当getPlayheadPositionInMilliseconds值达到 70% 时getContentDurationInMilliseconds,将产生一个布尔信号。

我知道如何在不使用 RxJava 的情况下做到这一点,而是在寻找一种在 RxJava 中做到这一点的方法。如果需要更多信息,请告诉我。

标签: rx-javarx-java2exoplayer2.x

解决方案


如果我理解正确,您希望Observable<Boolean>每次发出一个子可观察对象时都会发出一个,并且根据您的谓词使布尔值为真。这可以通过以下方式实现:

    // Emits items when either child observable emits
    fun isPlayheadPosition70PercentOfContentDuration(): Observable<Boolean> =
        Observables                                         // Helper class from RxKotlin package
            .combineLatest(                                 // Emit a Pair<Long, Long> of values from the
                getPlayheadPositionInMilliseconds(),        // latest of the child observables
                getContentDurationInMilliseconds()
            )
            .map { (playheadPosition, contentDuration) ->   // Transform the item based on this function
                playheadPosition >= 0.7 * contentDuration   // (in this case, into a Boolean based on a predicate)
            }

哪里有一个带有来自RxKotlinObservables的有用方法的帮助类,在 Kotlin 中编写 Rx 时,这是一个不错的选择。

我想也许你也可能只想在你的谓词得到满足后立即触发一个事件。在这种情况下,您可以将上述内容转换为Completable当您的谓词首次变为真时触发的 a:

    // If you subscribe to this, the onComplete will signal once the predicate is satisfied
    fun playheadPositionHasReached70PercentContentDuration(): Completable =
        Observables
            .combineLatest(getPlayheadPositionInMilliseconds(), getContentDurationInMilliseconds())

            // Filter: Only allow elements through the stream if they satisfy this predicate
            .filter { (playheadPosition, contentDuration) -> playheadPosition >= 0.7 * contentDuration }

            // Take only the first element (i.e. turn this into a Single)
            .firstOrError()

            // Actually, I don't care about the element, I just want to know when it happens
            // (i.e. turn this into a Completable)
            .ignoreElement()

推荐阅读