首页 > 解决方案 > Python 多处理逐渐增加内存,直到它运行我们的

问题描述

我有一个包含多个模块的 python 程序。他们是这样的:

  1. 作为入口点并管理程序整体流程的作业类

  2. 任务类,它是要在给定数据上运行的任务的基类。许多专为对不同数据列进行不同类型的计算而创建的 SubTask 类都派生自 Task 类。想想数据中的 10 列,每一列都有自己的任务来做一些处理。例如。CurrencyConverterTask 可以使用“价格”列来返回本地货币值等。

  3. 许多其他模块,例如用于获取数据的连接器、实用程序模块等,我认为这些模块与此问题无关。

程序的一般流程:不断从db中获取数据->处理数据->将更新后的数据写回db。

我决定在多处理中进行,因为任务相对简单。它们中的大多数都进行一些基本的算术或逻辑运算,并且在一个进程中运行它需要很长时间,特别是从大型数据库中获取数据并且按顺序处理非常慢。

所以多处理(mp)代码看起来像这样(我无法公开整个文件,所以我正在编写一个简化版本,未包含的部分在这里不相关。我已经通过注释掉它们进行了测试,所以这是一个准确的表示的实际代码):

class Job():
    def __init__():
        block_size = 100 # process 100 rows at a time
        some_query = "SELECT * IF A > B" # some query to filter data from db

    def data_getter():
        # continusouly get data from the db and put it into a queue in blocks
        cursor = Connector.get_data(some_query)
        block = []

        for item in cursor:
            block.append(item)
            if len(block) ==block_size:
                data_queue.put(data)
                block = []

        data_queue.put(None) # this will indicate the worker processors when to stop

    def monitor():
        # continuously monitor the system stats
        timer = Timer()
        while (True):
            if timer.time_taken >= 60: # log some stats every 60 seconds
                print(utils.system_stats())
                timer.reset()

    def task_runner():
        while True:
            # get data from the queue
            # if there's no data, break out of loop
            data = data_queue.get()
            if data is None:
                break

            # run task one by one
            for task in tasks:
                task.do_something(data)

    def run():
        # queue to put data for processing
        data_queue = mp.Queue()

        # start a process for reading data from db
        dg = mp.Process(target=self.data_getter).start()

        # start a process for monitoring system stats
        mon = mp.Process(target=self.monitor).start()

        # get a list of tasks to run
        tasks = [t for t in taskmodule.get_subtasks()]

        workers = []
        # start 4 processes to do the actual processing
        for _ in range(4):
            worker = mp.Process(target=task_runner)
            worker.start()
            workers.append(worker)

        for w in workers:
            w.join()

        mon.terminate() # terminate the monitor process
        dg.terminate() # end the data getting process



if __name__ == "__main__":
  job = Job()
  job.run()

整个程序运行如下:python3 runjob.py

预期的行为:连续的数据流进入,data_queue每个工作进程获取数据并处理,直到没有更多来自游标的数据,此时工作人员完成并且整个程序完成。

这按预期工作,但出乎意料的是系统内存使用量不断攀升,直到系统崩溃。i'm getting here 不会被data复制到任何地方(至少是故意的)。我希望内存使用在整个程序中保持稳定。的长度data_queue很少超过 1 或 2,因为进程足够快,可以在可用时获取数据,所以不是队列持有太多数据。

我的猜测是,这里启动的所有进程都是长时间运行的进程,这与此有关。虽然我可以打印 pid,并且如果我按照top命令上的 PID,data_getter 和监视进程不会超过内存使用量的 2%。4 个工作进程也不会使用大量内存。整个事情运行的主进程也没有。有一个下落不明的进程占用了 20% 以上的内存。它让我非常烦恼,我无法弄清楚它是什么。

标签: pythonmultithreadingmemorymemory-leaksmultiprocessing

解决方案


推荐阅读