首页 > 解决方案 > 每次启动多处理时进程消耗更多内存

问题描述

首先,我对 Python 很陌生,这是我的第一个真正的项目。

我一直在努力解决我在 for 循环中遇到的一个问题,在该循环中我迭代了 3 个值的列表(regionid),并且在每次迭代中,我使用miniutils 中的 parrallel_progbar打开 40 个进程,以遍历另一个 15000 个值的列表(typeid):

for regionid in regionid_list:
  print("{}{}".format('Loading market historic data from region: ', regionid))
  market_list = parallel_progbar(   item_history, [(processes, regionid, typeid) for typeid in typeids_list], starmap=True, nprocs=processes, timeout=1)
  print("{}{}".format('Writing to database for regionid: ', regionid))
  con = sql_connection('database.db')
  update_db_values(con, regionid, market_list)
  con.commit()
  con.close()

我用 top 监控了我的内存,我的问题是我可以看到我有内存泄漏。每次迭代 regionid 时,打开的每个进程都需要更多内存,直到第三次迭代时内存不足。它在每个过程的第一次迭代中从 4.5 开始,然后在第二次中达到 6,在第三次中达到 7.5。在 parrallel_progbar 完成对 typeid 的迭代后,我尝试使用 gc.collect(),因为我怀疑这可能是原因,但无济于事。我也尝试过使用 iparallel_progbar 所以它可以作为一个生成器,但同样没有区别。当 parrallel_progbar 关闭时,我可以看到所有进程都已关闭,并且内存使用量按预期急剧下降。我必须承认我真的不知道可能是什么问题,所以我想我正在寻找一些关于如何解决它的提示。由于这也是我在这里的第一篇文章,我希望我提供了足够的信息,否则请问我。我在for循环中调用的函数粘贴在这篇文章的底部

EDIT1:我还应该补充一点,当关闭python脚本并从头开始重新启动时。所有问题都被重置,即。需要所有三个迭代才能解决内存问题。我还尝试了单独的 regionid 和 bd 的最后一次迭代,它本身没有问题

EDIT2:这是脚本执行时运行顶部的屏幕截图。左边的图像是在 regionid 的迭代过程中。右上角的两个是在第一次和第二次迭代之后,再次关闭进程并将数据写入数据库。右下角是崩溃时可用内存基本上是 运行脚本的 0 个顶部转储

我为 typeid 中的每个值调用的函数是:

def item_history(processes, RegionID, TypeID):
#CONSTRUCT THE URL TO CALL
Datasource = 'tranquility'
url = 'https://esi.evetech.net/latest/markets/'

url_final = url + RegionID +'/history/'
payload = {'datasource': Datasource, 'type_id': TypeID}

#SET PARAMETERS AND DECLARE VARIABLES
counter = 0
volume_summation = 0
market_value_summation = 0.0
vol_median_list = []
average_list = []
size_median_list = []
high_median_list = []
low_median_list = []
spread_median_list = []

week = 7
month = 30
quarter = 90
year = 365

error_buffer = 1
error_list = [400, 404, 420, 422, 500, 503, 504, 520]
#CALL THE URL AND RECEIVE RESPONSE

try:
    response = requests.get(
        url_final,
        params = payload
        )
except:
    null_response = null_return(TypeID, RegionID, "{}{}".format('ERROR: ', requests.exceptions))
    return null_response
