python-3.x - 实时更新不同python进程中的DataFrame
问题描述
因此,假设您有一个 Python 进程,它以每秒大约 500 行的速度实时收集数据(这可以进一步并行化以减少到大约 50 ps),从排队系统并将其附加到DataFrame
:
rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
recv = rq.get(block=True)
# some converting
df.append(recv, ignore_index = True)
现在的问题是:如何根据这些数据利用 CPU?所以我完全了解GIL的局限性,并在这里查看了多处理管理器 命名空间,但看起来在中心保持数据帧的延迟方面存在一些缺点。在深入研究之前,我还尝试了我认可的在进程之间应用的方法,这会变慢并且开销太大。pool.map
pickle
所以在这一切之后,我终于想知道,如何(如果)每秒插入 500 行(甚至每秒 50 行)可以转移到不同的进程,并留下一些 CPU 时间来对子数据应用统计信息和启发式方法流程?
也许在两个进程之间实现一个自定义的 tcp 套接字或排队系统会更好?或者是否有一些实现pandas
或其他库真正允许快速访问父进程中的一个大数据帧?我爱熊猫!
解决方案
在我们开始之前,我应该说您没有告诉我们太多有关您的代码的信息,但您心中有这一点,即每秒仅将那些 50/500 新行传输到子进程并尝试DataFrame
在子进程中创建那个大的。
我正在做一个和你一模一样的项目。Python 有许多 IPC 实现,Pipe
如Queue
你所知道的。Shared Memory
解决方案在许多情况下可能存在问题,AFAIK python 官方文档警告使用共享内存。
根据我的经验,仅在两个进程之间转换数据的最佳方法是,因此您可以腌制 DataFrame 并将其发送到另一个连接端点。我强烈建议您在您的情况下避免使用套接字 ( )。Pipe
TCP
AF_INET
如果不经过腌制和解封,熊猫DataFrame
就无法转换到另一个过程。所以我还建议您将原始数据作为内置类型dict
而不是 DataFrame 传输。这可能会使pickle和unpicking更快,并且它的内存占用更少。
推荐阅读
- c - 对二维数组进行排序,使最大的元素位于主对角线上
- sql - Postgres 整数超出范围
- python - 如何根据第三个列表在不同的行上按不同列表中的数字打印列表中的字符串?
- snowflake-cloud-data-platform - 查询以获取雪花中数据库中所有表的行数
- r - dplyr: Handing over multiple variables to group_by in a function
- python - 定义内部有变量的lambda函数时,如何防止在更改变量时更改函数?
- python - 如何通过 Anaconda 安装安装 winsound 库
- android - 如何为下载的 Android 项目创建应用程序配置
- reactjs - 无法在 Todo 应用程序中输入 React 输入文本字段
- javascript - 使用没有变量声明的异步迭代