首页 > 解决方案 > 如何在单独的 Python 多处理进程中从数据流中获取最新值?

问题描述

我有一个从传感器读取的过程和另一个从图表中读取的过程。但是,绘图仪可能比传感器读数慢,尤其是在添加了许多传感器时。

我看到将信息从传感器传递给绘图者的两个选项:管道和 mp.Value。我所知道的管道应该更快,但我担心绘图器开始延迟的问题:如果传感器采样的速度是绘图器的 n 倍,那么对于每个绘图器时间步,我们只会在 1/n 时间步长未来(例如,如果两倍快,20 秒后,绘图员只显示 10 秒)。我可以看到传感器在添加新值之前轮询管道并删除所有值,但这听起来很昂贵。mp.Value 路由确实需要更显式的锁定,我相信它不如 Pipe 类快,尽管我不确定。

处理这种多处理以避免这里出现问题的最佳方法是什么?

编辑澄清:我不在乎绘图员是否获得所有信息。使用最近的值很好,这就是为什么标题说“Pipe only Last Value”。绘图员的主要要求就是不要延迟绘图,即使我们通过丢弃数据有效地进行了下采样。传感器确实需要比绘图员读取的速度更快,因为数据也在被记录和处理,我们不想对这些信息进行下采样。

标签: pythonembeddedpython-multiprocessing

解决方案


要获取最新的传感器值,您实际上需要传感器进程等到绘图员准备好发送数据。有几种方法可以做到这一点,但我认为实际上使用两个单向 ( duplex=False) 管道是最好的方法,因为您不需要涉及任何额外的线程或信号量。在此设置中,第一个管道正常发送数据 sensor->grapher,但第二个管道只是表示grapher 已准备好立即接受数据的信号。用散文表达有点尴尬,所以这里是伪代码:

def grapher():
    while True:
        data = pipe_to_grapher.recv()
        graph(data)
        pipe_to_sensor.send(None)  # Can be any value

def sensor():
    while True:
        data = sense()
        if pipe_to_sensor.poll():
            pipe_to_grapher.send(data)  # Freshest possible
            pipe_to_sensor.recv()  # Clear the pipe
        record(data)

请注意,传感器可以直接通过 if poll()return直接传递False,因为这表明绘图者尚未准备好接收数据。您还可以轻松地扩展系统以使用特殊值将一个进程的状态传达给另一个进程,例如关闭命令。

(预编辑答案如下)

这个问题似乎是在询问如何对传感器数据流应用背压。对于您的特定情况,这听起来multiprocessing.Queue可能是一个很好的解决方案。在内部它使用管道,因此它具有相似的性能特征,但它也有专门的方法put(),可以与一个maxsize参数一起使用,您可以将其设置为像 1 这样的低数字,以便传感器进程将等到绘图程序进程在返回获取更多数据之前检索了一个项目。

如果传感器有自己的缓冲区需要清除,您可以put_nowait()改用并捕获Full错误,作为绘图者无法绘制该数据的指示,应该将其丢弃。这节省了腌制和发送数据的开销,但可能导致非常快速地轮询传感器,这可能是开销本身的来源,具体取决于设备/驱动程序/api。


推荐阅读