首页 > 解决方案 > 如何存储 Observer on_completed 结果

问题描述

我对 Rx 和 RxPy 比较陌生 - 我正在尝试做一件基本的事情,那就是访问我在 Observer 末尾存储的值on_completed。我有一种感觉,要么我遗漏了一些非常明显的东西,要么我可能将 Observable 概念弯曲成它不应该成为的东西。无论哪种方式,希望我能得到一些指导。

我已经查看了文档,materialize但它们似乎并不完全匹配。也研究过DoDisposable但找不到很多接近我需要的例子。

import rx
from rx import operators as ops
from rx.core import Observer

if __name__ == "__main__":
    class PipelineObserver(Observer):
        def __init__(self):
            self.status = None

        def on_next(self, payload):
            print(payload)

        def on_error(self, err):
            print(err)

        def on_completed(self):
            self.status = "Done"
            return self.status

     ## This returns a disposable, not the actual value I want. Which in this case is self.status
    output = rx.from_([1, 2]).subscribe(
        PipelineObserver()
    )

    print(output) ## Hoping for "Done" which is stored in self.status, not disposable class

无论如何可以从 on_completed 方法中访问一个值吗?没有将某些东西保存为全局变量(这对我来说似乎是个坏主意)我不确定这是否可能?基本上无论它输出什么on_completed或类似的东西。也许DoFinally

标签: pythonpython-3.xrx-py

解决方案


最后弄明白了,在这里发帖以防其他人遇到这个问题。由于这个操作,我正在寻找一个在可观察对象完成后必须运行的操作,它必须是一个阻塞操作。该run()功能用于此目的。

import rx
from rx import operators as ops
from rx.core import Observer

if __name__ == "__main__":
    class PipelineObserver(Observer):
        def __init__(self):
            self.status = None

        def on_next(self, payload):
            print(payload)

        def on_error(self, err):
            print(err)

        def on_completed(self):
            self.status = "Done"

    # First, seperate out the observer and observable:
    my_list = rx.from_([1, 2])

    my_list.subscribe(
        PipelineObserver()
    )

    # Say I want to return an integer of the count of items, I can use this:
    output = my_list.pipe(
       ops.count()
    ).run()

    print(output)
    # Notice the run command at the end of the chain.
    # Output: 2

可能有其他/更好的方法可以做到这一点,但这现在可行!


推荐阅读