首页 > 解决方案 > 实时更新不同python进程中的DataFrame

问题描述

因此,假设您有一个 Python 进程,它以每秒大约 500 行的速度实时收集数据(这可以进一步并行化以减少到大约 50 ps),从排队系统并将其附加到DataFrame

rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
    recv = rq.get(block=True)
    # some converting
    df.append(recv, ignore_index = True)

现在的问题是:如何根据这些数据利用 CPU?所以我完全了解GIL的局限性,并在这里查看了多处理管理器 命名空间,但看起来在中心保持数据帧的延迟方面存在一些缺点。在深入研究之前,我还尝试了我认可的在进程之间应用的方法,这会变慢并且开销太大。pool.mappickle

所以在这一切之后,我终于想知道,如何(如果)每秒插入 500 行(甚至每秒 50 行)可以转移到不同的进程,并留下一些 CPU 时间来对子数据应用统计信息和启发式方法流程?

也许在两个进程之间实现一个自定义的 tcp 套接字或排队系统会更好?或者是否有一些实现pandas或其他库真正允许快速访问父进程中的一个大数据帧?我爱熊猫!

标签: python-3.xpandasdataframepython-multiprocessing

解决方案


在我们开始之前,我应该说您没有告诉我们太多有关您的代码的信息,但您心中有这一点,即每秒仅将那些 50/500 新行传输到子进程并尝试DataFrame在子进程中创建那个大的。

我正在做一个和你一模一样的项目。Python 有许多 IPC 实现,PipeQueue你所知道的。Shared Memory解决方案在许多情况下可能存在问题,AFAIK python 官方文档警告使用共享内存。

根据我的经验,在两个进程之间转换数据的最佳方法是,因此您可以腌制 DataFrame 并将其发送到另一个连接端点。我强烈建议您在您的情况下避免使用套接字 ( )。PipeTCPAF_INET

如果不经过腌制和解封,熊猫DataFrame就无法转换到另一个过程。所以我还建议您将原始数据作为内置类型dict而不是 DataFrame 传输。这可能会使pickle和unpicking更快,并且它的内存占用更少。


推荐阅读