if response:
    #print('Succes')
    history = json.loads(response.text)
    if not history:
        #print('Returned nothing')
        null_response = null_return(TypeID, RegionID, "Null")
        return null_response
    else:
        #con = sql_connection()
        history.reverse()
        history_length = len(history)
        
        last_updated = ''
        volume_yesterday = 0
        market_value_yesterday = 0
        volume_week_med = 0
        market_value_week_med = 0
        volume_month_med = 0
        market_value_month_med = 0
        volume_quarter_med = 0
        market_value_quarter_med = 0
        volume_year_med = 0
        market_value_year_med = 0
        high_yesterday = 0
        high_week_med = 0
        high_month_med = 0
        high_quarter_med = 0
        high_year_med = 0
        low_yesterday = 0
        low_week_med = 0
        low_month_med = 0
        low_quarter_med = 0
        low_year_med = 0
        spread_yesterday = 0
        spread_week_med = 0
        spread_month_med = 0
        spread_quarter_med = 0
        spread_year_med = 0
        _52w_low = 0
        _52w_high = 0

        for history_item in history:
            volume = history_item['volume']
            average = float(history_item['average'])
            size = float(volume) * float(average)
            date = str(history_item['date'])
            high = float(history_item['highest'])
            low = float(history_item['lowest'])
            spread = high - low
            
            vol_median_list.append(volume)
            average_list.append(average)
            size_median_list.append(size)
            high_median_list.append(high)
            low_median_list.append(low)
            spread_median_list.append(spread)
            
            volume_summation = volume_summation + int(volume)
            market_value_summation = market_value_summation + (float(volume) * float(average))
            
            if counter == 0:
                last_updated = date
                volume_yesterday = "{:.2f}".format(volume)
                market_value_yesterday = "{:.2f}".format(size)
                high_yesterday = "{:.2f}".format(high)
                low_yesterday = "{:.2f}".format(low)
                spread_yesterday = "{:.2f}".format(spread)
                
            elif counter == week - 1:
                volume_week_med = "{:.2f}".format(statistics.median(vol_median_list))
                market_value_week_med = "{:.2f}".format(statistics.median(size_median_list))
                high_week_med = "{:.2f}".format(statistics.median(high_median_list))
                low_week_med = "{:.2f}".format(statistics.median(low_median_list))
                spread_week_med = "{:.2f}".format(statistics.median(spread_median_list))
                _52w_low = "{:.2f}".format(min(average_list))
                _52w_high = "{:.2f}".format(max(average_list))
                
            elif counter == month - 1:
                volume_month_med = "{:.2f}".format(statistics.median(vol_median_list))
                market_value_month_med = "{:.2f}".format(statistics.median(size_median_list))
                high_month_med = "{:.2f}".format(statistics.median(high_median_list))
                low_month_med = "{:.2f}".format(statistics.median(low_median_list))
                spread_month_med = "{:.2f}".format(statistics.median(spread_median_list))
                _52w_low = "{:.2f}".format(min(average_list))
                _52w_high = "{:.2f}".format(max(average_list))
                
            elif counter == quarter - 1:
                volume_quarter_med = "{:.2f}".format(statistics.median(vol_median_list))
                market_value_quarter_med = "{:.2f}".format(statistics.median(size_median_list))
                high_quarter_med = "{:.2f}".format(statistics.median(high_median_list))
                low_quarter_med = "{:.2f}".format(statistics.median(low_median_list))
                spread_quarter_med = "{:.2f}".format(statistics.median(spread_median_list))
                _52w_low = "{:.2f}".format(min(average_list))
                _52w_high = "{:.2f}".format(max(average_list))
                
            elif counter == year - 1:
                volume_year_med = "{:.2f}".format(statistics.median(vol_median_list))
                market_value_year_med = "{:.2f}".format(statistics.median(size_median_list))
                high_year_med = "{:.2f}".format(statistics.median(high_median_list))
                low_year_med = "{:.2f}".format(statistics.median(low_median_list))
                spread_year_med = "{:.2f}".format(statistics.median(spread_median_list))
                _52w_low = "{:.2f}".format(min(average_list))
                _52w_high = "{:.2f}".format(max(average_list))
                break
            
            counter = counter + 1
            result_list = [ TypeID,
                            last_updated,
                            volume_yesterday,
                            volume_week_med,
                            volume_month_med,
                            volume_quarter_med,
                            volume_year_med,
                            market_value_yesterday,
                            market_value_week_med,
                            market_value_month_med,
                            market_value_quarter_med,
                            market_value_year_med,
                            high_yesterday,
                            high_week_med,
                            high_month_med,
                            high_quarter_med,
                            high_year_med,
                            low_yesterday,
                            low_week_med,
                            low_month_med,
                            low_quarter_med,
                            low_year_med,
                            spread_yesterday,
                            spread_week_med,
                            spread_month_med,
                            spread_quarter_med,
                            spread_year_med,
                            _52w_low,
                            _52w_high]
        
        #update_db_values(con, RegionID, result_list)
        #con.commit()
        #con.close()
        return result_list
else:
    try:
        if response.headers:
            Error_remain = int(response.headers['x-esi-error-limit-remain'])
            Error_reset = int(response.headers['x-esi-error-limit-reset'])
            Error_sleep = Error_reset + 1
        
            if (Error_remain <= processes + error_buffer) and (int(response.status_code) in error_list):
                print("{}{}{}{}{}{}".format('Errors remaining: ', Error_remain,' Will sleep for ', Error_sleep, 's for itemid: ', TypeID))
                time.sleep(Error_sleep)
    except:
        print("{},{}".format('Exception occured with header for ItemID: ', TypeID))
    null_response = null_return(TypeID, RegionID, "{}{}".format('ERROR: ', response.status_code))
    #print("{}{}{}{}".format('ItemID: ', TypeID, ' returns ERROR: ', response.status_code))
    return null_response

标签: python-3.xmemory-leaksmultiprocessing

解决方案


推荐阅读