python - Python 多处理逐渐增加内存,直到它运行我们的
问题描述
我有一个包含多个模块的 python 程序。他们是这样的:
作为入口点并管理程序整体流程的作业类
任务类,它是要在给定数据上运行的任务的基类。许多专为对不同数据列进行不同类型的计算而创建的 SubTask 类都派生自 Task 类。想想数据中的 10 列,每一列都有自己的任务来做一些处理。例如。CurrencyConverterTask 可以使用“价格”列来返回本地货币值等。
许多其他模块,例如用于获取数据的连接器、实用程序模块等,我认为这些模块与此问题无关。
程序的一般流程:不断从db中获取数据->处理数据->将更新后的数据写回db。
我决定在多处理中进行,因为任务相对简单。它们中的大多数都进行一些基本的算术或逻辑运算,并且在一个进程中运行它需要很长时间,特别是从大型数据库中获取数据并且按顺序处理非常慢。
所以多处理(mp)代码看起来像这样(我无法公开整个文件,所以我正在编写一个简化版本,未包含的部分在这里不相关。我已经通过注释掉它们进行了测试,所以这是一个准确的表示的实际代码):
class Job():
def __init__():
block_size = 100 # process 100 rows at a time
some_query = "SELECT * IF A > B" # some query to filter data from db
def data_getter():
# continusouly get data from the db and put it into a queue in blocks
cursor = Connector.get_data(some_query)
block = []
for item in cursor:
block.append(item)
if len(block) ==block_size:
data_queue.put(data)
block = []
data_queue.put(None) # this will indicate the worker processors when to stop
def monitor():
# continuously monitor the system stats
timer = Timer()
while (True):
if timer.time_taken >= 60: # log some stats every 60 seconds
print(utils.system_stats())
timer.reset()
def task_runner():
while True:
# get data from the queue
# if there's no data, break out of loop
data = data_queue.get()
if data is None:
break
# run task one by one
for task in tasks:
task.do_something(data)
def run():
# queue to put data for processing
data_queue = mp.Queue()
# start a process for reading data from db
dg = mp.Process(target=self.data_getter).start()
# start a process for monitoring system stats
mon = mp.Process(target=self.monitor).start()
# get a list of tasks to run
tasks = [t for t in taskmodule.get_subtasks()]
workers = []
# start 4 processes to do the actual processing
for _ in range(4):
worker = mp.Process(target=task_runner)
worker.start()
workers.append(worker)
for w in workers:
w.join()
mon.terminate() # terminate the monitor process
dg.terminate() # end the data getting process
if __name__ == "__main__":
job = Job()
job.run()
整个程序运行如下:python3 runjob.py
预期的行为:连续的数据流进入,data_queue
每个工作进程获取数据并处理,直到没有更多来自游标的数据,此时工作人员完成并且整个程序完成。
这按预期工作,但出乎意料的是系统内存使用量不断攀升,直到系统崩溃。i'm getting here 不会被data
复制到任何地方(至少是故意的)。我希望内存使用在整个程序中保持稳定。的长度data_queue
很少超过 1 或 2,因为进程足够快,可以在可用时获取数据,所以不是队列持有太多数据。
我的猜测是,这里启动的所有进程都是长时间运行的进程,这与此有关。虽然我可以打印 pid,并且如果我按照top
命令上的 PID,data_getter 和监视进程不会超过内存使用量的 2%。4 个工作进程也不会使用大量内存。整个事情运行的主进程也没有。有一个下落不明的进程占用了 20% 以上的内存。它让我非常烦恼,我无法弄清楚它是什么。
解决方案
推荐阅读
- mariadb - 如何在主从拓扑中配置两个数据中心
- c++ - 名称查找 - 运算符>>未找到
- python - IndexError:字符串索引超出范围——即使程序给出了所需的输出
- python - 使用 Flask/Python 在 HTML 中从 sqlite 服务器渲染 Blob
- tensorflow2.0 - tfRecords 在 TF2 中显示错误
- reactjs - 如何在测试中设置初始 React 功能组件状态?
- php - 通过 php v7.1 而不是 php v7.2 安装的 Composer 包
- ibm-cloud - 使用 ibmcloud cli 检索私有服务端点
- html - 如何将 *ngFor (Angular) 中的值传递给 data-target 属性?
- json - 如何从 vuejs 中的 Windows 文件资源管理器访问任何 .json