python - multiprocessing logging - how to use loguru with joblib Parallel
Question
I have a bunch of Python scripts that run some data science models. They take quite a while, and the only way to speed them up is multiprocessing. To achieve this I used the joblib library, and it works really well. Unfortunately, this scrambles the logging, and the console output is garbled too (expectedly so), because all processes dump their respective output at the same time.
I am new to using the logging library and followed some other SO answers to try to get it to work. I am using 8 cores for processing. Using the answers on SO, I wrote out to log files and expected 8 new files for every iteration. However, it created 8 files on the first iteration and only ever wrote/appended to those same 8 files on every loop. This was a bit inconvenient, so I explored a bit more and found loguru and logzero. While they both cover examples using multiprocessing, neither of them shows how to use it with joblib. Here is what I have so far:
run_models.py

```python
import math
import multiprocessing
import time
from datetime import datetime

from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id, logger):
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True


def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()
```
train_model.py
```python
import math
from datetime import datetime
from itertools import product
from math import sqrt

import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error

import helper
import stock_subscriber_data

# bunch of functions here that don't need logging...


# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    # ... do stuff here ...
    # ... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    # ... do stuff here ...
    # ... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model


def read_train_write(data_df, stock_id, series, last_date, logger):
    # ... do stuff here ...
    # ... and here ...
    logger.info('done')
    # ... do stuff here ...
    # ... and here ...
    # bunch of logger.info() statements here...
    #
    #
    #
    #
    # ... do stuff here ...
    # ... and here ...
    return test_y, prd
```
This works great when there is only one process at a time. However, when running in multiprocessing mode I get a _pickle.PicklingError: Could not pickle the task to send it to the workers. error. What am I doing wrong, and how can I remedy it? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, each containing the logs of one joblib iteration.
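The error itself can be reproduced without joblib or loguru. joblib's default backend pickles every argument handed to delayed(), and objects that hold a thread lock internally (as loguru's logger does) cannot be pickled. The class below is a hypothetical stand-in for any such object, just to show the failure mode:

```python
import pickle
import threading


# Stand-in for an object that keeps a thread lock internally,
# the way loguru's logger does. joblib would need to pickle it
# to ship it to a worker process.
class LockHolder:
    def __init__(self):
        self._lock = threading.Lock()


try:
    pickle.dumps(LockHolder())
    picklable = True
except TypeError:
    # CPython raises "cannot pickle '_thread.lock' object"
    picklable = False

print(picklable)  # False
```

This is why passing the logger object itself through Parallel fails, while configuring logging inside the worker function (as in the solution below) does not.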
Solution
I got this working by modifying my run_models.py. Now I have one log file per loop. This creates a lot of log files, but they are all tied to their respective loop rather than being jumbled or anything. One step at a time, I guess. Here is what I did:
run_models.py
```python
import math
import multiprocessing
import time
from datetime import datetime

from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id):
    # configure a sink inside the worker, so the logger object
    # never has to be pickled and sent across processes
    log_file_name = "log_file_{}".format(stock_id)
    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True


def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)


if __name__ == "__main__":
    main()
```